您现在的位置是:首页 > 文章详情

mongodb内核源码实现、性能调优、最佳运维实践系列-command命令处理模块源码实现一

日期:2020-11-09点击:357

关于作者

      前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz

  1. 背景

      <<transport_layer网络传输层模块源码实现>>中分享了mongodb内核底层网络IO处理相关实现,包括套接字初始化、一个完整mongodb报文的读取、获取到DB数据发送给客户端等。Mongodb支持多种增、删、改、查、聚合处理、cluster处理等操作,每个操作在内核实现中对应一个command,每个command有不同的功能,mongodb内核如何进行command源码处理将是本文分析的重点

      此外,mongodb提供了mongostat工具来监控当前集群的各种操作统计。Mongostat监控统计如下图所示:

      其中,insert、delete、update、query这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore不是很好理解,command代表什么统计?getMore代表什么统计?,这两项相对比较难理解。

      此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

      Command命令处理模块分为:mongos操作命令、mongod操作命令、mongodb集群内部命令,具体定义如下:

  • mongos操作命令,客户端可以通过mongos访问集群相关的命令。
  • mongod操作命令:客户端可以通过mongod复制集和cfg server访问集群的相关命令。
  • mongodb集群内部命令:mongos、mongod、mongo-cfg集群实例之间交互的命令。

      Command命令处理模块核心代码实现如下:

     《command命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2. <<transport_layer网络传输层模块源码实现>>衔接回顾

      <<transport_layer网络传输层模块源码实现三>>一文中,我们对service_state_machine状态机调度子模块进行了分析,该模块中的dealTask任务进行mongodb内部业务逻辑处理,其核心实现如下:

1.//dealTask处理   2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {   3. ...... 4.    //command处理、DB访问后的数据通过dbresponse返回   5.    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);   6. ...... 7.}

      上面的_sep对应mongod或者mongos实例的服务入口实现,该_seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现。SSM状态机的_seq成员初始化赋值核心代码实现如下:

1.//mongos实例启动初始化   2.static ExitCode runMongosServer() {   3.    ......   4.    //mongos实例对应sep为ServiceEntryPointMongos   5.    auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());   6.    getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));   7.    ......   8.}   9.   10.//mongod实例启动初始化   11.ExitCode _initAndListen(int listenPort) {   12.    ......   13.    //mongod实例对应sep为ServiceEntryPointMongod   14.    serviceContext->setServiceEntryPoint(   15.        stdx::make_unique<ServiceEntryPointMongod>(serviceContext));   16.    ......   17.}   18.   19.//SSM状态机初始化   20.ServiceStateMachine::ServiceStateMachine(...)   21.    : _state{State::Created},   22.      //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量   23.      _sep{svcContext->getServiceEntryPoint()},   24.      ......   } 

         通过上面的几个核心接口实现,把mongos和mongod两个实例的服务入口与状态机SSM(ServiceStateMachine)联系起来,最终和下面的command命令处理模块关联。

     dealTask进行一次mongodb请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于mongos和mongod服务入口分别由ServiceEntryPointMongosServiceEntryPointMongod两个类实现,因此dealTask也就演变为如下接口处理:

  • mongos实例:ServiceEntryPointMongos::handleRequest(...)
  • Mongod实例::ServiceEntryPointMongod::handleRequest(...)

      这两个接口入参都是OperationContext和Message,分别对应操作上下文、请求原始数据内容。下文会分析Message解析实现、OperationContext服务上下文实现将在后续章节分析。

      Mongod和mongos实例服务入口类都继承自网络传输模块中的ServiceEntryPointImpl类,如下图所示:

      Tips: mongos和mongod服务入口类为何要继承网络传输模块服务入口类?

      原因是一个请求对应一个链接session,该session对应的请求又和SSM状态机唯一对应。所有客户端请求对应的SSM状态机信息全部保存再ServiceEntryPointImpl._sessions成员中,而command命令处理模块为SSM状态机任务中的dealTask任务,通过该继承关系,ServiceEntryPointMongod和ServiceEntryPointMongos子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的session链接信息。

3. Mongodb协议解析

在《transport_layer网络传输层模块源码实现二》中的数据收发子模块完成了一个完整mongodb报文的接收,一个mongodb报文由Header头部+opCode包体组成,如下图所示:

      上图中各个字段说明如下表:

      opCode取值比较多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分别针对增删改查请求,Mongodb从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以OP_MSG操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG请求协议格式如下:

