您现在的位置是:首页 > 文章详情

spring-cloud-stream整合kafka

日期:2019-06-17点击:581

1.在项目的pom中引入

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>

2.配置消息通道

public interface Demo { /** * 发消息的通道名称 */ String DEMO_OUTPUT = "demo_output"; /** * 消息的订阅通道名称 */ String DEMO_INPUT = "demo_input"; /** * 发消息的通道 * * @return */ @Output(DEMO_OUTPUT) MessageChannel sendDemoMessage(); /** * 收消息的通道 * * @return */ @Input(DEMO_INPUT) SubscribableChannel recieveDemoMessage(); } 
  1. 使带注释组件的结合Input和Output根据作为值给注释传递接口的列表到代理
@EnableBinding(value = {Demo.class})

4.链接kafka配置

spring.cloud.stream.bindings.demo_input.destination=demo spring.cloud.stream.bindings.demo_input.group=demo spring.cloud.stream.bindings.demo_output.destination=demo spring.cloud.stream.bindings.demo_output.group=demo spring.cloud.stream.default-binder=kafka spring.kafka.bootstrap-servers=127.0.0.1:9092 spring.kafka.consumer.enable-auto-commit=true spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

5.发送消息

@Resource(name = Demo.DEMO_OUTPUT) private MessageChannel sendDemoMessageChannel; @Test public void Demo() { boolean isSendSuccess = sendDemoMessageChannel. send(MessageBuilder.withPayload("OK").build()); System.out.println(isSendSuccess); }

6.接收消息

 @StreamListener(Demo. DEMO_INPUT) public void insertQuotationK(Message<String> message) { if (StringUtils.isEmpty(message.getPayload())) { System.out.println("receiver data is empty !"); System.out.println(400 + "failed"); } System.out.println("kafka收到"+message.getPayload()); }

7.结束咯,如果出现异常,请留言。

原文链接:https://yq.aliyun.com/articles/705793
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章