"phaul热迁移过程总结"

  "docker容器热迁移过程分析"

Posted by Xu on July 19, 2017

Phaul热迁移启动步骤分析

在phaul项目分析的第二章中,我们深入研究了源主机客户端和目的主机服务器端的通信实现方式,了解了客户端是如何通过发送相应的控制数据,然后调用服务器端的函数来实现端到端协调热迁移的过程。这一章我们将继续第一章的内容,沿着热迁移命令的执行流程展开分析:

live_migration

在第一章节中通过初始化phaul_iter_worker对象来建立客户端与服务器端的通信机制,在phaul_iter_worker初始化的过程中,有几个对象需要我们理解:

  • _mode(迁移模式:热迁移,冷迁移);
  • connection(几个socket的元组:rpc_sk,mem_sk,fs_sk三个socket对象);
  • target_host(用于远程调用的对象);
  • nostart(是否在目的机端启动容器);
  • htype(好像是迁移类型:lxc,docker等,该对象包含不同类型特有的迁移步骤实现);
  • fs(fs_haul_subtree.py.p_haul_fs对象,文件系统迁移驱动,使用rsync复制子目录,用于处理文件系统);
  • img(用于checkpoint后产生的镜像文件热迁移的驱动,处理所有镜像文件的传输);
  • criu_connection(用于本机调用criu命令)

初始化,对以上几个对象也进行初始化:

 class phaul_iter_worker(object):
	def __init__(self, p_type, dst_id, mode, connection, nostart):
		self.__mode = mode
		self.connection = connection
		self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
		self.nostart = nostart

		logging.info("Setting up local")
		self.htype = htype.get_src(p_type)
		if not self.htype:
			raise Exception("No htype driver found")

		self.fs = self.htype.get_fs(self.connection.fdfs)
		if not self.fs:
			raise Exception("No FS driver found")

		self.img = None
		self.criu_connection = None
		if is_live_mode(self.__mode):
			self.img = images.phaul_images("dmp")
			self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)

		logging.info("Setting up remote")
		p_dst_type = (p_type[0], dst_id if dst_id else p_type[1])
		self.target_host.setup(p_dst_type, mode)
 

完成worker的初始化后,开始调用worker.start_migration()来进入热迁移的具体实现过程,所以我们从这个start_migration()入手,该函数如下:

def start_migration(self):
		logging.info("Start migration in %s mode", self.__mode)
		if is_live_mode(self.__mode):
		//由于根据前面的分析,该迁移模式为热迁移模式,执行该步骤
			self.__start_live_migration()
		elif is_restart_mode(self.__mode):
			self.__start_restart_migration()
		else:
			raise Exception("Unknown migration mode")

热迁移模式实现函数:__start_live_migration

由于整个实现过程过于复杂,所以,我们将该函数分阶段进行解析

step1 迁移准备:检查cpu,criu,及是否支持mem_track,目前docker不支持page_server不支持pre-dump,最后进行文件系统传输

def __start_live_migration(self):
		"""Start migration in live mode

		Migrate memory and fs to target host iteratively while possible,
		checkpoint process tree on source host and restore it on target host.
		"""

		self.fs.set_work_dir(self.img.work_dir())
		self.__validate_cpu()//备份cpu信息,并传输到目的机,criu cpuinfo dump
		self.__validate_criu_version()//检查源主机和目的机criu版本,源主机的criu版本要小于目的机的criu版本
		use_pre_dumps = self.__check_use_pre_dumps()//检查是否可以pre_dumpmem_track,目前docker还不支持pre-dump因为docker目前不支持pageserver,所以目前不能支持mem——track
		root_pid = self.htype.root_task_pid()//获取容器全id

		migration_stats = mstats.live_stats()//创建一个记录迁移状态的对象,记录迁移各阶段迁移时间的对象
		migration_stats.handle_start()//迁移开始,记录开始时间

		# Handle preliminary FS migration
		logging.info("Preliminary FS migration")
		fsstats = self.fs.start_migration()//开始文件系统迁移
		migration_stats.handle_preliminary(fsstats)//打印文件系统迁移情况

FS_Migration

文件系统传输过程的实现:

def start_migration(self):
		logging.info("Starting FS migration")
		self.__run_rsync()
		return None

