mongodb内核源码实现、性能调优、最佳运维实践系列-mongodb网络传输层模块源码实现三
关于作者
前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb研发和运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:https://github.com/y123456yz
1. 说明
在之前的<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>和<<transport_layer网络传输层模块源码实现二>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、线程模型、transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。
本文将继续分析网络传输层模块中service_state_machine状态机调度子模块内核源码实现。
2. service_state_machine状态机调度子模块
service_state_machine状态机处理模块主要复杂一次完整请求的状态转换,确保请求可以按照指定的流程顺利进行,最终实现客户端的正常mongodb访问。该模块核心代码实现主要由以下三个源码文件实现(test为测试相关,可以忽略):
2.1 核心代码实现
在service_entry_point服务入口点子模块分析中,当接收到一个新的链接后,在ServiceEntryPointImpl::startSession(...)回调函数中会构造一个ServiceStateMachine ssm类,从而实现了新链接、session、ssm的一一映射关系。其中,ServiceStateMachine 类实现对ThreadGuard(线程守护)有较多的依赖,因此本文从这两个类核心代码实现来分析整个状态机调度模块的内部设计细节。
2.1.1 ThreadGuard线程守护类核心代码实现
ThreadGuard也就是”线程守护”类,该类主要用于工作线程名的管理维护、ssm归属管理、该ssm对应session链接的回收处理等。该类核心成员及接口实现如下:
1.class ServiceStateMachine::ThreadGuard { 2. ...... 3.public: 4. // create a ThreadGuard which will take ownership of the SSM in this thread. 5. //ThreadGuard初始化,标记ssm所有权属于本线程 6. explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} { 7. //获取ssm状态机所有权 8. auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned); 9. //如果是一个链接一个线程模型,则条件满足 10. if (owned == Ownership::kStatic) { 11. //一个链接一个线程模型,由于链接始终由同一个线程处理,因此该链接对应的ssm始终归属于同一个线程 12. dassert(haveClient()); 13. dassert(Client::getCurrent() == _ssm->_dbClientPtr); 14. //标识归属权已确定 15. _haveTakenOwnership = true; 16. return; 17. } 18. ...... 19. //adaptive 动态线程模式走下面的模式 20. 21. //把当前线程的线程名零时保存起来 22. auto oldThreadName = getThreadName(); 23. //ssm保存的线程名和当前线程名不同 24. if (oldThreadName != _ssm->_threadName) { 25. //即将修改线程名了,把修改前的线程名保存到_oldThreadName 26. _ssm->_oldThreadName = getThreadName().toString(); 27. //把运行本ssm状态机的线程名改为conn-x线程 28. setThreadName(_ssm->_threadName); //把当前线程改名为_threadName 29. } 30. 31. //设置本线程对应client信息,一个链接对应一个client,标识本client当前归属于本线程处理 32. Client::setCurrent(std::move(_ssm->_dbClient)); 33. //本状态机ssm所有权有了,归属于运行本ssm的线程 34. _haveTakenOwnership = true; 35. } 36. ...... 37. //重新赋值 38. ThreadGuard& operator=(ThreadGuard&& other) { 39. if (this != &other) { 40. _ssm = other._ssm; 41. _haveTakenOwnership = other._haveTakenOwnership; 42. //原来的other所有权失效 43. other._haveTakenOwnership = false; 44. } 45. //返回 46. return *this; 47. }; 48. 49. //析构函数 50. ~ThreadGuard() { 51. //ssm所有权已确定,则析构的时候,调用release处理,恢复线程原有线程名 52. if (_haveTakenOwnership) 53. release(); 54. } 55. 56. //一个链接一个线程模型,标记_owned为kStatic,也就是线程名永远不会改变 57. void markStaticOwnership() { 58. dassert(static_cast<bool>(*this)); 59. _ssm->_owned.store(Ownership::kStatic); 60. } 61. 62. //恢复原有线程名,同时把client信息从调度线程归还给状态机 63. void release() { 64. auto owned = _ssm->_owned.load(); 65. //adaptive异步线程池模式满足if条件,表示SSM固定归属于某个线程 66. if (owned != Ownership::kStatic) { 67. //本线程拥有currentClient信息,于是把它归还给SSM状态机 68. if (haveClient()) { 69. _ssm->_dbClient = Client::releaseCurrent(); 70. } 71. //恢复到以前的线程名 72. if (!_ssm->_oldThreadName.empty()) { 73. //恢复到老线程名 74. setThreadName(_ssm->_oldThreadName); 75. } 76. } 77. //状态机状态进入end,则调用对应回收hook处理 78. if (_ssm->state() == State::Ended) { 79. //链接关闭的回收处理 ServiceStateMachine::setCleanupHook 80. auto cleanupHook = std::move(_ssm->_cleanupHook); 81. if (cleanupHook) 82. cleanupHook(); 83. return; 84. } 85. 86. //归属权失效 87. _haveTakenOwnership = false; 88. //归属状态变为未知 89. if (owned == Ownership::kOwned) { 90. _ssm->_owned.store(Ownership::kUnowned); 91. } 92. } 93. 94.private: 95. //本线程守护当前对应的ssm 96. ServiceStateMachine* _ssm; 97. //默认false,标识该状态机ssm不归属于任何线程 98. bool _haveTakenOwnership = false; 99.}
从上面的代码分析可以看出线程守护类作用比较明确,就是守护当前线程的归属状态,并记录状态机ssm不同状态变化前后的线程名。此外,状态机ssm对应的session链接如果进入end状态,则该链接的资源回收释放也由该类完成。
查看mongod或者mongos实例,如果启动实例的时候配置了”serviceExecutor: adaptive”会发现这些进程下面有很多线程名为”conn-x”和”worker-x”线程,同时同一个线程线程名可能发生改变,这个过程就是由ThreadGuard线程守护类来实现。synchronous一个链接一个线程模型只有”conn-x”线程,adaptive线程模型将同时存在有线程名为”conn-x”和”worker-x”的线程,具体原理讲在后面继续分析,如下图:
说明:synchronous线程模式对应worker初始线程名为”conn-x”,adaptive线程模型对应worker初始线程名为”worker-x”。
ThreadGuard线程守护类和状态机ssm(service_state_machine)关联紧密,客户端请求处理的内部ssm状态转换也和该类密切关联,请看后续分析。
2.1.2 ServiceStateMachine 类核心代码实现
service_state_machine状态机处理模块核心代码实现通过ServiceStateMachine类完成,该类的核心结构成员和函数接口如下:
1.//一个新链接对应一个ServiceStateMachine保存到ServiceEntryPointImpl._sessions中 2.class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> { 3. ...... 4.public: 5. ...... 6. static std::shared_ptr<ServiceStateMachine> create(...); 7. ServiceStateMachine(...); 8. //状态机所属状态 9. enum class State { 10. //ServiceStateMachine::ServiceStateMachine构造函数初始状态 11. Created, 12. //ServiceStateMachine::_runNextInGuard开始进入接收网络数据状态 13. //标记本次客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该链接的 14. //下一轮请求,该状态对应处理流程为_sourceMessage 15. Source, 16. //等待获取客户端的数据 17. SourceWait, 18. //接收到一个完整mongodb报文后进入该状态 19. Process, 20. //等待数据发送成功 21. SinkWait, 22. //接收或者发送数据异常、链接关闭,则进入该状态 _cleanupSession 23. EndSession, 24. //session回收处理进入该状态 25. Ended 26. }; 27. //所有权状态,主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效 28. enum class Ownership { 29. //该状态表示本状态机SSM处于非活跃状态 30. kUnowned, 31. //该状态标识本状态机SSM归属于某个工作worker线程,处于活跃调度运行状态 32. kOwned, 33. //表示SSM固定归属于某个线程 34. kStatic 35. }; 36. 37. ...... 38.private: 39. //ThreadGuard可以理解为线程守护,后面在ThreadGuard类中单独说明 40. class ThreadGuard; 41. friend class ThreadGuard; 42. 43. ...... 44. //获取session信息 45. const transport::SessionHandle& _session() 46. //以下两个接口为任务task调度相关接口 47. void _scheduleNextWithGuard(...); 48. void _runNextInGuard(ThreadGuard guard); 49. //接收到一个完整mongodb报文后的处理 50. inline void _processMessage(ThreadGuard guard); 51. //以下四个接口完成底层数据读写及其对应回调处理 52. void _sourceCallback(Status status); 53. void _sinkCallback(Status status); 54. void _sourceMessage(ThreadGuard guard); 55. void _sinkMessage(ThreadGuard guard, Message toSink); 56. 57. //一次客户端请求,当前mongodb服务端所处的状态 58. AtomicWord<State> _state{State::Created}; 59. //服务入口,ServiceEntryPointMongod ServiceEntryPointMongos mongod及mongos入口点 60. ServiceEntryPoint* _sep; 61. //synchronous及adaptive模式,也就是线程模型是一个链接一个线程还是动态线程池 62. transport::Mode _transportMode; 63. //ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)服务上下文 64. ServiceContext* const _serviceContext; 65. //也就是本ssm对应的session信息,默认对应ASIOSession 66. transport::SessionHandle _sessionHandle; 67. //根据session构造对应client信息,ServiceStateMachine::ServiceStateMachine赋值 68. //也就是本次请求对应的客户端信息 69. ServiceContext::UniqueClient _dbClient; 70. //指向上面的_dbClient 71. const Client* _dbClientPtr; 72. //该SSM当前处理线程的线程名,因为adaptive线程模型一次请求中的不同状态会修改线程名 73. const std::string _threadName; 74. //修改线程名之前的线程名称 75. std::string _oldThreadName; 76. //ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook中设置赋值 77. //session链接回收处理 78. stdx::function<void()> _cleanupHook; 79. //接收处理的message信息 一个完整的报文就记录在该msg中 80. Message _inMessage; 81. //默认初始化kUnowned,标识本SSM状态机处于非活跃状态, 82. //主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效 83. AtomicWord<Ownership> _owned{Ownership::kUnowned}; 84.}
该类核心成员功能说明如下表:
成员名 | 功能说明 |
_state | SSM状态机当前所处状态 |
_sep | 服务入口,mongod对应ServiceEntryPointMongod;mongos对应 ServiceEntryPointMongos |
_transportMode | synchronous及adaptive模式,也就是线程模型是一个链接一个线程还是动态线程池 |
_serviceContext | 服务上下文,mongod和mongos分别对应:ServiceContextMongoD、ServiceContextNoop |
_sessionHandle | 本ssm对应的session信息,默认对应ASIOSession |
_dbClient | 也就是本次请求对应的客户端信息 |
_dbClientPtr | 指向上面的_dbClient |
_threadName | SSM所属线程的线程名,初始化为”conn-n”,adaptive线程模型一次请求中的不同状态会修改线程名 |
_oldThreadName | 修改线程名之前的线程名称 |
_cleanupHook | session链接回收处理 |
_inMessage | 接收处理的message信息 一个完整的报文就记录在该msg中 |
_owned | 默认初始化kUnowned,标识本SSM状态机处于非活跃状态,主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效 |
我们知道,链接、session、SSM状态机一一对应,他们也拥有对应的归属权,这里的归属权指的就是当前SSM归属于那个线程,也就是当前SSM状态机调度模块由那个线程实现。归属权通过Ownership类标记,该类保护如下几种状态,每种状态功能说明如下:
归属码 | 功能说明 |
kUnowned | 未知状态,也就是SSM当前不归属与具体的线程 |
kOwned | 针对adaptive线程模型,表示当前SSM状态转换处理由具体的线程负责,也就是归属权明确 |
kStatic | 针对synchronous线程模型,也就是一个链接一个线程模型。由于一个链接始终由同一个线程处理,因此该链接对应SSM也就始终归属于同一个线程,整个运行状态不会改变。 |
Mongodb服务端接收到客户端请求后的数据接收、协议解析、从db层获取数据、发送数据给客户端都是通过SSM状态机进行有序的状态转换处理,SSM调度处理过程中保护多个状态,每个状态对应一个状态码,具体状态码及其功能说明如下表所示:
状态码 | 功能说明 |
Created | ServiceStateMachine::ServiceStateMachine()构造函数初始状态 |
Source | 下面两种情况会进入该状态:1. 新连接到来第一次进入SSM处理;2. 客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该链接的下一轮请求 |
SourceWait | 等待获取客户端的数据 |
Process | 接收到一个完整mongodb报文后进入该状态,开始进行协议解析、db层数据访问 |
SinkWait | 等待数据发送成功,发送数据给客户端过程中 |
EndSession | 接收或者发送数据异常、链接关闭,session对应客户端,同时进入Ended状态 |
Ended | session回收处理,进入EndSession后立马通过_cleanupSession进入该状态 |
以上是SSM处理请求过程的状态码信息,状态转换的具体实现过程请参考后面的核心代码分析。listerner线程接收到新的客户端链接后会调用通过service_entry_point服务入口点子模块的ssm->start()接口进入SSM状态机调度模块,该接口相关的源码实现如下:
1.//ServiceEntryPointImpl::startSession中执行 启动SSM状态机 2.void ServiceStateMachine::start(Ownership ownershipModel) { 3. //直接调用_scheduleNextWithGuard接口 4. _scheduleNextWithGuard( 5. //listener线程暂时性的变为conn线程名,在_scheduleNextWithGuard中任 6. //务入队完成后,在下面的_scheduleNextWithGuard调用guard.release()恢复listener线程名 7. ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel); 8.} 9. 10.void ServiceStateMachine::_scheduleNextWithGuard(...) { 11. //该任务func实际上由worker线程运行,worker线程从asio库的全局队列获取任务调度执行 12. auto func = [ ssm = shared_from_this(), ownershipModel ] { 13. //构造ThreadGuard 14. ThreadGuard guard(ssm.get()); 15. //说明是sync mode,即一个链接一个线程模式, 归属权明确,属于指定线程 16. if (ownershipModel == Ownership::kStatic) 17. guard.markStaticOwnership(); 18. //对应:ServiceStateMachine::_runNextInGuard,在这里面完成状态调度转换 19. ssm->_runNextInGuard(std::move(guard)); 20. }; 21. //恢复之前的线程名,如果该SSM进入Ended状态,则开始资源回收处理 22. guard.release(); 23. //ServiceExecutorAdaptive::schedule(adaptive) ServiceExecutorSynchronous::schedule(synchronous) 24. //第一次进入该函数的时候在这里面创建新线程,不是第一次则把task任务入队调度 25. Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags); 26. if (status.isOK()) { 27. return; 28. } 29. //异常处理 30. ...... 31.}
ServiceStateMachine::start()接口调用ServiceStateMachine::_scheduleNextWithGuard()来启动状态机运行。_scheduleNextWithGuard()接口最核心的作用就是调用service_executor服务运行子模块(线程模型子模块)的schedule()接口来把状态机调度任务入队到ASIO网络库的一个全局队列(adaptive动态线程模型),如果是一个链接一个线程模型,则任务入队到线程级私有队列。
adaptive线程模型,任务入队以及工作线程调度任务执行的流程将在后续的线程模型子模块中分析,也可以参考:<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
此外,_scheduleNextWithGuard()入队到全局队列的任务就是本模块后面需要分析的SSM状态机任务,这些task任务通过本函数接口的func (...)进行封装,然后通过线程模型子模块入队到一个全局队列。Func(...)这个task任务中会直接调用_runNextInGuard()接口来进行状态转换处理,该接口也就是入队到ASIO全局队列的任务,核心代码功能如下:
1.void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) { 2. //获取当前SSM状态 3. auto curState = state(); 4. // If this is the first run of the SSM, then update its state to Source 5. //如果是第一次运行该SSM,则状态为Created,到这里标记可以准备接收数据了 6. if (curState == State::Created) { 7. //进入Source等待接收数据 8. curState = State::Source; 9. _state.store(curState); 10. } 11. //各状态对应处理 12. try { 13. switch (curState) { 14. //接收数据 15. case State::Source: 16. _sourceMessage(std::move(guard)); 17. break; 18. //以及接收到完整的一个mongodb报文,可以内部处理(解析+命令处理+应答客户端) 19. case State::Process: 20. _processMessage(std::move(guard)); 21. break; 22. //链接异常或者已经关闭,则开始回收处理 23. case State::EndSession: 24. _cleanupSession(std::move(guard)); 25. break; 26. default: 27. MONGO_UNREACHABLE; 28. } 29. return; 30. } catch (...) { 31. //异常打印 32. } 33. //异常处理 34. ...... 35. //进入EndSession状态 36. _state.store(State::EndSession); 37. _cleanupSession(std::move(guard)); 38.}
从上面的代码实现可以看出,真正入队到全局队列中的任务类型只有三个,分别是:
- 接收mongodb数据的task任务,简称为readTask。
- 接收到一个完整mongodb数据后的后续处理(包括协议解析、命令处理、DB获取数据、发送数据给客户端等),简称为dealTask。
- 接收或者发送数据异常、链接关闭等引起的后续资源释放,简称为cleanTask。
下面针对这三种task任务核心代码实现进行分析:
- readTask任务核心代码实现
readTask任务核心代码实现由_sourceMessage()接口实现,具体代码如下:
1.//接收客户端数据 2.void ServiceStateMachine::_sourceMessage(ThreadGuard guard) { 3. ...... 4. //获取本session接收数据的ticket,也就是ASIOSourceTicket 5. auto ticket = _session()->sourceMessage(&_inMessage); 6. //进入等等接收数据状态 7. _state.store(State::SourceWait); 8. //release恢复worker线程原有的线程名,synchronous线程模型为"conn-xx",adaptive对应worker线程名为"conn-xx" 9. guard.release(); 10. //线程模型默认同步方式,也就是一个链接一个线程 11. if (_transportMode == transport::Mode::kSynchronous) { 12. //同步方式,读取到一个完整mongodb报文后执行_sourceCallback回调 13. _sourceCallback([this](auto ticket) { 14. MONGO_IDLE_THREAD_BLOCK; 15. return _session()->getTransportLayer()->wait(std::move(ticket)); 16. }(std::move(ticket))); 17. } else if (_transportMode == transport::Mode::kAsynchronous) { 18. //adaptive线程模型,异步读取一个mongodb报文后执行_sourceCallback回调 19. _session()->getTransportLayer()->asyncWait( 20. ////TransportLayerASIO::ASIOSourceTicket::_bodyCallback读取到一个完整报文后执行该回调 21. std::move(ticket), [this](Status status) { _sourceCallback(status); }); 22. } 23.} 24. 25.//接收到一个完整mongodb报文后的回调处理 26.void ServiceStateMachine::_sourceCallback(Status status) { 27. //构造ThreadGuard,修改执行本SSM接口的线程名为conn-xx 28. ThreadGuard guard(this); 29. 30. //状态检查 31. dassert(state() == State::SourceWait); 32. //获取链接session远端信息 33. auto remote = _session()->remote(); 34. if (status.isOK()) { 35. //等待调度,进入处理消息阶段 _processMessage 36. _state.store(State::Process); 37. //注意kMayRecurse标识State::Process阶段的处理还是由本线程执行,这是一个递归标记 38. return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse); 39. } 40. ...... 41. //异常流程调用 42. _runNextInGuard(std::move(guard)); 43.}
SSM调度的第一个任务就是readTask任务,从上面的源码分析可以看出,该任务就是通过ticket数据分发模块从ASIO网络库读取一个完整长度的mongodb报文,然后执行_sourceCallback回调。进入该回调函数后,即刻设置SSM状态为State::Process状态,然后调用_scheduleNextWithGuard(...)把dealTask任务入队到ASIO的全局队列(adaptive线程模型),或者入队到线程级私有队列(synchronous线程模型)等待worker线程调度执行。
这里有个细节,在把dealTask入队的时候,携带了kMayRecurse标记,该标记标识该任务可以递归调用,也就是该任务可以由当前线程继续执行,这样也就可以保证同一个请求的taskRead任务和dealTask任务由同一个线程处理。任务递归调度,可以参考后面的线程模型子模块源码实现。
2. dealTask任务核心代码实现
当读取到一个完整长度的mongodb报文后,就会把dealTask任务入队到全局队列,然后由worker线程调度执行该task任务。dealTask任务的核心代码实现如下:
1.//dealTask处理 2.void ServiceStateMachine::_processMessage(ThreadGuard guard) { 3. ...... 4. //入口流量计数 5. networkCounter.hitLogicalIn(_inMessage.size()); 6. //获取一个唯一的UniqueOperationContext,一个客户端对应一个UniqueOperationContext 7. auto opCtx = Client::getCurrent()->makeOperationContext(); 8. //ServiceEntryPointMongod::handleRequest ServiceEntryPointMongos::handleRequest请求处理 9. //command处理、DB访问后的数据通过dbresponse返回 10. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); 11. //释放opCtx,这样currentop就看不到了 12. opCtx.reset(); 13. //需要发送给客户端的应答数据 14. Message& toSink = dbresponse.response; 15. //应答数据存在 16. if (!toSink.empty()) { 17. ...... 18. //发送数据 ServiceStateMachine::_sinkMessage() 19. _sinkMessage(std::move(guard), std::move(toSink)); 20. 21. } else { 22. //如果不需要应答客户端的处理 23. ...... 24. } 25.} 26. 27.//调用Sinkticket发送数据 28.void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) { 29. //获取发送数据的ASIOSinkTicket 30. auto ticket = _session()->sinkMessage(toSink); 31. //进入sink发送等待状态 32. _state.store(State::SinkWait); 33. //恢复原有的worker线程名 34. guard.release(); 35. //synchronous线程模型,同步发送 36. if (_transportMode == transport::Mode::kSynchronous) { 37. //最终在ASIOSinkTicket同步发送数据成功后执行_sinkCallback 38. _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket))); 39. } else if (_transportMode == transport::Mode::kAsynchronous) { 40. //最终在ASIOSinkTicket异步发送数据成功后执行_sinkCallback 41. _session()->getTransportLayer()->asyncWait( 42. std::move(ticket), [this](Status status) { _sinkCallback(status); }); 43. } 44.} 45. 46.//sink数据发送 47.void ServiceStateMachine::_sinkCallback(Status status) { 48. //SSM归属于新的guard,同时修改当前线程名为conn-xx 49. ThreadGuard guard(this); 50. //状态检查 51. dassert(state() == State::SinkWait); 52. if (!status.isOK()) { 53. //进入EndSession状态 54. _state.store(State::EndSession); 55. //异常情况调用 56. return _runNextInGuard(std::move(guard)); 57. } else if (_inExhaust) { //_inExhaust方式 58. //注意这里的状态是process _processMessage 还需要继续进行Process处理 59. _state.store(State::Process); 60. } else { 61. //正常流程始终进入该分支 _sourceMessage 这里继续进行递归接收数据处理 62. //注意这里的状态是Source,继续接收客户端请求 63. _state.store(State::Source); 64. } 65. //本链接对应的一次mongo访问已经应答完成,需要继续要一次调度了 66. return _scheduleNextWithGuard(std::move(guard), 67. ServiceExecutor::kDeferredTask | 68. ServiceExecutor::kMayYieldBeforeSchedule); 69.}
readTask通过ticket数据分发子模块读取一个完整长度的mongodb报文后,开始dealTask任务逻辑,该任务也就是_processMessage(...)。该接口中核心实现就是调用mongod和mongos实例对应的服务入口类的handleRequest(...)接口来完成后续的command命令处理、DB层数据访问等,访问到的数据存储到DbResponse中,最终在通过_sinkMessage(...)把数据发送出去。
真正的mongodb内部处理流程实际上就是通过该dealTask任务完成,该任务也是处理整个请求中资源耗费最重的一个环节。在该task任务中,当数据成功发送给客户端后,该session链接对应的SSM状态机进入State::Source状态,继续等待worker线程调度来完成后续该链接的新请求。
3. cleanTask任务
在数据读写过程、客户端链接关闭、访问DB数据层等任何一个环节异常,则会进入State::EndSession状态。该状态对应得任务实现相对比较简单,具体代码实现如下:
1.//session对应链接回收处理 2.void ServiceStateMachine::_cleanupSession(ThreadGuard guard) { 3. //进入这个状态后在~ThreadGuard::release中进行ssm _cleanupHook处理,该hook在ServiceEntryPointImpl::startSession 4. _state.store(State::Ended); 5. //清空message buffer 6. _inMessage.reset(); 7. //释放链接对应client资源 8. Client::releaseCurrent(); 9.}
进入该状态后直接由本线程进行session资源回收和client资源释放处理,而无需状态机调度worker线程来回收。
2.2 关于worker线程名和guardthread线程守护类
前面得分析我们知道,当线程模型为adaptive动态线程模型的时候,mongod和mongos实例对应的子线程中有很多名为“conn-xx”和”worker-xx”的线程,而且同一个线程可能一会儿线程名为“conn-xx”,下一次又变为了”worker-xx”。这个线程名的初始命名和线程名更改与ServiceStateMachine状态机调度类、guardthread线程守护类、worker线程模型等都有关系。
Worker线程由ServiceExecutor线程模型子模块创建,请参考后续线程模型子模块相关章节。默认初始化线程名为”conn-x”,初始化代码实现如下:
1.//ServiceStateMachine::create调用,ServiceStateMachine类初始化构造 2.ServiceStateMachine::ServiceStateMachine(...) 3. ...... 4. //线程名初始化:conn-xx,xx代码session id 5. _threadName{str::stream() << "conn-" << _session()->id()} {} 6.} 7. 8.class Session { 9. ...... 10. //sessionID,自增生成 11. const Id _id; 12.} 13. 14.//全局sessionIdCounter计数器,初始化为0 15.AtomicUInt64 sessionIdCounter(0); 16. 17.//session id自增 18.Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {}
SSM状态处理过程中,会把一个完整的请求过程 = readTask任务 + dealTask任务,这两个任务都是通过SSM状态机和ServiceExecutor线程模型子模块的worker线程配合调度完成,在任务处理过程中处理同一个任务的线程可能会有多次线程名更改,这个就是结合guardthread线程守护类来完成,以一个线程名切换更改伪代码实现为例:
1.worker_thread_run_task(...) 2.{ 3. //如果是adaptive线程模型,当前worker线程名为"worker-xx" 4. print(threadName) 5. //业务逻辑处理1 6. ...... 7. 8. //初始化构造ThreadGuard,这里面修改线程名为_ssm->_threadName,也就是"conn-xx", 9. //同时保存原来的线程名"worker-xx"到_ssm->_oldThreadName中 10. ThreadGuard guard(this); 11. //如果是adaptive线程模型,线程名打印内容为"conn-xx" 12. print(threadName) 13. //业务逻辑处理2 14. ...... 15. //恢复_ssm->_oldThreadName保存的线程名"worker-xx" 16. guard.release(); 17. 18. //如果是adaptive线程模型,线程名恢复为"worker-xx" 19. print(threadName) 20.}
从上面的伪代码可以看出,adaptive线程模型对应worker线程名为”worker”,在进入ThreadGuard guard(this)流程后,线程名更改为”conn-xx”线程,当guard.release()释放后恢复原有”worker-xx”线程名。
结合前面的SSM状态处理流程,adaptive线程模型可以得到如下总结:底层网络IO数据读写过程,worker线程名会改为”worker-xx”,其他非网络IO的mongodb内部逻辑处理线程名为”conn-xx”。所以,如果查看mongod或者mongos进程所有线程名的时候,如果发现线程名为”worker-xx”,说明当前线程在处理网络IO;如果发现线程名为”conn-xx”,则说明当前线程在处理内部逻辑处理,对于mongod实例可以理解为主要处理磁盘IO。
由于synchronous同步线程模型,同一链接对应的所有客户端请求至始至终都有同一线程处理,所以整个处理线程名不会改变,也没必要修改线程名,整个过程都是”conn-xx”线程名。
2.3 该模块函数接口总结大全
前面分析了主要核心接口源码实现,很多其他接口没有一一列举详细分析,该模块u所有接口功能总结如下,更多接口代码实现详见Mongodb内核源码详细注释分析:
类名 | 函数接口 | 功能说明 |
ThreadGuard | ThreadGuard(...) | 线程守护初始化,把worker线程名改为”conn-xx” |
ThreadGuard | operator=(...) | 类赋值 |
ThreadGuard | ~ThreadGuard() | 类析构,析构函数中调用release()接口 |
ThreadGuard | operator bool() | 获取所有权标识 |
ThreadGuard | markStaticOwnership() | synchronous线程模型默认归属权为kStatic |
ThreadGuard | release() |
|
ServiceStateMachine | ServiceStateMachine(...) | ServiceStateMachine类初始化构造赋值 |
ServiceStateMachine | _session() | 获取当前ssm对应的session信息 |
ServiceStateMachine | _sourceMessage(...) | 等待通过ASIO库接收网络IO数据 |
ServiceStateMachine | _sinkMessage(...) | 等待通过ASIO库发送网络IO数据 |
ServiceStateMachine | _sourceCallback(...) | 接收到一个完整长度mongodb报文的回调处理 |
ServiceStateMachine | _sinkCallback(...) | 发送一个完整应答报文后的回调处理 |
ServiceStateMachine | _processMessage(...) | 开始内部处理,如协议解析、命令处理、DB层数据访问等 |
ServiceStateMachine | _runNextInGuard(...) | SSM状态机调度运行 |
ServiceStateMachine | start(...) | 接收到一个新链接通过start()接口启用SSM状态机 |
ServiceStateMachine | _scheduleNextWithGuard(...) | readTask和dealTask交由worker线程处理 |
ServiceStateMachine | terminate(...) | 调用TransportLayerASIO::end()进行套接字回收处理 |
ServiceStateMachine | setCleanupHook(...) | 设置clean hook,也就是回收处理 |
ServiceStateMachine | state() | 获取SSM当前状态 |
ServiceStateMachine | _terminateAndLogIfError(...) | 打印回收日志并进行terminate回收处理 |
ServiceStateMachine | _cleanupSession(...) | session会话回收处理 |
3. 总结
本文主要分析了service_state_machine状态机子模块,该模块把session对应的客户端请求转换为readTask任务、dealTask任务和cleanTask任务,前两个任务通过worker线程完成调度处理,cleanTask任务在内部处理异常或者链接关闭的时候由本线程直接执行,而不是通过worker线程调度执行。
这三个任务处理过程会分别对应到Created、Source、SourceWait、Process、SinkWait、EndSession、Ended七种状态的一种或者多种,具体详见前面的状态码分析。一个正常的客户端请求状态转换过程如下:
- 链接刚建立的第一次请求状态转换过程:
Created->Source -> SourceWait -> Process -> SinkWait -> Source
2. 该链接后续的请求状态转换过程:
Source -> SourceWait -> Process -> SinkWait -> Source
此外,SSM状态机调度模块通过ServiceStateMachine::_scheduleNextWithGuard(...)接口和线程模型子模块关联起来。SSM通过该接口完成worker线程初始创建、task任务入队处理,下期将分析<<网络线程模型子模块>>详细源码实现。
说明:
该模块更多接口实现细节详见Mongodb内核源码注释:Mongodb内核源码详细注释分析

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
面试官:如何写出让 CPU 跑得更快的代码?
前言 代码都是由 CPU 跑起来的,我们代码写的好与坏就决定了 CPU 的执行效率,特别是在编写计算密集型的程序,更要注重 CPU 的执行效率,否则将会大大影响系统性能。 CPU 内部嵌入了 CPU Cache(高速缓存),它的存储容量很小,但是离 CPU 核心很近,所以缓存的读写速度是极快的,那么如果 CPU 运算时,直接从 CPU Cache 读取数据,而不是从内存的话,运算速度就会很快。 但是,大多数人不知道 CPU Cache 的运行机制,以至于不知道如何才能够写出能够配合 CPU Cache 工作机制的代码,一旦你掌握了它,你写代码的时候,就有新的优化思路了。 那么,接下来我们就来看看,CPU Cache 到底是什么样的,是如何工作的呢,又该写出让 CPU 执行更快的代码呢? 正文 CPU Cache 有多快? 你可能会好奇为什么有了内存,还需要 CPU Cache?根据摩尔定律,CPU 的访问速度每 18 个月就会翻倍,相当于每年增长 60% 左右,内存的速度当然也会不断增长,但是增长的速度远小于 CPU,平均每年只增长 7% 左右。于是,CPU 与内存的访问性能的差距不断...
- 下一篇
Flutter Android小米推送集成
Flutter 小米推送集成 最近 Flutter 项目需要集成推送,IOS 还好,Android 需要接入各个厂商的推送通道。可谓一步一坑,有时候真的是边看文档边骂。不过还好,含泪填坑后,最终集成了华为,小米,Vivo,Oppo,极光等推送。 恰好昨晚一网友也咨询我小米推送集成的问题,还有就是我感觉小米集也确实有点稍微不一样,所以记录一下 Flutter 小米推送集成的步骤,也算是一个教程。 Flutter 集成小米推送,有插件,插件叫:xiaomipush_plugin: ^1.0.0。源码和用法我看了,也都挺简单的。但是用的时候和后端推送 API 对接不起来。我出现的现象是: 后端调接口如果携带参数,点击通知能收到 NotificationMessageClicked 类型的监听。但是如果程序处于被杀死的情况下,无法唤起应用重新运行。后台定义打开应用首页时,点击通知却不能收到 NotificationMessageClicked 类型的监听。 后来我就直接把小米推送集成到自己的应用了,此处我新开一个应用总结一下:创建应用的包名:com.jumanyi.merchant 开始 St...
相关文章
文章评论
共有0条评论来说两句吧...