Kafka 集群同步工具 MirrorMarker 的应用
前言
kafka 集群消息同步是一个常见的需求,MirrorMarker 是 kafka 官方仓库提供的用于 kafka 各集群间 topic 消息同步的工具,本文旨在通过测验 MirrorMarker 的使用,搞清楚 MirrorMarker 的实现原理,以及通过各种使用场景模拟,验证 MirrorMarker 的可用性。
相关链接
- kafka :https://github.com/apache/kafka
- cmak(kafka-manager) :https://github.com/yahoo/CMAK
- 本文 kafka-mock-server 源码 : https://github.com/klboke/kafka-mock-server
本文软件版本
- spring-boot:2.4.5
- spring-kafka\spring-kafka-test:2.7.2
- kafka(MirrorMarker):2.5.1
创建 kafka-mock-server 项目
用于搭建测试 kafka 集群,测试 kafka 消息发送、消费,kafka 消息集群同步的项目。有如下模块。
mock-server
mock-server 用来创建 kafka 集群,进入项目的测试模块下,有如下创建 kafka 服务的启动器:
- KafkaMockServer9097:创建一个端口号为 9097 的单 broker 的 kafka 服务,zookeeper 端口号随机
- KafkaMockServer9098:创建一个端口号为 9098 的单 broker 的 kafka 服务,zookeeper 端口号随机
- MultipleBroker001:创建端口号为 9090、9091、9092 的多 broker 的 kafka 服务,zookeeper 端口号 59551
- MultipleBroker002:创建端口号为 9095、9096、9097 的多 broker 的 kafka 服务,zookeeper 端口号 59552
producer
producer 用来发送 kafka 消息,并监听自己发送的消息,消息发送做了计数统计,用于和同步 kafka 集群接收的的消息数据做对比
@SpringBootApplication @RestController public class ProducerMain { private static final Logger logger = LoggerFactory.getLogger(ProducerMain.class); private final KafkaTemplate<Object, Object> template; private static final AtomicInteger integer = new AtomicInteger(); public static void main(String[] args) { SpringApplication.run(ProducerMain.class, args); } public ProducerMain(KafkaTemplate<Object, Object> template) { this.template = template; } @GetMapping("/send") public String send() { String message = "message:" + integer.incrementAndGet(); this.template.send("topic001", message); return message; } @GetMapping("/send-message-by-time") public String sendMessageByTime() throws ExecutionException, InterruptedException { AtomicInteger counter = new AtomicInteger(); LocalDateTime stopTime = LocalDateTime.now().plusMinutes(3); while (LocalDateTime.now().isBefore(stopTime)) { String message = "message:" + counter.incrementAndGet(); //.get() 用于将异步发送变同步,异步发送本机受不了 template.send("topic001", message).get(); } String result = "3 分钟总共发送了 " + counter.get() + " 条消息"; logger.info(result); return result; } @KafkaListener(id = "webGroup", topics = "topic001") public void listen(String input) { logger.info("input value: {}", input); } }
producer 是一个 spring-boot 的 web 应用,默认启动 8089 端口,项目启动成功后,访问
- http://127.0.0.1:8089/send : send() 方法发送单条消息,用于观察验证 topic 是否正常发送消费。
- http://127.0.0.1:8089/send-message-by-time : sendMessageByTime() 方法持续同步发送消息三分钟, 用于验证 mirror-maker 服务从(MirrorMarkerMain001单一消费线程服务) 切换到 MirrorMarkerMain002(多线程消费)的消息准确性,准确性包括 消息是否丢失、消息是否重复
consumer
consumer 用来消费 MirrorMaker 同步到 kafka 集群的消息,消息到的消息也做了计数,用于和发出的消息数据做对比
@SpringBootApplication @RestController public class ConsumerMain { private static final Logger logger = LoggerFactory.getLogger(ConsumerMain.class); private static final AtomicInteger counter = new AtomicInteger(); public static void main(String[] args) { SpringApplication.run(ConsumerMain.class, args); } @KafkaListener(id = "ConsumerMain", topics = "topic001") public void listen(String input) { logger.info("总计:{},{}", counter.incrementAndGet(), input); } }
每做一次消息同步测试,都需要重启 ConsumerMain 服务,用于重置消息计数器,只有当计数器相等时,才认为消息同步成功无丢失无冗余
mirror-maker
mirror-maker 模块用于 kafka 消息集群同步,分别创建单线程的消费实例和多线程消费实例,观察消费者线程重新平衡的过程。
public class MirrorMarkerMain001 { /** * 开启 1 个消费线程的 kafka 集群同步服务 * @param args 命令入参,可用来覆盖默认默认配置 */ public static void main(String[] args) { if (args.length == 0) { args = ConfigProvider.getOneThreadConfig(); } MirrorMaker.main(args); } }
kafka-manager
kafka-manager 是雅虎开源的管理 kafka 集群的项目,kafka-manager 本身也是集群架构,需要依赖 zookeeper。项目拉下来后,先修改 conf/application.conf 中的 zookeeper 配置,然后根目录下执行 ./sbt start
启动项目。默认 web 端口是 9000,启动成功后,访问 http://127.0.0.1:9000
在 web 页面上添加需要管理的 kafka 集群时,只需要配置 kafka 集群的 zookeeper 地址即可。
测试 MirrorMarker 启停,组件工作是否正常
操作步骤如下,使用 kafka-manager 观察:
- 1、分别启动 MultipleBroker001、MultipleBroker002、MirrorMarkerMain001、producer、consumer、kafka-manager 等服务
- 2、触发 producer 的 /send 接口,检查服务是否正常工作
- 3、触发 producer 的 /send-message-by-time 接口,观察消息发送、消费情况,MirrorMarker 同步情况(消息总共会同步发送三分钟,期间完成如下操作)
- 4、启动 MirrorMarkerMain002 服务,观察同步消费组的 Consumer Instance Owner 变化
- 5、停止 MirrorMarkerMain001 服务,观察同步消费组的 Consumer Offset 状态
- 6、测试结束,对比发送的消息总数和同步集群消费到的总数,观察是否 LogSize 和 Consumer Offset 是否一致
步骤三 producer 发送记录图
步骤三 Consumer Instance Owner 情况
步骤四、五、六 Consumer Instance Owner 、Consumer Offset的消息状态
步骤六 Consumer 的消息情况
结果分析:
从测试结果来看,可以得出如下几个结论
1、从单消费线程的 MirrorMarkerMain001 切换到 MirrorMarkerMain001、MirrorMarkerMain002 同时工作,会触发 Consumer group 的消费重平衡,这个从 topic001 的 partition 的 Consumer Instance Owner 就能看出来
2、MirrorMarker 的启停过程,不影响集群同步的结果,因为 producer 和 consumer 的计数器是相等的
3、开启 MirrorMarker 多消费线程,消息的积压明显好于单线程消费
MirrorMarker 实现原理分析
MirrorMarker 的工作原理就是创建一个Consumer group,然后定制 Consumer 、Producer 参数,用来保证消息的准确性(不丢消息)。下面结合上面的测试场景,带着一些边界场景的问题,深入到源码(MirrorMarker 在 kafka 的 tools 包下)窥探下 MirrorMarker 是怎么处理的。
问题一、offset 是如何管理的?是自动提交吗?
def createConsumers(numStreams: Int, consumerConfigProps: Properties, customRebalanceListener: Option[ConsumerRebalanceListener], whitelist: Option[String]): Seq[ConsumerWrapper] = { // Disable consumer auto offsets commit to prevent data loss. maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false") // Hardcode the deserializer to ByteArrayDeserializer consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName) consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName) // The default client id is group id, we manually set client id to groupId-index to avoid metric collision val groupIdString = consumerConfigProps.getProperty("group.id") val consumers = (0 until numStreams) map { i => consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString) new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps) } whitelist.getOrElse(throw new IllegalArgumentException("White list cannot be empty")) consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist)) }
MirrorMarker 里是手动管理 offset 的,周知,kafka 客户端的 Consumer 消费记录,即 offset 默认是自动提交的,如上代码,在 MirrorMarker 里禁用了自动提交。因为这个配置来自 consumer.properties 配置文件,也是可以修改的,当手动修改成自动提交时,MirrorMarker 会提示
data loss or message reordering is possible(可能产生消息丢失和重排序)
通过 offset.commit.interval.ms 来控制 offset 提交的频次,这个参数是针对 MirrorMarker 而言的,而不是 consumer.properties ,所以需要使用命令行参数设置 --offset.commit.interval.ms 来使用,单位是毫秒,默认是 60000 。每次 Consumer poll 消息后都会触发这个函数,Consumer poll 的阻塞时间默认是 1 秒,这个参数没法修改,看到这里你可能有疑问,60 s提交一次 offset ,要是提交前服务关了怎么办?这个在问题二、三都有答案。不过,在实际使用时,还是建议调整这个参数,这样看起来,消费的时延会比较准确。
def maybeFlushAndCommitOffsets(): Unit = { if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { debug("Committing MirrorMaker state.") producer.flush() commitOffsets(consumerWrapper) lastOffsetCommitMs = System.currentTimeMillis() } }
问题二、有新的 Consumer group 实例加入,是如何处理的?
在 kafka 实例中,当有新的 Consumer group 加入时,中央协调器会重新触发 partition 的重新分配,在消费者端,有一个专门的监听器处理这个重平衡事件,MirrorMarker 中,也实现了自己的监听器:
private class InternalRebalanceListener(consumerWrapper: ConsumerWrapper, customRebalanceListener: Option[ConsumerRebalanceListener]) extends ConsumerRebalanceListener { override def onPartitionsLost(partitions: util.Collection[TopicPartition]): Unit = {} override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { producer.flush() commitOffsets(consumerWrapper) customRebalanceListener.foreach(_.onPartitionsRevoked(partitions)) } override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { customRebalanceListener.foreach(_.onPartitionsAssigned(partitions)) } }
当消费线程经过重新平衡后,失去了原先分配的 partition,则会立马提交自己的 offsets,不用等待 offset.commit.interval.ms
问题三、停止 MirrorMarker 的时候做了什么?
finally { CoreUtils.swallow ({ info("Flushing producer.") producer.flush() // note that this commit is skipped if flush() fails which ensures that we don't lose messages info("Committing consumer offsets.") commitOffsets(consumerWrapper) }, this) info("Shutting down consumer connectors.") CoreUtils.swallow(consumerWrapper.wakeup(), this) CoreUtils.swallow(consumerWrapper.close(), this) shutdownLatch.countDown() info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker if (!isShuttingDown.get()) { fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") sys.exit(-1) } }
这段代码是 MirrorMarker 接收到关闭指令时运行的,程序关闭之前也提交了 offset,程序关闭事件是通过
Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown())
注册的,所以,千万别 kill -15 pid 暴力关闭
ps:以上涉及到的用户保证同步服务准确性定制的 Consumer 、Producer 参数,在 MirrorMarker 的类注释上也有直接说明,凡是如下注释列出的参数不要轻易修改,如:
/** * The mirror maker has the following architecture: * - There are N mirror maker thread, each of which is equipped with a separate KafkaConsumer instance. * - All the mirror maker threads share one producer. * - Each mirror maker thread periodically flushes the producer and then commits all offsets. * * @note For mirror maker, the following settings are set by default to make sure there is no data loss: * 1. use producer with following settings * acks=all * delivery.timeout.ms=max integer * max.block.ms=max long * max.in.flight.requests.per.connection=1 * 2. Consumer Settings * enable.auto.commit=false * 3. Mirror Maker Setting: * abort.on.send.failure=true */
结语
经过反复测试,验证了 MirrorMarker 服务还是比较稳定可靠的,多消费实例启停,触发消费重平衡等场景都能很好的处理。有两个参数建议调整下,num.streams ,这个参数控制了消费线程,建议和 partition 的数量保持一致即可,多了会有线程空跑,少了的话,同步的性能会下降,同步时延会增加。offset.commit.interval.ms 调整到 500 ,消费记录会趋向真实情况。另外,用于确保数据不丢失的默认参数,如 enable.auto.commit 、acks 等参数不要修改

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
IntelliJ IDEA 到期了?教你如何免费获取正版官方License
前言 要说现在最好用的 IDE 工具,我觉得应该是Jetbrains 系列了,几乎支持所有主流的编程语言,可以说是目前最好用的 IDE 工具,比如 IntelliJ IDEA、PyCharm、WebStorm,这些 IDE 工具其实平时我都有使用,但是我们也知道这些工具都属于商业产品,价格是非常昂贵的,虽然说这些工具给我们带来的便利远远不是金钱能来衡量的,但是毕竟对于我们大部分个人用户来说还是一笔不小的开支。请看价格-> 也许你会选择使用社区版,但是毕竟社区版功能没有专业版多、强。也许你会选择破解或者淘宝,但是我告诉你,这些都是不稳定的,也许哪一天升级就又被打回原形,注意淘宝上买的不大靠谱,个人亲身经历! 那么本文教你如何获取正版jetbrains系列所有产品的license!你也不再需要再花一笔钱去购买!而且升级无忧 申请 要申请获取免费的正版 License,只有一个前提条件,那就是你需要有自己的开源项目且活跃时间超过3个月、3个月、3个月,没有3个月的申请会被打回。对于项目的内容、star 之类的目前并没有什么硬性要求 给项目添加 License 首先在申请之前,我们需要给...
- 下一篇
打通“任督二脉”:Android 应用安装优化实战
疑问: (1)了解APK安装流程有什么好处 (2)了解APK安装流程可以解决什么问题 一、可以在安装流程里做什么 安装就分为下面三个阶段,每个阶段可以做些什么工作,可以帮助我们优化安装流程,解决安装后的一些问题呢? (1)安装前、安装中:这两个阶段,第三方应用做不了什么,一般是应用分发APP应用商店、游戏中心、浏览器、应用宝这些应用会关注这两个状态。 (2)安装后:这个阶段,无论是内置应用还是第三方应用,或多或少的会遇到一些问题,如so文件找不到,图片存储、缓存数据等出现异常等... 二、安装前 安装前无非是根据自己的应用情况,选择一种可以使用的安装方法。 2.1 pm命令安装方法 对于具有系统签名的厂商应用,具备静默安装能力,使用pm命令即可实现。 String cmd = "pm install -r -d /data/data/android.apk" Runtime run = Runtime.getRuntime(); Process process = run.exec(cmd); 2.2 包安装管理器安装 非系统签名的应用宝这种应用,只能使用包安装管理器进行安装。 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装Docker,最新的服务器搭配容器使用
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Windows10,CentOS7,CentOS8安装Nodejs环境