//文件系统源主机到目的机的文件系统同步
def __run_rsync(self):
		logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")

        //遍历根目录挂载点下的所有子目录通过rsync同步到目的机
		for dir_name in self.__roots:
            # container_id = dir_name.split("/")[-1]
            # mount_path = "/var/lib/docker/image/aufs/layerdb/mounts/"+container_id+"/mount-id"
            # file = open(mount_path)
            # mounts_id = file.read()
            # file.close()
            # dir_name = dir_name[0:dir_name.rfind("/")]
            # dir_name = dir_name+mounts_id
            # logging.info("container_id:%s, mount_path:%s,mount_id:%s,dir_name:%s",container_id,mount_path,mounts_id,dir_name)
            //以上为我修改的代码,这是由于新版docker/var/lib/docker/aufs/mnt下挂载的容器文件系统目录id与容器id并不是一致的,而是通过/var/lib/docker/image/aufs/layerdb/mounts/container_id/mount-id下的关联文件进行映射的,所以这里与旧版的docker有区别
			dst = "%s:%s" % (self.__thost, os.path.dirname(dir_name))

			# First rsync might be very long. Wait for it not
			# to produce big pause between the 1st pre-dump and
			# .stop_migration

			ret = sp.call(
				["rsync", "-a", dir_name, dst],
				stdout=logf, stderr=logf)
			if ret != 0:
				raise Exception("Rsync failed")

step2:pre-dump迭代迁移

        iter_index = 0//记录迭代的次数
		prev_dstats = None

		while use_pre_dumps://这是用于迭代迁移的步骤,而目前docker并不支持迭代迁移,所以use_pre_dumpsfalse,故跳过此步骤

			# Handle predump
			logging.info("* Iteration %d", iter_index)
			self.target_host.start_iter(True)
			self.img.new_image_dir()
			criu_cr.criu_predump(self.htype, root_pid, self.img,
								self.criu_connection, self.fs)
			self.target_host.end_iter()

			# Handle FS migration iteration
			fsstats = self.fs.next_iteration()

			dstats = criu_api.criu_get_dstats(self.img)
			migration_stats.handle_iteration(dstats, fsstats)

			# Decide whether we continue iteration or stop and do final dump
			if not self.__check_live_iter_progress(iter_index, dstats,
												prev_dstats):
				break

			iter_index += 1
			prev_dstats = dstats

step3: Stop-And-Copy,达到阈值条件进行最后一轮传输

# Dump htype on source and leave its tasks in frozen state
		logging.info("Final dump and restore")
		 self.target_host.start_iter(self.htype.dump_need_page_server())//目的机准备进行最后一轮迭代
		
		self.img.new_image_dir()//新建一个镜像文件目录,用于存放这一轮迭代镜像的镜像文件
		self.htype.final_dump(root_pid, self.img,
							self.criu_connection, self.fs)//执行criu dump命令调用最后一轮迭代
		self.target_host.end_iter()//通知目的机结束迭代传输过程

		try:
			# Handle final FS and images sync on frozen htype
			logging.info("Final FS and images sync")
			fsstats = self.fs.stop_migration()//做最后一轮文件系统迁移同步
			self.img.sync_imgs_to_target(self.target_host, self.htype,
										self.connection.mem_sk)//将镜像文件传输到目的机

			# Restore htype on target
			logging.info("Asking target host to restore")
			self.target_host.restore_from_images()//让目的机根据镜像文件恢复容器运行

		except Exception:
			self.htype.migration_fail(self.fs)
			raise

阶段三中有三个步骤需要仔细研究:

  • final_dump(最后一轮迭代备份)
  • sync_imgs_to_target(同步备份镜像文件到目的机)
  • restore_from_images(目的机端根据镜像文件恢复容器运行)

final_dump

我们先看第一个步骤final_dump: 原版:

# Some constants for docker
docker_bin = "/usr/bin/docker-1.9.0-dev"
docker_dir = "/var/lib/docker/"
docker_run_meta_dir = "/var/run/docker/execdriver/native"
def final_dump(self, pid, img, ccon, fs):
		logging.info("Dump docker container %s", pid)
		logf = open("/tmp/docker_checkpoint.log", "w+")//打开docker日志文件
		image_path_opt = "--image-dir=" + img.image_dir()//配置docker checkpoint命令
		ret = sp.call([docker_bin, "checkpoint", image_path_opt, self._ctid],
					stdout=logf, stderr=logf)
		if ret != 0:
			raise Exception("docker checkpoint failed")

修改后代码:将执行命令修改为docker checkpoint create –checkpoint-dir image-dir containerid checkpointName,通过该命令生成检查点文件checkpointName

