Skynet 服务创建流程
Skynet 服务创建流程
根据设计综述 Skynet 是为了让服务器充分利用多核优势,将不同的业务放在独立的执行环境中处理。
Skynet 核心功能是加载一个 C 模块(动态库),模块用数字 id 标识,作为其 handle ,模块被称为服务 service 。服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。
每个服务是被一个个消息驱动,当无消息时,服务处于挂起状态。每个服务拥有一个属于自己的消息队列,框架中存在一个全局队列负责调度处理服务所接收到的消息。
代码层面,Skynet 服务对应于数据结构 struct skynet_context
,其中重要字段如下。
struct skynet_context { void * instance; // 模块自定义数据 struct skynet_module * mod; // 框架模块数据 void * cb_ud; // 传给回调函数的自定义数据 skynet_cb cb; // 回调函数 struct message_queue * queue; // 消息队列,用于接收发送给服务的消息 };
函数 skynet_context_new
用于创建服务,返回值表示此服务。
// 参数 name: 服务名 // 参数 param: 传递给服务的参数 struct skynet_context * skynet_context_new(const char * name, const char *param);
服务调用函数 skynet_callback
用于向框架注册回调函数,处理接收到的消息。
// 参数 context: 表示服务 // 参数 ud: user data 表示自定义数据 // 参数 cb: 表示回调函数 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb); // 参数 context: 表示服务 // 参数 ud: user data 由 skynet_callback 指定 // 参数 type: 消息类型 // 参数 session: 由发送方指定,标识发送的消息 // 参数 source: 表示发送方服务的地址 // 参数 msg sz: 数据 typedef int (*skynet_cb)(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz);
函数 skynet_send
用于向服务发送消息。向一个服务发送消息就是向这个服务的消息队列中添加消息。
// 参数 context: 表示服务 // 参数 source: 表示发送方服务的地址,可为 0 // 参数 destination: 表示接收方服务的地址 // 参数 type: 消息类型 // 参数 session: 用于发送方标识发送的消息,可为 0 // 参数 data sz: 数据 // 返回值 : session int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz);
C 服务
// 参数 name: 服务名 // 参数 param: 传递给服务的 c-style 字符串,以空白字符分割 struct skynet_context * skynet_context_new(const char * name, const char *param) { struct skynet_module * mod = skynet_module_query(name); // 执行 create 传递参数无 void *inst = skynet_module_instance_create(mod); // 执行 init ,参数 param 是传递给服务的参数 int r = skynet_module_instance_init(mod, inst, ctx, param); }
函数 skynet_context_new
完成 C 服务的创建,一次函数调用即可完成。函数执行成功,返回的 context 便是创建的服务。创建过程中的初始化包含 create
和 init
流程,在 init
流程中调用 skynet_callback
注册回调函数。此后,服务创建完成,便可接收消息。
Lua 函数 skynet.launch
用于在 Lua 中创建 C 服务。
Lua 服务
Lua 服务本质上也是 C 服务,只是将 Lua 回调函数注册到 C 服务中,将对服务接收消息的处理移动到 Lua 中。完成此功能的 C 服务是 snlua 服务。
因此,创建 Lua 服务需要先创建 snlua 服务,然后在 Lua 中调用 skynet.start
将 Lua 回调函数注册到 C 服务中,skynet.start
函数执行完后,Lua 服务创建完成。
skynet.newservice
用于创建 Lua 服务。参数 name
是服务名,参数 ...
是传给服务的参数,需要是 Lua 中能被转换成字符串的值。
function skynet.newservice(name, ...) return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...) end
skynet.newservice
流程如下。
- 当前服务向 launcher 服务发送请求,请求创建服务
name
。 - launcher 服务创建新服务。
- 新服务创建完成后,
skynet.newservice
返回新服务的地址。
具体的创建过程如下所示。
逻辑来到 launcher 服务 command.LAUNCH
函数(主要代码片段)。
local function launch_service(service, ...) local param = table.concat({...}, " ") local inst = skynet.launch(service, param) local response = skynet.response() if inst then -- launch 成功 services[inst] = service .. " " .. param instance[inst] = response else -- launch 失败 response(false) return end return inst end function command.LAUNCH(_, service, ...) launch_service(service, ...) return NORET end
command.LAUNCH
调用 launch_service
传递的参数依次是 ("snlua", name, ...)
与 skynet.newservice
传递的参数对应。 在 launch_service
函数中 service
变量是 "snlua"
而 param
变量是要创建的服务名及其参数。调用 skynet.launch
创建 C 服务 snlua 且将 param
传递给 snlua ,返回的 inst
是创建的地址。 创建成功记录数据到 services
和 instance
变量中,services
存储通过 launcher 服务创建的服务,instance
存储 skynet.response
用于回复请求方创建结果。
创建失败调用 response(false)
回复请求方创建失败。
新服务创建完成后,发送 LAUNCHOK
到 launcher 服务。launcher 服务回复请求方新服务的地址。
逻辑来到 C 服务 snlua ,模块是 service_snlua.c 。
snlua 服务的核心工作就是将消息处理回调函数对接到 Lua 中指定的回调函数。
snlua 服务首先执行 create 函数,调用 lua_newstate
创建 Lua 虚拟机。然后在 init 函数调用 skynet_callback
向框架注册回调函数,并向自身发送第一条消息,用于进行后续初始化。注意 args
参数是要创建的服务信息。
// 参数 args: 字符串,包含由空白字符分割的多个字符串,第一个字符串是要创建的 Lua 服务名,参考 skynet.launch 的 param 参数 int snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) { int sz = strlen(args); char * tmp = skynet_malloc(sz); memcpy(tmp, args, sz); // 1. 注册回调函数 launch_cb skynet_callback(ctx, l , launch_cb); // 2. 在第一条消息中进行后续初始化 const char * self = skynet_command(ctx, "REG", NULL); uint32_t handle_id = strtoul(self+1, NULL, 16); // it must be first message skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz); return 0; } struct snlua * snlua_create(void) { struct snlua * l = skynet_malloc(sizeof(*l)); memset(l,0,sizeof(*l)); l->L = lua_newstate(lalloc, l); // 创建 Lua 虚拟机 return l; }
问题:为何没有在 snlua_init
中直接调用 init_cb
?
乍一看,后续初始化逻辑 init_cb
可在 snlua_init
函数中完成,但这里设计成延迟到第一条消息中执行,好处是简化 snlua_init
函数逻辑,虽然增加了流程,但由于是在第一条消息中处理,整个过程是连续的,从框架整体来看,可认为此过程是“原子”的。对于这种复杂初始化流程,我也很认可这种设计,学习了。
在第一条消息中处理后续初始化。如果初始化失败,snlua 服务退出。
// 参数 msg sz: 要创建的服务信息,参考 snlua_init 中 args 参数 static int launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) { assert(type == 0 && session == 0); struct snlua *l = ud; skynet_callback(context, NULL, NULL); // 细节:先清空回调字段 int err = init_cb(l, context, msg, sz); if (err) { skynet_command(context, "EXIT", NULL); } return 0; }
Lua 服务具体的初始化逻辑如下。
// 参数 args sz: 对应 launch_cb 中 msg sz static int init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) { lua_State *L = l->L; l->ctx = ctx; lua_gc(L, LUA_GCSTOP, 0); // 关闭 GC lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */ lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV"); luaL_openlibs(L); // 打开标准库 lua_pushlightuserdata(L, ctx); // 设置 struct skynet_context lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context"); luaL_requiref(L, "skynet.codecache", codecache , 0); lua_pop(L,1); // 保存一些路径到如下全局变量中 const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua"); lua_pushstring(L, path); lua_setglobal(L, "LUA_PATH"); const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so"); lua_pushstring(L, cpath); lua_setglobal(L, "LUA_CPATH"); const char *service = optstring(ctx, "luaservice", "./service/?.lua"); lua_pushstring(L, service); lua_setglobal(L, "LUA_SERVICE"); const char *preload = skynet_command(ctx, "GETENV", "preload"); lua_pushstring(L, preload); lua_setglobal(L, "LUA_PRELOAD"); // 设置 traceback 函数 lua_pushcfunction(L, traceback); assert(lua_gettop(L) == 1); // 加载 Lua 服务入口脚本 const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua"); int r = luaL_loadfile(L,loader); if (r != LUA_OK) { skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1)); report_launcher_error(ctx); return 1; } lua_pushlstring(L, args, sz); r = lua_pcall(L,1,0,1); if (r != LUA_OK) { skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1)); report_launcher_error(ctx); return 1; } lua_settop(L,0); // 处理 skynet.memlimit 设置的内存限制 if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) { size_t limit = lua_tointeger(L, -1); l->mem_limit = limit; skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024)); lua_pushnil(L); lua_setfield(L, LUA_REGISTRYINDEX, "memlimit"); } lua_pop(L, 1); lua_gc(L, LUA_GCRESTART, 0); // 重启 GC return 0; }
函数 init_cb
的核心作用就是调用 lua_pcall
函数执行 Lua 服务入口脚本,脚本文件由 args sz
指定,具体实现有一些细节。
- 上述整个加载过程是关闭 Lua GC 的,完成后才重新启动 GC 。猜测是为了加快加载速度。
skynet.codecache
用于在 Lua 虚拟机之间共享代码。- 调用
lua_pcall
前设置了traceback
函数。 - 通过
lualoader
加载lualoader
脚本,并传递args sz
参数。 - snlua 服务中调用
skynet_callback(context, NULL, NULL);
删除回调函数后,未再发现注册回调函数的 C 代码,此注册是在 Lua 中完成的。
执行 lua_pcall
函数,逻辑来到框架提供的 lualoader
脚本,位于 ./lualib/loader.lua
。
-- ... 就是 C 函数 init_cb 中的 args sz local args = {} -- Lua 服务名及参数 for word in string.gmatch(..., "%S+") do table.insert(args, word) end SERVICE_NAME = args[1] -- 服务名 -- 定位并加载 Lua 服务脚本文件 local main, pattern -- 分别表示加载后的 Lua chunk 和 Lua 服务脚本文件路径 local err = {} for pat in string.gmatch(LUA_SERVICE, "([^;]+);*") do local filename = string.gsub(pat, "?", SERVICE_NAME) local f, msg = loadfile(filename) if not f then table.insert(err, msg) else pattern = pat main = f break end end -- 预处理,可选 if LUA_PRELOAD then local f = assert(loadfile(LUA_PRELOAD)) f(table.unpack(args)) LUA_PRELOAD = nil end -- 执行 Lua 服务入口脚本 main(select(2, table.unpack(args)))
前面提到,需要在脚本调用 skynet.start
向框架注册 Lua 回调函数,完成创建 Lua 服务。lualoader
中调用 main
函数,执行 Lua 服务入口脚本,于是 skynet.start
函数被调用,Lua 服务创建完成。
skynet.start
函数中 c.callback(skynet.dispatch_message)
完成 Lua 回调函数的注册。
如下代码,skynet.start
执行完毕后,注册一个 0
秒定时器回调,那时调用 start_func
执行上层业务初始化,并根据初始化结果发送消息到 launcher 服务,告知创建成功与否。
function skynet.start(start_func) c.callback(skynet.dispatch_message) -- 注册 Lua 回调函数 -- 注册定时器回调处理上层业务初始化 init_thread = skynet.timeout(0, function() skynet.init_service(start_func) init_thread = nil end) end function skynet.init_service(start) -- 初始化上层业务,并告知 launcher 创建结果 local ok, err = skynet.pcall(start) if not ok then skynet.error("init service failed: " .. tostring(err)) skynet.send(".launcher","lua", "ERROR") skynet.exit() else skynet.send(".launcher","lua", "LAUNCHOK") end end
注意,skynet.start
函数执行完后,在 Skynet 框架层面 Lua 服务已创建完成,可对外提供服务。
而 skynet.init_service
是在业务层面完成初始化,然后才通知 launcher 服务。
问题:为何没有在 skynet.start
中直接调用 skynet.init_service
?
先看调用链 snlua - init_cb()
-> lua_pcall
-> lualoader - main()
-> skynet.start()
。skynet.start
调用堆栈是从 init_cb
函数触发,而 skynet.init_service
会调用上层业务初始化 start_func
函数,start_func
函数可能会很复杂,比如有 RPC ,在 skynet.start
函数中将 skynet.init_service
放到定时器回调中执行,可以将业务层初始化和框架层初始化分离,简化 init_cb
函数触发到脚本中的逻辑。学习了。
理解:在 Skynet 框架层面,创建服务的“原子”性。
对于 C 服务,一次 C 函数 skynet_context_new
调用完成创建,框架内部处理具体过程中的多线程临界区,但从框架层面来看,给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。
对于 Lua 服务,需要一次 C 函数 skynet_context_new
调用和第一条消息完成创建,虽然带有流程,但第一条消息保证了连续性,从框架层面来看,也满足给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。但 Lua 服务中,skynet.init_service
是在定时器回调中调用的,假设在此函数执行前,消息队列中已经存在其它消息,此时在 Lua raw_dispatch_message
函数中,如下代码片段,若 p == nil
则进行如下处理。
local p = proto[prototype] if p == nil then if session ~= 0 then c.send(source, skynet.PTYPE_ERROR, session, "") else unknown_request(session, source, msg, sz, prototype) end return end
理解:函数 skynet.register_protocol
和 skynet.dispatch
调用时机。
此函数指定用于业务的消息处理函数。而 skynet.start
的参数 start_func
调用之前,可能已经接收到了消息。
因此,如果某类消息依赖于 start_func
进行初始化,则应该在 start_func
中才指定用于业务的消息处理函数。
如果某消息不依赖于 start_func
,则可和 skynet.start
的调用时机一样,指定消息处理函数。
理解:服务与 worker 线程。
Skynet 在启动 worker 线程之前,就创建了服务,并且可向此服务发送消息,只是 worker 线程开始工作后,才开始调度执行服务接收到的消息。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
GitOps 与 ChatOps 的落地实践
前言 说到 GitOps 和 ChatOps ,那就不得不谈到 DevOps 。 DevOps 作为一种文化,旨在促进开发、测试和运维人员之间的沟通与协作。而促进合作的方式,往往是使用一系列工具,完成这三个角色的相互协作。这带来的好处也是显而易见的:更快的交付速度和更低的人力成本。获益于 DevOps 和公有云,一个近百人的研发团队,可以只配备一到两个专职运维人员,降低的成本不言而喻。既然 DevOps 是一种文化,那么在不同的团队则会有不同的实践,而无论实践如何,其最终目的都是一样的:最大化的实现自动化,释放更多的人力资源,创建更大价值。 而 GitOps 和 ChatOps ,则是 DevOps 的两种实践。这两种实践分别通过使用 版本控制软件 Git 和实时聊天软件来达到提升交付速度和研发效率的目的。 GitOps GitOps 是一种实现持续交付的模型,它的核心思想是将应用系统的声明性基础架构和应用程序存放在 Git 的版本控制库中。 将 Git 作为交付流水线的核心,每个开发人员都可以提交拉取请求(Pull Request)并使用 Git 来加速和简化 Kubernete...
- 下一篇
分布式Redis深度历险-Sentinel
上一篇介绍了Redis的主从服务器之间是如何同步数据的。试想下,在一主一从或一主多从的结构下,如果主服务器挂了,整个集群就不可用了,单点问题并没有解决。Redis使用Sentinel解决该问题,保障集群的高可用。 如何保障集群高可用 保障集群高可用,要具备如下能力: 能监测服务器的状态,当主服务器不可用时,能及时发现 当主服务器不可用时,选择一台最合适的从服务器替代原有主服务器 存储相同数据的主服务器同一时刻只有一台 要实现上述功能,最直观的做法就是,使用一台监控服务器来监视Redis 服务器的状态。 监控服务器和主从服务器间维护一个心跳连接,当超出一定时间没有收到主服务器心跳时,主服务器就会被标记为下线,然后通知从服务器上线成为主服务器。 当原来的主服务器上线后,监控服务器会将其转换为从服务器。 按照上述流程似乎解决了集群高可用的问题,但似乎有哪里不对:如果监控服务器出了问题怎么办?我们可以在加上一个从监控服务器,当主服务器不可用的时候顶上。 但问题是谁来监控’监控服务器’呢?子子孙孙无穷尽也。。 先把疑问放在一旁,先来看下Redis Sentinel集群的实现 Sentinel...
相关文章
文章评论
共有0条评论来说两句吧...