1.OP_MSG {   2.    //mongodb报文头部   3.    MsgHeader header;             4.    //位图,用于标识报文是否需要校验 是否需要应答等   5.    uint32 flagBits;           // message flags   6.    //报文内容,例如find write等命令内容通过bson格式存在于该结构中   7.    Sections[] sections;       // data sections   8.    //报文CRC校验   9.    optional<uint32> checksum; // optional CRC-32C checksum   } 

      OP_MSG各个字段说明如下表:

     一个完整OP_MSG请求格式如下:

      除了通用头部header外,客户端命令请求实际上都保存于sections字段中,该字段存放的是请求的原始bson格式数据。BSON是由10gen开发的一个数据格式,目前主要用于MongoDB中,是MongoDB的数据存储格式。BSON基于JSON格式,选择JSON进行改造的原因主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具有以下特性

  • Lightweight(更轻量级)
  • Traversable(易操作)
  • Efficient(高效性能)

      本文重点不是分析bson协议格式,bson协议实现细节将在后续章节分享。bson协议更多设计细节详见:http://bsonspec.org/

      总结:一个完整mongodb报文由header+body组成,其中header长度固定为16字节,body长度等于messageLength-16。Header部分协议解析由message.cpp和message.h两源码文件实现,body部分对应的OP_MSG类请求解析由op_msg.cpp和op_msg.h两源码文件实现。

3. mongodb报文通用头部解析及封装源码实现

      Header头部解析由src/mongo/util/net目录下message.cpp和message.h两文件完成,该类主要完成通用header头部和body部分的解析、封装。因此报文头部核心代码分为以下两类:

  • 报文头部内容解析及封装(MSGHEADER命名空间实现)
  • 头部和body内容解析及封装(MsgData命名空间实现)

3.1 mongodb报文头部解析及封装核心代码实现

      mongodb报文头部解析由namespace MSGHEADER {...}实现,该类主要成员及接口实现如下:

1.namespace MSGHEADER {   2.//header头部各个字段信息   3.struct Layout {   4.    //整个message长度,包括header长度和body长度   5.    int32_t messageLength;      6.    //requestID 该请求id信息   7.    int32_t requestID;          8.    //getResponseToMsgId解析   9.    int32_t responseTo;         10.    //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等   11.    int32_t opCode;   12.};   13.   14.//ConstView实现header头部数据解析   15.class ConstView {    16.public:   17.    ......   18.    //初始化构造   19.    ConstView(const char* data) : _data(data) {}   20.    //获取_data地址   21.    const char* view2ptr() const {   22.        return data().view();   23.    }   24.    //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用   25.    //解析header头部的messageLength字段   26.    int32_t getMessageLength() const {   27.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));   28.    }   29.    //解析header头部的requestID字段   30.    int32_t getRequestMsgId() const {   31.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));   32.    }   33.    //解析header头部的getResponseToMsgId字段   34.    int32_t getResponseToMsgId() const {   35.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));   36.    }   37.    //解析header头部的opCode字段   38.    int32_t getOpCode() const {   39.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));   40.    }   41.   42.protected:   43.    //mongodb报文数据起始地址   44.    const view_type& data() const {   45.        return _data;   46.    }   47.private:   48.    //数据部分   49.    view_type _data;   50.};   51.   52.//View填充header头部数据   53.class View : public ConstView {   54.public:   55.    ......   56.    //构造初始化   57.    View(char* data) : ConstView(data) {}   58.    //header起始地址   59.    char* view2ptr() {   60.        return data().view();   61.    }   62.    //以下四个接口进行header填充   63.    //填充header头部messageLength字段   64.    void setMessageLength(int32_t value) {   65.        data().write(tagLittleEndian(value), offsetof(Layout, messageLength));   66.    }   67.    //填充header头部requestID字段   68.    void setRequestMsgId(int32_t value) {   69.        data().write(tagLittleEndian(value), offsetof(Layout, requestID));   70.    }   71.    //填充header头部responseTo字段   72.    void setResponseToMsgId(int32_t value) {   73.        data().write(tagLittleEndian(value), offsetof(Layout, responseTo));   74.    }   75.    //填充header头部opCode字段   76.    void setOpCode(int32_t value) {   77.        data().write(tagLittleEndian(value), offsetof(Layout, opCode));   78.    }   79.private:   80.    //指向header起始地址   81.    view_type data() const {   82.        return const_cast<char*>(ConstView::view2ptr());   83.    }   84.};   85.}

      从上面的header头部解析、填充的实现类可以看出,header头部解析由MSGHEADER::ConstView实现;header头部填充由MSGHEADER::View完成。实际上代码实现上,通过offsetof来进行移位,从而快速定位到头部对应字段。

3.2 mongodb报文头部+body解析封装核心代码实现

      Namespace MSGHEADER{...}命名空间只负责header头部的处理,namespace MsgData{...}命名空间相对MSGHEADER命名空间更加完善,除了处理头部解析封装外,还负责body数据起始地址维护、body数据封装、数据长度检查等。MsgData命名空间核心代码实现如下:

1.namespace MsgData {   2.struct Layout {   3.    //数据填充组成:header部分   4.    MSGHEADER::Layout header;   5.    //数据填充组成: body部分,body先用data占位置   6.    char data[4];   7.};   8.   9.//解析header字段信息及body其实地址信息   10.class ConstView {   11.public:   12.    //初始化构造   13.    ConstView(const char* storage) : _storage(storage) {}   14.    //获取数据起始地址   15.    const char* view2ptr() const {   16.        return storage().view();   17.    }   18.   19.    //以下四个接口间接执行前面的MSGHEADER中的头部字段解析   20.    //填充header头部messageLength字段   21.    int32_t getLen() const {   22.        return header().getMessageLength();   23.    }   24.    //填充header头部requestID字段   25.    int32_t getId() const {   26.        return header().getRequestMsgId();   27.    }   28.    //填充header头部responseTo字段   29.    int32_t getResponseToMsgId() const {   30.        return header().getResponseToMsgId();   31.    }   32.    //获取网络数据报文中的opCode字段   33.    NetworkOp getNetworkOp() const {   34.        return NetworkOp(header().getOpCode());   35.    }   36.    //指向body起始地址   37.    const char* data() const {   38.        return storage().view(offsetof(Layout, data));   39.    }   40.    //messageLength长度检查,opcode检查   41.    bool valid() const {   42.        if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))   43.            return false;   44.        if (getNetworkOp() < 0 || getNetworkOp() > 30000)   45.            return false;   46.        return true;   47.    }   48.    ......   49.protected:   50.    //获取_storage   51.    const ConstDataView& storage() const {   52.        return _storage;   53.    }   54.    //指向header起始地址   55.    MSGHEADER::ConstView header() const {   56.        return storage().view(offsetof(Layout, header));   57.    }   58.private:   59.    //mongodb报文存储在这里   60.    ConstDataView _storage;   61.};   62.   63.//填充数据,包括Header和body   64.class View : public ConstView {   65.public:   66.    //构造初始化   67.    View(char* storage) : ConstView(storage) {}   68.    ......   69.    //获取报文起始地址   70.    char* view2ptr() {   71.        return storage().view();   72.    }   73.   74.    //以下四个接口间接执行前面的MSGHEADER中的头部字段构造   75.    //以下四个接口完成msg header赋值   76.    //填充header头部messageLength字段   77.    void setLen(int value) {   78.        return header().setMessageLength(value);   79.    }   80.    //填充header头部messageLength字段   81.    void setId(int32_t value) {   82.        return header().setRequestMsgId(value);   83.    }   84.    //填充header头部messageLength字段   85.    void setResponseToMsgId(int32_t value) {   86.        return header().setResponseToMsgId(value);   87.    }   88.    //填充header头部messageLength字段   89.    void setOperation(int value) {   90.        return header().setOpCode(value);   91.    }   92.   93.    using ConstView::data;   94.    //指向data   95.    char* data() {   96.        return storage().view(offsetof(Layout, data));   97.    }   98.private:   99.    //也就是报文起始地址   100.    DataView storage() const {   101.        return const_cast<char*>(ConstView::view2ptr());   102.    }   103.    //指向header头部   104.    MSGHEADER::View header() const {   105.        return storage().view(offsetof(Layout, header));   106.    }   107.};   108.   109.......   110.//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度   111.const int MsgDataHeaderSize = sizeof(Value) - 4;   112.   113.//除去头部后的数据部分长度   114.inline int ConstView::dataLen() const {    115.    return getLen() - MsgDataHeaderSize;   116.}   117.}  // namespace MsgData  

       和MSGHEADER命名空间相比,MsgData这个namespace命名空间接口实现和前面的MSGHEADER命名空间实现大同小异。MsgData不仅仅处理header头部的解析组装,还负责body部分数据头部指针指向、头部长度检查、opCode检查、数据填充等。其中,MsgData命名空间中header头部的解析构造底层依赖MSGHEADER实现。

3.3 Message/DbMessage核心代码实现

      在《transport_layer网络传输层模块源码实现二》中,从底层ASIO库接收到的mongodb报文是存放在Message结构中存储,最终存放在ServiceStateMachine._inMessage成员中。

      在前面第2章我们知道mongod和mongso实例的服务入口接口handleRequest(...)中都带有Message入参,也就是接收到的Message数据通过该接口处理。Message类主要接口实现如下:

1.//DbMessage._msg成员为该类型   2.class Message {   3.public:   4.    //message初始化   5.    explicit Message(SharedBuffer data) : _buf(std::move(data)) {}   6.    //头部header数据   7.    MsgData::View header() const {   8.        verify(!empty());   9.        return _buf.get();   10.    }   11.    //获取网络数据报文中的op字段   12.    NetworkOp operation() const {   13.        return header().getNetworkOp();   14.    }   15.    //_buf释放为空   16.    bool empty() const {   17.        return !_buf;   18.    }   19.    //获取报文总长度messageLength   20.    int size() const {   21.        if (_buf) {   22.            return MsgData::ConstView(_buf.get()).getLen();   23.        }   24.        return 0;   25.    }   26.    //body长度   27.    int dataSize() const {   28.        return size() - sizeof(MSGHEADER::Value);   29.    }   30.    //buf重置   31.    void reset() {   32.        _buf = {};   33.    }   34.    // use to set first buffer if empty   35.    //_buf直接使用buf空间   36.    void setData(SharedBuffer buf) {   37.        verify(empty());   38.        _buf = std::move(buf);   39.    }   40.     //把msgtxt拷贝到_buf中   41.    void setData(int operation, const char* msgtxt) {   42.        setData(operation, msgtxt, strlen(msgtxt) + 1);   43.    }   44.    //根据operation和msgdata构造一个完整mongodb报文   45.    void setData(int operation, const char* msgdata, size_t len) {   46.        verify(empty());   47.        size_t dataLen = len + sizeof(MsgData::Value) - 4;   48.        _buf = SharedBuffer::allocate(dataLen);   49.        MsgData::View d = _buf.get();   50.        if (len)   51.            memcpy(d.data(), msgdata, len);   52.        d.setLen(dataLen);   53.        d.setOperation(operation);   54.    }   55.    ......   56.    //获取_buf对应指针   57.    const char* buf() const {   58.        return _buf.get();   59.    }   60.   61.private:   62.    //存放接收数据的buf   63.    SharedBuffer _buf;   64.};  

       Message是操作mongodb收发报文最直接的实现类,该类主要完成一个完整mongodb报文封装。有关mongodb报文头后面的body更多的解析实现在DbMessage类中完成,DbMessage类包含Message类成员_msg。实际上,Message报文信息在handleRequest(...)实例服务入口中赋值给DbMessage._msg,报文后续的body处理继续由DbMessage类相关接口完成处理。DbMessage和Message类关系如下:

1.class DbMessage {   2.    ......   3.    //包含Message成员变量   4.    const Message& _msg;   5. //mongodb报文起始地址 6. const char* _nsStart; 7. //报文结束地址 8. const char* _theEnd; 9.}   10.   11.DbMessage::DbMessage(const Message& msg) : _msg(msg),    12.  _nsStart(NULL), _mark(NULL), _nsLen(0) {   13.    //一个mongodb报文(header+body)数据的结束地址   14.    _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();   15.    //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整mongodb报文   16.    _nextjsobj = _msg.singleData().data();   17.    ......   18.} 

      DbMessage._msg成员为DbMessage 类型,DbMessage_nsStart_theEnd成员分别记录完整mongodb报文的起始地址和结束地址,通过这两个指针就可以获取一个完整mongodb报文的全部内容,包括header和body。

      注意:DbMessage是早期mongodb版本(version<3.6)中用于报文body解析封装的类,这些类针对opCode=[dbUpdate, dbDelete]这个区间的操作。在mongodb新版本(version>=3.6)中,body解析及封装由op_msg.h和op_msg.cpp代码文件中的clase OpMsgRequest{}完成处理。

3.4 OpMsg报文解析封装核心代码实现

        Mongodb从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG对应mongodb报文body解析封装处理由OpMsg类相关接口完成,OpMsg::parse(Message)从Message中解析出报文body内容,其核心代码实现如下:

1.struct OpMsg {    2.      ......   3.    //msg解析赋值见OpMsg::parse      4.    //各种命令(insert update find等)都存放在该body中   5.    BSONObj body;     6.    //sequences用法暂时没看懂,感觉没什么用?先跳过   7.    std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse   8.}   1.//从message中解析出OpMsg信息   2.OpMsg OpMsg::parse(const Message& message) try {   3.    //message不能为空,并且opCode必须为dbMsg   4.    invariant(!message.empty());   5.    invariant(message.operation() == dbMsg);   6.    //获取flagBits   7.    const uint32_t flags = OpMsg::flags(message);   8.    //flagBits有效性检查,bit 0-15中只能对第0和第1位操作   9.    uassert(ErrorCodes::IllegalOpMsgFlag,   10.            str::stream() << "Message contains illegal flags value: Ob"   11.                          << std::bitset<32>(flags).to_string(),   12.            !containsUnknownRequiredFlags(flags));   13.   14.    //校验码默认4字节   15.    constexpr int kCrc32Size = 4;   16.    //判断该mongo报文body内容是否启用了校验功能   17.    const bool haveChecksum = flags & kChecksumPresent;   18.    //如果有启用校验功能,则报文末尾4字节为校验码   19.    const int checksumSize = haveChecksum ? kCrc32Size : 0;   20.    //sections字段内容   21.    BufReader sectionsBuf(message.singleData().data() + sizeof(flags),   22.                          message.dataSize() - sizeof(flags) - checksumSize);   23.   24.    //默认先设置位false   25.    bool haveBody = false;   26.    OpMsg msg;   27.    //解析sections对应命令请求数据   28.    while (!sectionsBuf.atEof()) {   29.     //BufReader::read读取kind内容,一个字节   30.        const auto sectionKind = sectionsBuf.read<Section>();   31.     //kind为0对应命令请求body内容,内容通过bson报错   32.        switch (sectionKind) {   33.         //sections第一个字节是0说明是body   34.            case Section::kBody: {   35.                //默认只能有一个body   36.                uassert(40430, "Multiple body sections in message", !haveBody);   37.                haveBody = true;   38.         //命令请求的bson信息保存在这里   39.                msg.body = sectionsBuf.read<Validated<BSONObj>>();   40.                break;   41.            }   42.   43.         //DocSequence暂时没看明白,用到的地方很少,跳过,后续等   44.            //该系列文章主流功能分析完成后,从头再回首分析   45.            case Section::kDocSequence: {   46.                  ......   47.            }   48.        }   49.    }   50.    //OP_MSG必须有body内容   51.    uassert(40587, "OP_MSG messages must have a body", haveBody);   52.    //body和sequence去重判断   53.    for (const auto& docSeq : msg.sequences) {   54.        ......   55.    }   56.    return msg;   57.}  

        OpMsg类被OpMsgRequest类继承,OpMsgRequest类中核心接口就是解析出OpMsg.body中的库信息和表信息,OpMsgRequest类代码实现如下:

1.//协议解析得时候会用到,见runCommands   2.struct OpMsgRequest : public OpMsg {   3.    ......   4.    //构造初始化   5.    explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}   6.    //opMsgRequestFromAnyProtocol->OpMsgRequest::parse    7.    //从message中解析出OpMsg所需成员信息   8.    static OpMsgRequest parse(const Message& message) {   9.        //OpMsg::parse   10.        return OpMsgRequest(OpMsg::parse(message));   11.    }   12.    //根据db body extraFields填充OpMsgRequest   13.    static OpMsgRequest fromDBAndBody(... {   14.        OpMsgRequest request;   15.        request.body = ([&] {   16.            //填充request.body   17.            ......   18.        }());   19.        return request;   20.    }   21.    //从body中获取db name   22.    StringData getDatabase() const {   23.        if (auto elem = body["$db"])   24.            return elem.checkAndGetStringData();   25.        uasserted(40571, "OP_MSG requests require a $db argument");   26.    }   27.    //find  insert 等命令信息  body中的第一个elem就是command 名   28.    StringData getCommandName() const {   29.        return body.firstElementFieldName();   30.    }   31.};  

       OpMsgRequest通过OpMsg::parse(message)解析出OpMsg信息,从而获取到body内容,GetCommandName()接口和getDatabase()则分别从body中获取库DB信息、命令名信息。通过该类相关接口,命令名(find、write、update等)和DB库都获取到了。

      OpMsg模块除了OP_MSG相关报文解析外,还负责OP_MSG报文组装填充,该模块接口功能大全如下表:

4. Mongod实例服务入口核心代码实现

      Mongod实例服务入口类ServiceEntryPointMongod继承ServiceEntryPointImpl类,mongod实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod核心接口可以细分为:opCode解析及回调处理、命令解析及查找、命令执行三个子模块。

4.1 opCode解析及回调处理

      OpCode操作码解析及其回调处理由ServiceEntryPointMongod::handleRequest(...)接口实现,核心代码实现如下:

1.//mongod服务对于客户端请求的处理     2.//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage   3.DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {   4.    //获取opCode,3.6版本对应客户端默认使用OP_MSG   5.    NetworkOp op = m.operation();    6.    ......   7.    //根据message构造DbMessage   8.    DbMessage dbmsg(m);   9.    //根据操作上下文获取对应的client   10.    Client& c = *opCtx->getClient();     11.    ......   12.    //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息   13.    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;   14.    const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();   15.    ....   16.    //CurOp::debug 初始化opDebug,慢日志相关记录   17.    OpDebug& debug = currentOp.debug();   18.    //慢日志阀值   19.    long long logThresholdMs = serverGlobalParams.slowMS;   20.    //时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作     21.    bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));   22.    DbResponse dbresponse;   23.    if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {   24.        //新版本op=dbMsg,因此走这里   25.        //从DB获取数据,获取到的数据通过dbresponse返回   26.        dbresponse = runCommands(opCtx, m);      27.    } else if (op == dbQuery) {   28.        ......    29.        //早期mongodb版本查询走这里   30.        dbresponse = receivedQuery(opCtx, nsString, c, m);   31.    } else if (op == dbGetMore) {     32.        //早期mongodb版本查询走这里   33.        dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);   34.    } else {   35.        ......   36.        //早期版本增 删 改走这里处理   37.         if (op == dbInsert) {   38.              receivedInsert(opCtx, nsString, m); //插入操作入口   新版本CmdInsert::runImpl   39.         } else if (op == dbUpdate) {   40.              receivedUpdate(opCtx, nsString, m); //更新操作入口     41.         } else if (op == dbDelete) {   42.              receivedDelete(opCtx, nsString, m); //删除操作入口     43.         }    44.    }   45.    //获取runCommands执行时间,也就是内部处理时间   46.    debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());   47.    ......   48.    //慢日志记录   49.    if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {   50.        Locker::LockerInfo lockerInfo;     51.        //OperationContext::lockState  LockerImpl<>::getLockerInfo   52.        opCtx->lockState()->getLockerInfo(&lockerInfo);    53.   54.    //OpDebug::report 记录慢日志到日志文件   55.        log() << debug.report(&c, currentOp, lockerInfo.stats);    56.    }   57.    //各种统计信息   58.    recordCurOpMetrics(opCtx);   59.}  

      Mongod的handleRequest()接口主要完成以下工作:

  • 从Message中获取OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6开始,默认请求对应OpCode都是OP_MSG,本文默认只分析OpCode=OP_MSG相关的处理。
  • 获取本操作对应的Client客户端信息。
  • 如果是早期版本,通过Message构造DbMessage,同时解析出库.表信息。
  • 根据不同OpCode执行对应回调操作,OP_MSG对应操作为runCommands(...),获取的数据通过dbresponse返回。
  • 获取到db层返回的数据后,进行慢日志判断,如果db层数据访问超过阀值,记录慢日志。
  • 设置debug的各种统计信息。

4.2 命令解析及查找

      从上面的分析可以看出,接口最后调用runCommands(...),该接口核心代码实现如下所示:

1.//message解析出对应command执行   2.DbResponse runCommands(OperationContext* opCtx, const Message& message) {   3.    //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder   4.    //应答数据通过该类构造   5.    auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));   6.    [&] {   7.        OpMsgRequest request;   8.        try {  // Parse.   9.            //协议解析 根据message获取对应OpMsgRequest   10.            request = rpc::opMsgRequestFromAnyProtocol(message);   11.        }    12.    }   13.    try {  // Execute.   14.        //opCtx初始化   15.        curOpCommandSetup(opCtx, request);   16.        //command初始化为Null   17.        Command* c = nullptr;   18.        //OpMsgRequest::getCommandName查找   19.        if (!(c = Command::findCommand(request.getCommandName()))) {    20.             //没有找到相应的command的后续异常处理   21.             ......   22.        }   23.        //执行command命令,获取到的数据通过replyBuilder.get()返回   24.        execCommandDatabase(opCtx, c, request, replyBuilder.get());   25.    }   26.    //OpMsgReplyBuilder::done对数据进行序列化操作   27.    auto response = replyBuilder->done();   28.    //responseLength赋值   29.    CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();   30.    // 返回   31.    return DbResponse{std::move(response)};   32.}  

      RunCommands(...)接口从message中解析出OpMsg信息,然后获取该OpMsg对应的command命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

  • 获取该OpCode对应replyBuilder,OP_MSG操作对应builder为OpMsgReplyBuilder。
  • 根据message解析出OpMsgRequest数据,OpMsgRequest来中包含了真正的命令请求bson信息。
  • opCtx初始化操作。
  • 通过request.getCommandName()返回命令信息(如“find”、“update”等字符串)。
  • 通过Command::findCommand(command name)从CommandMap这个map表中查找是否支持该command命令。如果没找到说明不支持,如果找到说明支持。
  • 调用execCommandDatabase(...)执行该命令,并获取命令的执行结果。
  • 根据command执行结果构造response并返回

4.3 命令执行

1.void execCommandDatabase(...) {   2.    ......   3.    //获取dbname   4.    const auto dbname = request.getDatabase().toString();   5.    ......   6.    //mab表存放从bson中解析出的elem信息   7.    StringMap<int> topLevelFields;   8.    //body elem解析   9.    for (auto&& element : request.body) {   10.        //获取bson中的elem信息   11.        StringData fieldName = element.fieldNameStringData();   12.        //如果elem信息重复,则异常处理   13.        ......   14.    }   15.    //如果是help命令,则给出help提示   16.    if (Command::isHelpRequest(helpField)) {   17.        //给出help提示   18.        Command::generateHelpResponse(opCtx, replyBuilder, *command);   19.        return;   20.    }   21.    //权限认证检查,检查该命令执行权限   22.    uassertStatusOK(Command::checkAuthorization(command, opCtx, request));   23.    ......   24.   25.    //该命令执行次数统计  db.serverStatus().metrics.commands可以获取统计信息   26.    command->incrementCommandsExecuted();   27.    //真正的命令执行在这里面   28.    retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);   29.    //该命令执行失败次数统计   30.    if (!retval) {   31.        command->incrementCommandsFailed();   32.     }   33.     ......   34.}  

      execCommandDatabase(...)最终调用RunCommandImpl(...)进行对应命令的真正处理,该接口核心代码实现如下:

1.bool runCommandImpl(...) {   2.    //获取命令请求内容body   3.    BSONObj cmd = request.body;   4.    //获取请求中的DB库信息   5.    const std::string db = request.getDatabase().toString();   6.    //ReadConcern检查   7.    Status rcStatus = waitForReadConcern(   8.        opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));   9.    //ReadConcern检查不通过,直接异常提示处理   10.    if (!rcStatus.isOK()) {   11.         //异常处理   12.         return;   13.    }   14.    if (!command->supportsWriteConcern(cmd)) {   15.        //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持   16.        if (commandSpecifiesWriteConcern(cmd)) {   17.            //异常处理"Command does not support writeConcern"   18.            ......   19.            return result;   20.        }   21.    //调用Command::publicRun执行不同命令操作   22.        result = command->publicRun(opCtx, request, inPlaceReplyBob);   23.    }   24.    //提取WriteConcernOptions信息   25.    auto wcResult = extractWriteConcern(opCtx, cmd, db);   26.    //提取异常,直接异常处理   27.    if (!wcResult.isOK()) {   28.        //异常处理   29.        ......   30.        return result;   31.    }   32.    ......   33.    //执行对应的命令Command::publicRun,执行不同命令操作   34.    result = command->publicRun(opCtx, request, inPlaceReplyBob);   35.    ......   36.}

      RunCommandImpl(...)接口最终调用该接口入参的command,执行 command->publicRun(...)接口,也就是命令模块的公共publicRun

4.4 总结

      Mongod服务入口首先从message中解析出opCode操作码,3.6版本对应客户端默认操作码为OP_MSQ,解析出该操作对应OpMsgRequest信息。然后从message原始数据中解析出command命令字符串后,继续通过全局Map表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

5. Mongos实例服务入口核心代码实现

       mongos服务入口核心代码实现过程和mongod服务入口代码实现流程几乎相同,mongos实例message解析、OP_MSG操作码处理、command命令查找等流程和上一章节mongod实例处理过程类似,本章节不在详细分析。Mongos实例服务入口处理调用流程如下:

      ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)

      最后的接口核心代码实现如下:

1.void runCommand(...) {   2.    ......   3.    //获取请求命令name   4.    auto const commandName = request.getCommandName();   5.    //从全局map表中查找   6.    auto const command = Command::findCommand(commandName);   7.    //没有对应的command存在,抛异常说明不支持该命令   8.    if (!command) {    9.        ......   10.        return;   11.    }    12.    ......   13.    //执行命令   14.    execCommandClient(opCtx, command, request, builder);    15.    ......   16.}   17. 18.void execCommandClient(...)   19.{    20.    ......   21.    //认证检查,是否有操作该command命令的权限,没有则异常提示   22.    Status status = Command::checkAuthorization(c, opCtx, request);     23.    if (!status.isOK()) {   24.        Command::appendCommandStatus(result, status);   25.        return;   26.    }   27.    //该命令的执行次数自增,代理上面也是要计数的   28.    c->incrementCommandsExecuted();    29.    //如果需要command统计,则加1   30.    if (c->shouldAffectCommandCounter()) {   31.        globalOpCounters.gotCommand();   32.    }   33.    ......   34.    //有部分命令不支持writeconcern配置,报错   35.    bool supportsWriteConcern = c->supportsWriteConcern(request.body);   36.    //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"   37.    if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {   38.        ......   39.        return;   40.    }   41.    //执行本命令对应的公共publicRun接口,Command::publicRun   42.    ok = c->publicRun(opCtx, request, result);    43.    ......   44.}  
  1. Tips: mongos和mongod实例服务入口核心代码实现的一点小区别
  • Mongod实例opCode操作码解析、OpMsg解析、command查找及对应命令调用处理都由class ServiceEntryPointMongod{...}类一起完成。
  • mongos实例则把opCode操作码解析交由class ServiceEntryPointMongos{...}类实现,OpMsg解析、command查找及对应命令调用处理放到了clase Strategy{...}类来处理。

6. 总结

        Mongodb报文解析及组装流程总结

  • 一个完整mongodb报文由通用报文header头部+body部分组成。
  • Body部分内容,根据报文头部的opCode来决定不同的body内容。
  • 3.6版本对应客户端请求opCode默认为OP_MSG,该操作码对应body部分由flagBits + sections + checksum组成,其中sections 中存放的是真正的命令请求信息,已bson数据格式保存。
  • Header头部和body报文体封装及解析过程由class Message {...}类实现
  • Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由class DbMessage {...}类实现
  • Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由struct OpMsgRequest{...}结构和struct OpMsg {...}类实现

      Mongos和mongod实例的服务入口处理流程大同小异,整体处理流程如下:

  • 从message解析出opCode操作码,根据不同操作码执行对应操作码回调。
  • 根据message解析出OpMsg request信息,mongodb报文的命令信息就存储在该body中,该body已bson格式存储。
  • 从body中解析出command命令字符串信息(如“insert”、“update”等)。
  • 从全局_commands map表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。
  • 最终找到对应command命令后,执行command的功能run接口。

      图形化总结如下:

      说明:第3章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析command命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

    Tips: 下期继续分享不同command命令执行细节。

7. 遗留问题

    第1章节中的统计信息,将在command模块核心代码分析完毕后揭晓答案,《mongodb command命令处理模块源码实现二》中继续分析,敬请关注。

原文链接:https://my.oschina.net/u/4087916/blog/4709503
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章