"phaul热迁移过程总结"

  "远程调用目的机命令分析"

Posted by Xu on July 18, 2017

Phaul远程调用目的机命令分析

这一章我们分析phaul如何通过源主机发送消息调用远程目的机上对应的命令: phaul_client_server

在上一章节我们了解到phaul-wrap命令的初始化过程:

客户端发送请求

首先初始化rpc_proxy对象:

self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)

rpc_proxy对象的定义如下:

class rpc_proxy(object):
	def __init__(self, sk, *args):
		self._rpc_sk = sk
		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "init_rpc")
		//初始化_rpc_proxy_caller对象
		c(args)//调用_rpc_proxy_caller_call_函数

	def __getattr__(self, attr):
		return _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CALL, attr)//这里就是返回一个远程命令调用器对象,执行该调用器,相当于执行_rpc_proxy_caller对象中的_call_函数

我们知道在python对象创建过程中_call_和_getattr_的用法,下面是_rpc_proxy_caller对象的定义:

class _rpc_proxy_caller(object):
	def __init__(self, sk, typ, fname):
		self._rpc_sk = sk
		self._fn_typ = typ
		self._fn_name = fname

	def __call__(self, *args):
		call = (self._fn_typ, self._fn_name, args)
		raw_data = repr(call)//拼接成字符串
		self._rpc_sk.send(raw_data)//rpc_sk发送请求数据
		raw_data = self._rpc_sk.recv(xem_rpc.rpc_sk_buf)
		resp = eval(raw_data)//接受返回的响应

		if resp[0] == xem_rpc.RPC_RESP:
			return resp[1]
		elif resp[0] == xem_rpc.RPC_EXC:
			logging.info("Remote exception")
			raise Exception(resp[1])
		else:
			raise Exception("Proto resp error")

服务器端启动服务

以上是phaul在源主机上的客户端发送远程命令调用请求的实现部分,接下来就需要了解远程目的机是如何接受请求,并进行处理的过程,我们知道,为了实现phaul热迁移,我们需要在源主机及目的机两端都需要启动phaul service服务守护进程,启动的命令为

   ./p.haul-wrap service

同样是在p.haul-wrap脚本之中的执行流程,只不过子命令发生变化,而对service子命令的参数配置过程如下:

# Initialize service mode arguments parser
service_parser = subparsers.add_parser("service", help="Service mode")
service_parser.set_defaults(func=run_phaul_service)
service_parser.add_argument("--bind-addr", help="IP to bind to", type=str,
	default=default_service_bind_addr)
service_parser.add_argument("--bind-port", help="Port to bind to", type=int,
	default=default_rpc_port)
service_parser.add_argument("--path", help="Path to p.haul-service script",
	default=os.path.join(os.path.dirname(__file__), "p.haul-service"))
service_parser.add_argument("--one-shot",
	help="Do not run in loop to accept multiple connections",
	default=False, action='store_true')
service_parser.add_argument("--web-gui",
	help="Start web gui", default = False, action = 'store_true')
service_parser.add_argument("--web-partner",
	help="Start web gui", type = str, default = None)
//根据client子命令参数配置的经验,service子命令的参数有:--bind-addr,--bind-port,--path,--one-shot,--web-gui,--web-partner且都具有默认值,同时service的默认执行函数funcrun_phaul_service

run_phaul_service实现过程如下:

def run_phaul_service(args, unknown_args):
	"""Run p.haul-service"""

	if args.web_gui:
	//是否开启web图形界面
		start_web_gui(args.web_partner, args.bind_port)

	print "Waiting for connection..."

	# Establish connection
	host = args.bind_addr, args.bind_port
	server_sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
	server_sk.bind(host)//socket绑定本机的默认12345端口
	server_sk.listen(8)//监听该端口信息
	connection_sks = [None, None]
	while True:
		for i in range(len(connection_sks)):
			connection_sks[i], dummy = server_sk.accept()//使该服务器套接字处于等待请求状态,返回一个用于与客户端通信的连接,以及客户端的ip,这里循环两次执行,一次返回用于与客户端接受rpc通信的socket,还有一次返回用于接受客户端内存传输的socket

		# Organize p.haul-service args
		target_args = [args.path]
		target_args.extend(unknown_args)
		target_args.extend(["--fdrpc", str(connection_sks[0].fileno()),
			"--fdmem", str(connection_sks[1].fileno())])

		# Call p.haul-service
		cmdline = " ".join(target_args)
		print "Exec p.haul-service: {0}".format(cmdline)
		if args.one_shot:
			os.system(cmdline)
			return
		else:
			thread.start_new_thread(os.system, tuple([cmdline]))//执行p.haul-service命令,由于one_shot参数默认为false,相当于新建一个线程,执行p.haul-service脚本命令:p.haul-service --fdrpc rpc_sk --fdmem mem_sk

p.haul-service.py:

# Parse arguments
args = phaul.args_parser.parse_service_args()//同样先解析p.haul-service参数到args

# Configure logging
logging.basicConfig(filename=args.log_file, filemode="a", level=logging.INFO,
	format="%(asctime)s.%(msecs)03d: %(process)d: %(message)s",
	datefmt="%H:%M:%S")//配置日志系统

# Setup hook to log uncaught exceptions
sys.excepthook = phaul.util.log_uncaught_exception

phaul.util.log_header()
logging.info("Starting p.haul service")//配置异常追踪

# Establish connection
connection = phaul.connection.establish(args.fdrpc, args.fdmem, args.fdfs)//rpc_sk,mem_sk socket文件描述符中新建socket,并填充到connection对象

