您现在的位置是:首页 > 文章详情

RocketMQ概念详细之Consumer

日期:2018-11-04点击:342

消费者组和订阅

首先你应该关心的是不同的消费者群组可以独立的消费相同的主题,并且每个组都拥有自己的消费偏移量。
请确保相同消费者内的每个消费者订阅一样的主题。

消息监听器

串行
消费者将锁定每个消息队列以确保它被顺序消费。这会引起性能丢失,但当你关心消息顺序的实时这是很有用的。不推荐抛异常,你可以用返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 替代。

并行
正如名字所说,消息者将会并行消费消息。为了好的性能推荐使用。不推荐抛异常,你可以使用返回ConsumeConcurrentlyStatus.RECONSUME_LATER 代替。

消费状态

对于MessageListenerConcurrently,你可以返回 RECONSUME_LATER 来告诉消费者,你现在不能马上消费并且想在稍后重新消费。然后你可以继续消费其他消息。
对于MessageListenerOrderly,因为你关心顺序,你不能跳过消息,但你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉消费者等待一会。

阻塞

不建议阻塞监听器,因为它会阻塞线程池,最后使消费进程停止。

线程数量

消费者内部使用ThreadPoolExecutor来处理消费,因此你通过设置setConsumeThreadMin 或setConsumeThreadMax 可以改变它。

从哪开始消费

当创建一个新的消费者组,需要决定它是否需要消费在broker中已经存在的历史消息。
CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,消费所有的之后产生的消息。
CONSUME_FROM_FIRST_OFFSET 将会消费Broker中存在的每一个消息。
CONSUME_FROM_TIMESTAMP ,消费指定时间戳之后产生的消息。

重复

很多情况下都会引起重复,例如:

  • Producer重发消息(FLUSH_SLAVE_TIMEOUT情况)
  • Consumer 关闭,一些偏移量未及时更新到Broker

如果你的应用不能容忍重复的话,你可能需要做一些额外的工作来处理。例如,你可以检查DB的唯一主键。

原文链接:https://yq.aliyun.com/articles/664618
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章