这里的image-dir默认根目录为def_path = “/var/local/p.haul-fs/”,产生的checkpoint备份目录名称格式为ckdir(type-random-time)这里type在初始化过程设置为了dmp,全路径名为:def_path/ckdir/img/itertimes即”/var/local/p.haul-fs/type-random-time/img/1”

# Some constants for docker
docker_bin = "/usr/bin/docker"
docker_dir = "/var/lib/docker/"
docker_run_meta_dir = "/run/runc"
def final_dump(self, pid, img, ccon, fs):
                logging.info("Dump docker container %s", pid)
                logf = open("/tmp/docker_checkpoint.log", "w+")
                image_path_opt = "--checkpoint-dir="+img.image_dir();
                logging.info("Execute Cmd:%s checkpoint create %s %s mysql_checkpoint",docker_bin,image_path_opt,self._ctid)
                ret = sp.call([docker_bin, "checkpoint","create", image_path_opt, self._ctid,"mysql_checkpoint"],
                                        stdout=logf, stderr=logf)
                if ret != 0:
                        raise Exception("docker checkpoint failed")

 

sync_imgs_to_target

传输镜像文件到目的端

def sync_imgs_to_target(self, target_host, htype, sk):
		# Pre-dump doesn't generate any images (yet?)
		# so copy only those from the top dir
		logging.info("Sending images to target")

		start = time.time()
		cdir = self.image_dir()

		target_host.start_accept_images(phaul_images.IMGDIR)//目的端开始准备接收镜像文件,sk.recv(),再利用tarfileobj解压镜像
		tf = img_tar(sk, cdir)//这里将sk包裹到tarfileobj里面去,并重写fileobjreadwrite方法,所以当执行tf.add时候相当于执行了sk.senddata)一样将镜像文件发送到了目的端

		logging.info("\tPack")
		for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
			tf.add(img)//遍历cdir目录,def_path/ckdir/img/itertimes,将该目录下所有镜像文件全部发送到目的机

		logging.info("\tAdd htype images")
		for himg in htype.get_meta_images(cdir):
			tf.add(himg[1], himg[0])//最后发送metaimage,包括:state.jsondescribe.json

		tf.close()//关闭源主机上镜像输出端sk
		target_host.stop_accept_images()//关闭目的机上镜像文件接受端sk

		self.sync_time = time.time() - start//计算镜像文件传输同步所花时间

restore_from_images

def rpc_restore_from_images(self):
		logging.info("Restoring from images")
		self.htype.put_meta_images(self.img.image_dir())//放置metadata
		self.htype.final_restore(self.img, self.criu_connection)//恢复容器运行
		logging.info("Restore succeeded")
		self.restored = True

由代码中可以看到,根据镜像恢复分为两小步:放置metadata,恢复容器运行,我们逐个进行分析:

(1) put_meta_images()放置metadata

def put_meta_images(self, dir):
		# Create docker runtime meta dir on dst side
		with open(os.path.join(dir, "state.json")) as data_file:
			data = json.load(data_file)
		//首先打开state.json文件,在def_path/ckdir/img/itertimes目录下面,然后使用json工具加载该json文件
		self.full_ctid = data["id"]//获取该json文件中id所映射的数据即容器的id

		self.__load_ct_config(docker_dir)//配置好docker文件系统需要迁移的三个目录:/var/lib/docker/aufs/mnt/mount_id,/var/lib/docker/container/container_id,run/runc/container_id/state.json
		os.makedirs(self._ct_run_meta_dir)//首先打开容器metadata目录,把state.json复制到该目录下面
		pd = sp.Popen(["cp", os.path.join(dir, "state.json"),
					self._ct_run_meta_dir], stdout=PIPE)
		pd.wait()

(2)根据镜像文件恢复容器

重启了docker daemon,然后调用docker restore有待修改

def final_restore(self, img, criu):
		logf = open("/tmp/docker_restore.log", "w+")

		# Kill any previous docker daemon in order to reload the
		# status of the migrated container
		self.kill_last_docker_daemon()//重启docker daemon,我觉得这里没必要

		# start docker daemon in background
		sp.Popen([docker_bin, "daemon", "-s", "aufs"],
				stdout=logf, stderr=logf)
		# daemon.wait() TODO(dguryanov): docker daemon not return
		time.sleep(2)

		image_path_opt = "--image-dir=" + img.image_dir()
		ret = sp.call([docker_bin, "restore", image_path_opt, self._ctid],
					stdout=logf, stderr=logf)//这里的代码需要修改为docker start --checkpoint-dir...
		if ret != 0:
			raise Exception("docker restore failed")