t = phaul.xem_rpc.rpc_threaded_srv(phaul.service.phaul_service, connection)//新建rpc_threaded_srv对象,该对象继承threading.Thread,并重写了run函数

# FIXME: Setup stop handlers
stop_fd = t.init_stop_fd()
signal.signal(signal.SIGTERM, fin)
signal.signal(signal.SIGINT, fin)

t.start()//开始线程
signal.pause()
t.join()//直到该线程执行完毕后才继续向后面执行
logging.info("Bye!")

# Close connection
connection.close()

rpc_threaded_srv,rpc线程服务器

这里我们研究一下rpc_threaded_srv对象的初始化过程:

class rpc_threaded_srv(threading.Thread):
      //这里有python继承的概念,方式就是在类名后面的括号加上要继承的父类名
	def __init__(self, srv_class, connection):
		threading.Thread.__init__(self)//初始化的过程,父类也要初始化
		self._mgr = _rpc_server_manager(srv_class, connection)//新建服务器管理器对象
		self._stop_fd = None

//重写线程run方法
	def run(self):
		try:
			self._mgr.loop(self._stop_fd)//当该线程对象执行run函数时会调用管理器的loop函数
		except Exception:
			logging.exception("Exception in rpc_threaded_srv")

//定义函数,初始化停止线程socket文件描述符
	def init_stop_fd(self):
		sks = socket.socketpair()//建立socket对,只能在AF_UNIX域本机上进程之间的通信,不用于网络,为双工通信模式。
		self._stop_fd = sks[0]//线程这一端socketsks0
		return sks[1]//向外部进程返回该socket对的另一端用于发送停止信号

_rpc_server_manager,rpc服务器socket管理器

在rpc_threaded_srv线程服务器初始化过程中,初始化了一个rpc服务器管理者对象_rpc_server_manager,现在我们来看该管理器的定义:

class _rpc_server_manager(object):
	def __init__(self, srv_class, connection):
		self._srv_class = srv_class
		self._connection = connection
		self._poll_list = []
		self._alive = True

		self.add_poll_item(_rpc_server_sk(connection.rpc_sk))

	def add_poll_item(self, item):
		self._poll_list.append(item)

	def remove_poll_item(self, item):
		self._poll_list.remove(item)

	def make_master(self):
		return self._srv_class(self._connection)

	def stop(self):
		self._alive = False
//rpc服务器线程对象运行时会执行该loop函数
	def loop(self, stop_fd):
		if stop_fd:
			self.add_poll_item(_rpc_stop_fd(stop_fd))

		while self._alive:
			r, w, x = select.select(self._poll_list, [], [])//select函数对_poll_list中所有的socket文件描述符对象进行监听,将有事件发生的socket对象放入r,将所有fd放入w,将发生错误的socket对象放入x,当没有事件发生时,该函数会阻塞
			for sk in r://遍历所有发生请求的socket,即_rpc_server_sk对象,执行该对象中的work方法
				sk.work(self)

		logging.info("RPC Service stops")

_rpc_server_sk,socket事件处理器

从_rpc_server_manager初始化过程中可以看到,它是对poll_list中所有的socket使用select进行监控,当有请求发生的时候,会执行该sk.work,而socket对象为_rpc_server_sk对象,所以我们需要了解_rpc_server_sk的定义:

  class _rpc_server_sk(object):
	def __init__(self, sk):
		self._sk = sk//初始化过程中就把rpc_sk赋给该对象中的_sk
		self._master = None

	def fileno(self):
		return self._sk.fileno()

	def work(self, mgr):
		raw_data = self._sk.recv(rpc_sk_buf)//rpc_sk接受数据
		if not raw_data:
			mgr.remove_poll_item(self)
			if self._master:
				self._master.on_disconnect()
			return

		data = eval(raw_data)//把字符串解析为list,这里传过来的数据应该为[RPC_CMD,init_rpc...][RPC_CALL,setup...]
		try:
			if data[0] == RPC_CALL:
			//执行init_rpc之后,执行setup命令
				if not self._master:
					raise Exception("Proto seq error")

				res = getattr(self._master, "rpc_" + data[1])(*data[2])//初始化init_rpc之后,self._masterphaul.service.phaul_service类,执行该类中的rpc_setup(args)方法得到返回值返回,完成方法调用
			elif data[0] == RPC_CMD:
			//首先执行init_rpc对象内调用
				res = getattr(self, data[1])(mgr, *data[2])
			else:
				raise Exception(("Proto typ error", data[0]))
		except Exception as e:
			traceback.print_exc()
			res = (RPC_EXC, e)
		else:
			res = (RPC_RESP, res)

		raw_data = repr(res)
		self._sk.send(raw_data)//将调用结果通过rpc_sk返回

	def init_rpc(self, mgr, args):
		self._master = mgr.make_master()//p.haul-service中执行的phaul.xem_rpc.rpc_threaded_srv(phaul.service.phaul_service, connection),第一个参数phaul.service.phaul_service就指定被调用方法源所在的类
		self._master.on_connect(*args)

  

这一章节我了解了phaul是如何实现源主机到目的机的函数调用过程,主要是通过socket信息的发送及服务端启动线程对socket进行监听后再进行事件处理的过程。

在Python的语言学习过程中,我学习到Python对象继承的实现,socket的操作和调用:主要包括socket创建,socketpair创建(本机unix域内进程间通信),socket绑定,socket监听及socket接发消息,和select()实现socket多重监听,还有线程模块的操作(start,join),还有字符串的拼接与解析,最后还有getattr(object,name)的使用