【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务
←←←←←←←←←←←← 快!点关注
让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?因为Spring AMPQ提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?
Spring Cloud Stream概念
- Spring Cloud Stream通过Binder概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
- Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的Spring Integration代码也将是相同的!
示例中的Spring Cloud Stream概念(RabbitMQ)
让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建SimpleMessageListenerContainer并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:
spring.cloud.stream.bindings.citiesChannel.destination=streamInput
spring.cloud.stream.bindings.citiesChannel.group=cities
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities
spring.cloud.stream.bindings.personsChannel.destination=streamInput
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
配置详细信息
在类路径上使用RabbitMQ Binder,每个目标都映射到TopicExchange。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。
spring.cloud.stream.bindings.citiesChannel.destination = streamInput
spring.cloud.stream.bindings.personsChannel.destination = streamInput
现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。
因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道
# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.
spring.cloud.stream.bindings.citiesChannel.group=cities
# Durable subscription, of course.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true
# AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).
# Only messages with routingKey = 'cities' will land here.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
连接属性到Spring Integration
好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。
<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:
package com.example.spring.cloud.configuration;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
\* Created by tomask79 on 30.03.17.
*/
public interface SinkRabbitAPI {
String INPUT_CITIES = "citiesChannel";
String INPUT_PERSONS = "personsChannel";
@Input(SinkRabbitAPI.INPUT_CITIES)
SubscribableChannel citiesChannel();
@Input(SinkRabbitAPI.INPUT_PERSONS)
SubscribableChannel personsChannel();
}
Spring Boot启动时加载这个属性
package com.example.spring.cloud;
import com.example.spring.cloud.configuration.SinkRabbitAPI;
import com.example.spring.cloud.configuration.SourceRabbitAPI;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableBinding({SinkRabbitAPI.class})
public class StreamingApplication {
public static void main(String\[\] args) {
SpringApplication.run(StreamingApplication.class, args);
}
}
在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:
import com.example.spring.cloud.configuration.SinkRabbitAPI;
import com.example.spring.cloud.configuration.SourceRabbitAPI;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
\* Created by tomask79 on 30.03.17.
*/
@Service
public class ProcessingAMPQEndpoint {
@StreamListener(SinkRabbitAPI.INPUT_CITIES)
public void processCity(final String city) {
System.out.println("Trying to process input city: "+city);
}
@StreamListener(SinkRabbitAPI.INPUT_PERSONS)
public void processPersons(final String person) {
System.out.println("Trying to process input person: "+person);
}
}
RabbitMQ绑定器和代理配置
Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:
spring:
cloud:
stream:
bindings:
...
binders:
rabbitbinder:
type: rabbit
environment:
spring:
rabbitmq:
host: rabbitmq
port: 5672
username: XXX
password: XXX
测试消息消费
- 安装并运行RabbitMQ代理
- rabbitmq.git
- mvn clean install
- java -jar target / streaming-0.0.1-SNAPSHOT.jar
- 现在使用路由键'cities'或'persons'在streamInput Exchange上发布消息...输出应该是:
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd
使用Spring Cloud Stream重新传递消息
您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:
spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6
这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:
@StreamListener(SinkRabbitAPI.INPUT_PERSONS)
public void processPersons(final String person) {
System.out.println("Trying to process input person: "+person);
throw new RuntimeException();
}
如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover
(RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc
建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为Java中的AMPQ基础架构编写样板代码。
写在最后:
秃顶程序员的不易,看到这里,点了关注吧!
点关注,不迷路,持续更新!!!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
SpringBoot入坑指南之二:配置篇
开篇语 很多人都说,Spring Boot最大的作用就是简化配置,摆脱原来Spring的配置地狱。确实,相比Spring原来的配置,Spring Boot简直就是天堂,所以说Spring Boot就是一个又大又深的坑,跳进去了就再也出不来(你也不愿意出来),这也是这个系列文章为什么叫入坑指南的原因。 不过,任何应用都不可能摆脱配置,像数据库相关配置、业务自定义配置等就肯定需要进行配置的。所以,在本文主要讲一下Spring Boot的一些配置方式,主要参考官方文档和自己项目中的一些使用方式。 Spring Boot的配置文件 默认配置文件 Spring Boot默认的配置文件时application.yml(或application.properties),相对于properties文件,yaml格式层级结构清晰易于阅读和管理,故推荐使用yaml格式进行配置。需注意的是,当相同目录下同时存在application.properties和application.yml文件时,会优先读取properties文件中的配置。 application.properties/application....
-
下一篇
java异常处理01
java异常处理高级技巧 什么是Java异常? 当Java程序的正常行为被意外行为中断时,会发生故障。这种故障被称为异常。例如,程序尝试打开文件以读取其内容,但该文件不存在将产生异常。Java将异常分为几种类型,所以让我们考虑每一种类型。 检查异常 Java将(例如FileNotFoundException, IOException)引起的异常分类为已检查的异常。Java编译器会检查这些异常,并且在异常发生的位置要求进行捕获处理或者向上抛出(throws)。需要注意的是检查异常属于编译器的行为,要求你必须在代码中捕获或向上抛出(throws)。 运行时(非检查)异常 例如程序进行强制转换(cast),这种可能存在转换失败的异常就是另一种异常。即运行时异常(RuntimeException)。和检查不同,编译器不会检查你在代码中是否进行处理或抛出。运行时异常通常来自编写的不良代码,因此应由程序员修复。 错误(Error) 指一些非常严重,通常无法进行修正必须要重启程序的异常。例如, 尝试从JVM分配内存,但没有足够的可用内存来满足请求(OutOfMemoryError)。运行时尝试调用加...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- MySQL数据库在高并发下的优化方案
- Docker容器配置,解决镜像无法拉取问题