Kafka C++客户端库librdkafka笔记
目录 目录 1 1.前言 2 2.缩略语 2 3.配置和主题 3 3.1.配置和主题结构 3 3.1.1.Conf 3 3.1.2.ConfImpl 3 3.1.3.Topic 3 3.1.4.TopicImpl 3 4.线程 4 5.消费者 5 5.1.消费者结构 5 5.1.1.Handle 5 5.1.2.HandleImpl 5 5.1.3.ConsumeCb 6 5.1.4.EventCb 6 5.1.5.Consumer 7 5.1.6.KafkaConsumer 7 5.1.7.KafkaConsumerImpl 7 5.1.8.rd_kafka_message_t 7 5.1.9.rd_kafka_msg_s 7 5.1.10.rd_kafka_msgq_t 8 5.1.11.rd_kafka_toppar_t 8 6.生产者 10 6.1.生产者结构 10 6.1.1.DeliveryReportCb 11 6.1.2.PartitionerCb 11 6.1.3.Producer 11 6.1.4.ProduceImpl 11 6.2.生产者启动过程1 11 6.3.生产者启动过程2 12 6.4.生产者生产过程 14 7.poll过程 15 1.前言 librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。 2.缩略语 缩略语 缩略语全称 示例或说明 rd RapidDevelopment rd.h rk RdKafka toppar TopicPartition structrd_kafka_toppar_t { }; rep Reply, structrd_kafka_t{ rd_kafka_q_t*rk_rep }; msgq MessageQueue structrd_kafka_msgq_t{ }; rkb RdKafkaBroker Kafka代理 rko RdKafkaOperation Kafka操作 rkm RdKafkaMessage Kafka消息 payload 存在Kafka上的消息(或叫Log) 3.配置和主题 3.1.配置和主题结构 3.1.1.Conf 配置接口,配置分两种:全局的和主题的。 3.1.2.ConfImpl 配置的实现。 3.1.3.Topic 主题接口。 3.1.4.TopicImpl 主题的实现。 4.线程 RdKafka编程涉及到三类线程: 1)应用线程,业务代码的实现 2)KafkaBroker线程rd_kafka_broker_thread_main,负责与Broker通讯,多个 3)KafkaHandler线程rd_kafka_thread_main,每创建一个consumer或producer即会创建一个Handler线程。 5.消费者 5.1.消费者结构 5.1.1.Handle 定义了poll等接口,它的实现者为HandleImpl。 5.1.2.HandleImpl 实现了消费者和生产者均使用的poll等,其中poll的作用为: 1)为生产者回调消息发送结果; 2)为生产者和消费者回调事件。 classHandle{ /** *@briefPollstheprovidedkafkahandleforevents. * *Eventswilltriggerapplicationprovidedcallbackstobecalled. * *The\ptimeout_msargumentspecifiesthemaximumamountoftime *(inmilliseconds)thatthecallwillblockwaitingforevents. *Fornon-blockingcalls,provide0as\ptimeout_ms. *Towaitindefinatelyforevents,provide-1. * *Events: *-deliveryreportcallbacks(ifanRdKafka::DeliveryCbisconfigured)[producer] *-eventcallbacks(ifanRdKafka::EventCbisconfigured)[producer&consumer] * *@remarkAnapplicationshouldmakesuretocallpoll()atregular *intervalstoserveanyqueuedcallbackswaitingtobecalled. * *@warningThismethodMUSTNOTbeusedwiththeRdKafka::KafkaConsumer, *useitsRdKafka::KafkaConsumer::consume()instead. * *@returnsthenumberofeventsserved. */ virtualintpoll(inttimeout_ms)=0; }; 5.1.3.ConsumeCb 只针对消费者的Callback。 5.1.4.RebalanceCb 只针对消费者的Callback。 5.1.5.EventCb 消费者和生产者均可设置EventCb,如:_global_conf->set("event_cb",&_event_cb,errmsg);。 /** *@briefEventcallbackclass * *Eventsareagenericinterfaceforpropagatingerrors,statistics,logs,etc *fromlibrdkafkatotheapplication. * *@saRdKafka::Event */ classRD_EXPORTEventCb{ public: /** *@briefEventcallback * *@saRdKafka::Event */ virtualvoidevent_cb(Event&event)=0; virtual~EventCb(){} }; /** *@briefEventobjectclassaspassedtotheEventCbcallback. */ classRD_EXPORTEvent{ public: /**@briefEventtype*/ enumType{ EVENT_ERROR,/**<Eventisanerrorcondition*/ EVENT_STATS,/**<EventisastatisticsJSONdocument*/ EVENT_LOG,/**<Eventisalogmessage*/ EVENT_THROTTLE/**<Eventisathrottlelevelsignalingfromthebroker*/ }; }; 5.1.6.Consumer 简单消息者,一般不使用,而是使用KafkaConsumer。 5.1.7.KafkaConsumer 消费者和生产者均采用多重继承方式,其中KafkaConsumer为消费者接口,KafkaConsumerImpl为消费者实现。 5.1.8.KafkaConsumerImpl KafkaConsumerImpl为消费者实现。 5.1.9.rd_kafka_message_t 消息结构。 5.1.10.rd_kafka_msg_s 消息结构,但消息数据实际存储在rd_kafka_message_t,结构大致如下: structrd_kafka_msg_s { rd_kafka_message_trkm_rkmessage; struct { rd_kafka_msg_s*tqe_next; rd_kafka_msg_s**tqe_prev; int64_trkm_timestamp; rd_kafka_timestamp_type_trkm_tstype; }rkm_link; }; 5.1.11.rd_kafka_msgq_t 存储消息的消息队列,生产者生产的消息并不直接socket发送到brokers,而是放入了这个队列,结构大致如下: structrd_kafka_msgq_t { struct { rd_kafka_msg_s*tqh_first;//队首 rd_kafka_msg_s*tqh_last;//队尾 }; //消息个数 rd_atomic32_trkmq_msg_cnt; //所有消息加起来的字节数 rd_atomic64_trkmq_msg_bytes; }; 5.1.12.rd_kafka_toppar_t Topic-Partition队列,很复杂的一个结构,部分内容如下: //Topic+Partitioncombination typedefstructrd_kafka_toppar_s { struct { rd_kafka_toppar_s*tqe_next; rd_kafka_toppar_s**tqe_prev; }rktp_rklink; struct { rd_kafka_toppar_s*tqe_next; rd_kafka_toppar_s**tqe_prev; }rktp_rkblink; struct { rd_kafka_toppar_s*cqe_next; rd_kafka_toppar_s*cqe_prev; }rktp_fetchlink; struct { rd_kafka_toppar_s*tqe_next; rd_kafka_toppar_s**tqe_prev; }rktp_rktlink; struct { rd_kafka_toppar_s*tqe_next; rd_kafka_toppar_s**tqe_prev; }rktp_cgrplink; rd_kafka_itopic_t*rktp_rkt; int32_trktp_partition; int32_trktp_leader_id; rd_kafka_broker_t*rktp_leader; rd_kafka_broker_t*rktp_next_leader; rd_refcnt_trktp_refcnt; rd_kafka_msgq_trktp_msgq;//application->rdkafkaqueue }rd_kafka_toppar_t; 6.生产者 6.1.生产者结构 6.1.1.DeliveryReportCb 消息已经成功递送到Broker时回调,只针对生产者有效。 6.1.2.PartitionerCb 计算分区号回调函数,只针对生产者有效。 6.1.3.Producer Producer为生产者接口,它的实现者为ProducerImpl。 6.1.4.ProduceImpl ProducerImpl为生产者的实现。 6.2.生产者启动过程1 启动时会创建两组线程:一组Broker线程(rd_kafka_broker_thread_main,多个),实为与Broker间的网络IO线程;一组Handler线程(rd_kafka_thread_main,单个),每调用一次RdKafka::Producer::create或rd_kafka_new即创建一Handler线程。 Handler线程调用栈: (gdb)t17 [Switchingtothread17(Thread0x7ff7059d3700(LWP16765))] #00x00007ff7091e6cf2inpthread_cond_timedwait@@GLIBC_2.3.2()from/lib64/libpthread.so.0 (gdb)bt #00x00007ff7091e6cf2inpthread_cond_timedwait@@GLIBC_2.3.2()from/lib64/libpthread.so.0 #10x00000000005b4d2fincnd_timedwait_ms(cnd=0x1517748,mtx=0x1517720,timeout_ms=898)attinycthread.c:501 #20x0000000000580e16inrd_kafka_q_serve(rkq=0x1517720,timeout_ms=898,max_cnt=0,cb_type=RD_KAFKA_Q_CB_CALLBACK,callback=0x0,opaque=0x0)atrdkafka_queue.c:440 #30x000000000054ee9binrd_kafka_thread_main(arg=0x1516df0)atrdkafka.c:1227 #40x00000000005b4e0fin_thrd_wrapper_function(aArg=0x15179d0)attinycthread.c:624 #50x00007ff7091e2e25instart_thread()from/lib64/libpthread.so.0 #60x00007ff7082d135dinclone()from/lib64/libc.so.6 6.3.生产者启动过程2 创建网络IO线程,消费者启动过程类似,只是一个调用rd_kafka_broker_producer_serve(rkb),另一个调用rd_kafka_broker_consumer_serve(rkb)。 IO线程负责消息的收和发,发送底层调用的是sendmsg,收调用的是recvmsg(但MSVC平台调用send和recv)。 6.4.生产者生产过程 生产者生产的消息并不直接socket发送到brokers,而是放入队列rd_kafka_msgq_t中。Broker线程(rd_kafka_broker_thread_main)消费这个队列。 Broker线程同时监控与Broker间的网络连接,又要监控队列中是否有数据,如何实现的?这个队列和管道绑定在一起的,绑定的是管道写端(rktp->rktp_msgq_wakeup_fd=rkb->rkb_toppar_wakeup_fd;rkb->rkb_toppar_wakeup_fd=rkb->rkb_wakeup_fd[1])。 这样Broker线程即可同时监听网络数据和管道数据。 //intrd_kafka_msg_partitioner(rd_kafka_itopic_t*rkt,rd_kafka_msg_t*rkm,intdo_lock) (gdb)p*rkm $7={rkm_rkmessage={err=RD_KAFKA_RESP_ERR_NO_ERROR,rkt=0x1590c10,partition=1,payload=0x7f48c4001260,len=203,key=0x7f48c400132b,key_len=14,offset=0, _private=0x0},rkm_link={tqe_next=0x5b5d47554245445b,tqe_prev=0x6361667265746e69},rkm_flags=196610,rkm_timestamp=1524829399009, rkm_tstype=RD_KAFKA_TIMESTAMP_CREATE_TIME,rkm_u={producer={ts_timeout=16074575505526,ts_enq=16074275505526}}} (gdb)prkm->rkm_rkmessage $8={err=RD_KAFKA_RESP_ERR_NO_ERROR,rkt=0x1590c10,partition=1,payload=0x7f48c4001260,len=203,key=0x7f48c400132b,key_len=14,offset=0,_private=0x0} (gdb)prkm->rkm_rkmessage->payload $9=(void*)0x7f48c4001260 (gdb)p(char*)rkm->rkm_rkmessage->payload $10=0x7f48c4001260"{\"p\":\"f\",\"o\":1,\"d\":\"m\",\"d\":\"m\",\"i\":\"f2\",\"ip\":\"127.0.0.1\",\"pt\":2018,\"sc\":0,\"fc\":1,\"tc\":0,\"acc\":395,\"mcc\":395,\"cd\":\"test\",\"cmd\":\"tester\",\"cf\":\"main\",\"cp\":\"1.49.16.9"... 7.poll过程 poll的作用是触发回调,生产者即使不调用poll,消息也会发送出去,但是如果不通过poll触发回调,则不能确定消息发送状态(成功或失败等)。 消费队列rd_kafka_t->rk_rep,rk_rep为响应队列,类型为rd_kafka_q_t或rd_kafka_q_s: