RocketMQ 源码解析 —— 调试环境搭建
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/build-debugging-environment/ 「芋道源码」欢迎转载,保留摘要,谢谢!
- 0. 友情提示
- 1. 依赖工具
- 2. 源码拉取
- 3. 启动 RocketMQ Namesrv
- 4. 启动 RocketMQ Broker
- 5. 启动 RocketMQ Producer
- 6. 启动 RocketMQ Consumer
- 666. 彩蛋
- 《Dubbo 实现原理与源码解析 —— 精品合集》
- 《Netty 实现原理与源码解析 —— 精品合集》
- 《Spring 实现原理与源码解析 —— 精品合集》
- 《MyBatis 实现原理与源码解析 —— 精品合集》
- 《Spring MVC 实现原理与源码解析 —— 精品合集》
- 《数据库实体设计合集》
- 《Java 面试题 —— 精品合集》
- 《Java 学习指南 —— 精品合集》
0. 友情提示
阅读源码之前,建议胖友对 RocketMQ 的文档已经熟读。目前 RocketMQ 4 的中文文档很少,所以英文不太好的胖友,推荐看看如下资料:
- 《RocketMQ 用户指南》 基于 RocketMQ 3 的版本。
- 《RocketMQ 原理简介》 基于 RocketMQ 3 的版本。
- 《RocketMQ 最佳实践》 基于 RocketMQ 3 的版本。
- 《阿里云 —— 消息队列 MQ》 阿里云的消息队列,就是 RocketMQ 的云服务。
1. 依赖工具
- JDK :1.8+
- Maven
- IntelliJ IDEA
2. 源码拉取
从官方仓库 [https://github.com/apache/rocketmq) Fork
出属于自己的仓库。为什么要 Fork
?既然开始阅读、调试源码,我们可能会写一些注释,有了自己的仓库,可以进行自由的提交。😈
使用 IntelliJ IDEA
从 Fork
出来的仓库拉取代码。拉取完成后,Maven
会下载依赖包,可能会花费一些时间,耐心等待下。
在等待的过程中,我来简单说下,搭建调试环境的过程:
- 启动 RocketMQ Namesrv
- 启动 RocketMQ Broker
- 启动 RocketMQ Producer
- 启动 RocketMQ Consumer
最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。
😈 另外,本文使用的 RocketMQ 版本是 4.4.0-SNAPSHOT
。
3. 启动 RocketMQ Namesrv
打开 org.apache.rocketmq.namesrv.NameServerInstanceTest
单元测试类,参考 #startup()
方法,我们编写 #main(String[] args)
静态方法,代码如下:
// NameServerInstanceTest.java public static void main(String[] args) throws Exception { // NamesrvConfig 配置 final NamesrvConfig namesrvConfig = new NamesrvConfig(); // NettyServerConfig 配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // 设置端口 // 创建 NamesrvController 对象,并启动 NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); namesrvController.initialize(); namesrvController.start(); // 睡觉,就不起来 Thread.sleep(DateUtils.MILLIS_PER_DAY); }
然后,右键运行,RocketMQ Namesrv 就启动完成。输出日志如下:
17:54:03.354 [NettyEventExecutor] INFO RocketmqRemoting - NettyEventExecutor service started 17:54:03.355 [FileWatchService] INFO RocketmqCommon - FileWatchService service started
最后,这是一个可选的步骤,命令行中输入 telnet 127.0.0.1 9876
,看看是否能连接上 RocketMQ Namesrv 。
4. 启动 RocketMQ Broker
打开 org.apache.rocketmq.broker.BrokerControllerTest
单元测试类,参考 #testBrokerRestart()
方法,我们编写 #main(String[] args)
方法,代码如下:
// BrokerControllerTest.java public static void main(String[] args) throws Exception { // 设置版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // NettyServerConfig 配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(10911); // BrokerConfig 配置 final BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setBrokerName("broker-a"); brokerConfig.setNamesrvAddr("127.0.0.1:9876"); // MessageStoreConfig 配置 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setDeleteWhen("04"); messageStoreConfig.setFileReservedTime(48); messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); messageStoreConfig.setDuplicationEnable(false); // BrokerPathConfigHelper.setBrokerConfigPath("/Users/yunai/百度云同步盘/开发/Javascript/Story/incubator-rocketmq/conf/broker.conf"); // 创建 BrokerController 对象,并启动 BrokerController brokerController = new BrokerController(// brokerConfig, // nettyServerConfig, // new NettyClientConfig(), // messageStoreConfig); brokerController.initialize(); brokerController.start(); // 睡觉,就不起来 System.out.println("你猜"); Thread.sleep(DateUtils.MILLIS_PER_DAY); }
然后,右键运行,RocketMQ Broker 就启动完成了。输出日志如下:
你猜
-
不要懵逼,我们打开下 RocketMQ Namesrv 那,已经输出日志如下:
18:17:30.443 [NettyServerCodecThread_5] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:63847 18:17:30.443 [NettyServerCodecThread_5] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:63847] 18:17:30.457 [RemotingExecutorThread_4] DEBUG RocketmqNamesrv - receive request, 103 127.0.0.1:63847 RemotingCommand [code=103, language=JAVA, version=275, opaque=0, flag(B)=0, remark=null, extFields={brokerId=0, bodyCrc32=1880081823, clusterName=DefaultCluster, brokerAddr=192.168.3.26:10911, haServerAddr=192.168.3.26:10912, compressed=false, brokerName=broker-a}, serializeTypeCurrentRPC=JSON] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, BenchmarkTest QueueData [brokerName=broker-a, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0] 18:17:30.458 [RemotingExecutorThread_4] INFO RocketmqNamesrv - new broker registered, 192.168.3.26:10911 HAServer: 192.168.3.26:10912
- 妥妥的,原来 RocketMQ Broker 已经启动完成,并且注册到 RocketMQ Namesrv 上。
最后,这是一个可选的步骤,命令行中输入 telnet 127.0.0.1 10911
,看看是否能连接上 RocketMQ Broker 。
5. 启动 RocketMQ Producer
打开 org.apache.rocketmq.example.quickstart.Producer
示例类,代码如下:
// Producer.java /** * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /* * Instantiate with a producer group name. */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ /* * Launch the instance. */ producer.setNamesrvAddr("127.0.0.1:9876"); // <x> 哈哈哈哈 producer.start(); for (int i = 0; i < 1000; i++) { try { /* * Create a message instance, specifying topic, tag and message body. */ Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); /* * Call send message to deliver message to one of brokers. */ SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /* * Shut down once the producer instance is not longer in use. */ producer.shutdown(); } }
- 注意,在
<x>
哈哈哈哈处,我们增加了producer.setNamesrvAddr("127.0.0.1:9876")
代码块,指明 Producer 使用的 RocketMQ Namesrv 。 - 😈 可能会有胖友会问,为什么指定的不是 RocketMQ Broker 呢?请退回到 「0. 友情提示」 。
然后,右键运行,RocketMQ Producer 就启动完成。输出日志如下:
18:22:13.507 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6364050000, offsetMsgId=C0A8031A00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0] // ... 中间省略 N 条 ... SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F603E6, offsetMsgId=C0A8031A00002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F703E7, offsetMsgId=C0A8031A00002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=249] 18:22:15.558 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10911] result: true 18:22:15.559 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 18:22:15.560 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10909] result: true
没有最后。
6. 启动 RocketMQ Consumer
打开 org.apache.rocketmq.example.quickstart.Consumer
示例类,代码如下:
// Consumer.java public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ /* * Specify where to start in case the specified consumer group is a brand new one. */ consumer.setNamesrvAddr("127.0.0.1:9876"); // <x> 哈哈哈哈 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); } }
- 注意,在
<x>
哈哈哈哈处,我们还增加了consumer.setNamesrvAddr("127.0.0.1:9876")
代码块,指明 Consumer 使用的 RocketMQ Namesrv 。 - 😈 再来一道送命题,为什么指定的不是 RocketMQ Broker 呢?
然后,右键运行,RocketMQ Consumer 就启动完成。输入日志如下:
18:37:12.196 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1543054934061, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934065, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832771, UNIQ_KEY=C0A8031AE91718B4AAC27A63642D0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=4, sysFlag=0, bornTimestamp=1543054934102, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934103, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000BD9, commitLogOffset=3033, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A6364560011, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 55], transactionId='null'}]] // ... 中间省略 N 条 ... CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A636450000F, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 53], transactionId='null'}]]
没有最后。
666. 彩蛋
😈 一直想写这篇,一直忘记掉。
妥妥的,徐妈是最胖的。
还是那句话,一定一定一定要看 「0. 友情提示」 提供的文档。先懂原理,才能更好的读懂源码。
源码,是原理的具象化
原理,是代码的抽象化
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MySQL插入性能优化
MySQL插入性能优化 标签: 博客 [TOC] 可以从如下几个方面优化MySQL的插入性能。 代码优化 values 多个 即拼接成一个insert values sql, 例如 INSERT INTO MyTable ( Column1, Column2, Column3 ) VALUES ('John', 123, 'Lloyds Office'), ('Jane', 124, 'Lloyds Office'), ('Billy', 125, 'London Office'), ('Miranda', 126, 'Bristol Office'); 一个事务 开启一个事务,批量操作完了才提交事务,而不是,操作一次就提交一次,这样io太高,插入太慢。 插入字段尽量少,尽量用默认值 注意事项: max_allowed_packet 默认是1M,如何insert values sql 太大需要上调这个值 关闭 unique_checks 优化效果不是很明显,下面截图 选自 《MySQL 数据库开发、优化与管理维护 第2版》书籍 bulk_insert_buffer_size 这个参数只...
- 下一篇
Linux(CentOS7)系统中部署Django web框架
1. 概述 github项目地址:https://github.com/superwujc 尊重原创,欢迎转载,注明出处:https://my.oschina.net/superwjc/blog/3003027 Django服务框架在逻辑上可以分为web层与数据库层:web前端通过实现了WSGI协议的模块对python代码进行解析,而python代码中则通过特定于数据库的操作接口对数据库进行读取与写入。 Django自身内置了轻量级的web服务器与sqlite数据库,可以用于简单的代码测试,并支持Apache httpd与Nginx作为web前端,以及PostgreSQL/MySQL/Oracle等数据库作为后端存储,用于实际的生产环境。 本文分别以MySQL + Apache httpd + mod_wsgi与MySQL + Nginx + uwsgi为例,通过源码安装的方式,简单描述Django服务框架在Linux系统生产环境下的部署过程。 2. 说明 1.示例中包含两台服务器,操作系统版本均为CentOS 7.6.1810,最小化全新安装,无其他项目运行。 django-web(...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,CentOS8安装Elasticsearch6.8.6