首页 文章 精选 留言 我的

精选列表

搜索[SpringBoot],共4254篇文章
优秀的个人博客,低调大师

springboot + aop的最佳实践,再也不怕双击666

唠唠叨叨 复工不到一个月疯狂的加班中,最近没什么时间写博客更文有点慢,好在做的项目中不全是增删改的业务逻辑,还是有一些比较有意思实用的技术点,所以整理出来分享给大家。和那些搞高端技术的大佬比不了,咱就是个还在一线搬砖的码农。 有没有遇到过这种情况:由于网速等原因,网页响应很慢,提交一次表单后发现服务久久没响应,然后你就疯狂点击提交按钮(12306就经常被这样怒怼),如果做过防重复提交还好,否则那是什么级别的灾难就不好说了。。。 今天主要是用 自定义注解、 AOP、· Guava 包中Cache来生成一种本地锁,来达到的防重复提交效果,整体的实现比较简单,没有什么太大的难度,代码也是比较少,,由于是基于内存的缓存,因此这种实现方式并不适用于分布式服务。旨在给大家介绍一种实现防重复提交的方案,要是有什么说的不对的地方大家温柔一点撕,毕竟人家还是个20出头30郎当岁的孩子。 Guava是什么? guava包是个啥?做过Java的小伙伴应该多少都有所了解,它是google “嫌弃” JAVA自带的类库不好用,自行研发的一套工具包,对JDK工具做了很好的拓展。例如:并发[Concurrency]、缓存[Caches]、 函数式风格[Functional idioms]、 字符串处理[Strings]等。 总之一句话guava包很好用多去了解下,能不造的轮子咱们就尽量不去造,因为咱们自己造的轮子可能不太圆。不多说废话了,来看看具体的实现。 代码撸起来 1、引入Guava包依赖 第一步引入Guava的依赖包没什么好说的 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency> 2、自定义注解 服务端实现防止重复提交,一般都是利用AOP自定义注解的的方式,作用于controller的入口方法。自定义一个LocalLock注解用于需要防止重复提交的方法上。 /** * 锁的注解 * */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface LocalLock { /** * @author fly */ String key() default ""; } 注解定义好以后就需要做AOP拦截器切面的具体实现,在 interceptor() 方法上采用的是 Around(环绕增强) ,因此所有带 LocalLock 注解的都将被切面处理; @Around("execution(public * *(..)) && @annotation(com.chengxy.annotation.LocalLock)") 既然是缓存,那紧跟的属性一定要有过期时间,通过expireAfterWrite 设置缓存的过期时间,maximumSize设置缓存的个数。 通过在内存中查询key是否存在来判断是否让再次提交,和Redis的setNX方法是一个比较像。 这里我们设置同一个方法,5秒钟内相同参数的请求只允许执行一次。 那么这个注解该怎么用呢? @Aspect @Configuration public class LockMethodInterceptor { private static final Cache<String, Object> CACHES = CacheBuilder.newBuilder() // 最大缓存 100 个 .maximumSize(1000) // 设置写缓存后 5 秒钟过期 .expireAfterWrite(5, TimeUnit.SECONDS) .build(); @Around("execution(public * *(..)) && @annotation(com.chengxy.annotation.LocalLock)") public Object interceptor(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); LocalLock localLock = method.getAnnotation(LocalLock.class); String key = getKey(localLock.key(), pjp.getArgs()); if (!StringUtils.isEmpty(key)) { if (CACHES.getIfPresent(key) != null) { throw new RuntimeException("请勿重复请求"); } // 如果是第一次请求,就将 key 当前对象压入缓存中 CACHES.put(key, key); } try { return pjp.proceed(); } catch (Throwable throwable) { throw new RuntimeException("服务器异常"); } finally { // TODO 为了演示效果,这里就不调用 CACHES.invalidate(key); 代码了 } } /** * key 的生成策略,如果想灵活可以写成接口与实现类的方式(TODO 后续讲解) * * @param keyExpress 表达式 * @param args 参数 * @return 生成的key */ private String getKey(String keyExpress, Object[] args) { for (int i = 0; i < args.length; i++) { keyExpress = keyExpress.replace("arg[" + i + "]", args[i].toString()); } return keyExpress; } } 3、控制层的实现 我们将注解加在控制层方法上,key = "city:arg[0] key自己定义,arg[0]这个匹配规则表示替换成第一个参数。那么就实现city:token在一定时间内不可以重复提交了 @RestController @RequestMapping("/city") public class BookController { @LocalLock(key = "city:arg[0]") @GetMapping public String query(@RequestParam String token) { return "ok- " + token; } } 4、测试 接下来我们就测试一下,预期结果:5秒内只有第一次的提交会正常返回,其余的显示“请勿重复提交”,看看执行结果是不是我们预期的那样,这里用postman测试。 第一次请求正常响应: 紧接着请求第二次,返回结果“重复提交”,显然我们实现成功了 小福利: 获取到一些极客课程 ,嘘~,免费 送给小伙伴们。公号【程序员内点事】回复【极客】自行领取

优秀的个人博客,低调大师

SpringBoot2 整合ElasticJob框架,定制化管理流程

一、ElasticJob简介 1、定时任务 在前面的文章中,说过QuartJob这个定时任务,被广泛应用的定时任务标准。但Quartz核心点在于执行定时任务并不是在于关注的业务模式和场景,缺少高度自定义的功能。Quartz能够基于数据库实现任务的高可用,但是不具备分布式并行调度的功能。 2、ElasticJob说明 基础简介 Elastic-Job 是一个开源的分布式调度中间件,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。Elastic-Job-Lite 为轻量级无中心化解决方案,使用 jar 包提供分布式任务的调度和治理。 Elastic-Job-Cloud 是一个 Mesos Framework,依托于Mesos额外提供资源治理、应用分发以及进程隔离等服务。 功能特点 分布式调度协调 弹性扩容缩容 失效转移 错过执行作业重触发 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例 补刀:人家官网这样描述的,这里赘述一下,充实一下文章。 基础框架结构 该图片来自ElasticJob官网。 由图可知如下内容: 需要Zookeeper组件支持,作为分布式的调度任务,有良好的监听机制,和控制台,下面的案例也就冲这个图解来。 3、分片管理 这个概念在ElasticJob中是最具有特点的,实用性极好。 分片概念 任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。 场景描述:假设有服务3台,分3片管理,要处理数据表100条,那就可以100%3,按照余数0,1,2分散到三台服务上执行,看到这里分库分表的基本逻辑涌上心头,这就是为何很多大牛讲说,编程思维很重要。 个性化参数 个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。 场景描述:这里猛一读好像很飘逸,其实就是这个意思,如果分3片,取名[0,1,2]不好看,或者不好标识,可以分别给个别名标识一下,[0=A,1=B,2=C]。 二、定时任务加载 1、核心依赖包 这里使用2.0+的版本。 <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> 2、核心配置文件 这里主要配置一下Zookeeper中间件,分片和分片参数。 zookeeper: server: 127.0.0.1:2181 namespace: es-job job-config: cron: 0/10 * * * * ? shardCount: 1 shardItem: 0=A,1=B,2=C,3=D 3、自定义注解 看了官方的案例,没看到好用的注解,这里只能自己编写一个,基于案例的加载过程和核心API作为参考。 核心配置类: com.dangdang.ddframe.job.lite.config.LiteJobConfiguration 根据自己想如何使用注解的思路,比如我只想注解定时任务名称和Cron表达式这两个功能,其他参数直接统一配置(这里可能是受QuartJob影响太深,可能根本就是想省事...) @Inherited @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface TaskJobSign { @AliasFor("cron") String value() default ""; @AliasFor("value") String cron() default ""; String jobName() default ""; } 4、作业案例 这里打印一些基本参数,对照配置和注解,一目了然。 @Component @TaskJobSign(cron = "0/5 * * * * ?",jobName = "Hello-Job") public class HelloJob implements SimpleJob { private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ; @Override public void execute(ShardingContext shardingContext) { LOG.info("当前线程: "+Thread.currentThread().getId()); LOG.info("任务分片:"+shardingContext.getShardingTotalCount()); LOG.info("当前分片:"+shardingContext.getShardingItem()); LOG.info("分片参数:"+shardingContext.getShardingParameter()); LOG.info("任务参数:"+shardingContext.getJobParameter()); } } 5、加载定时任务 既然自定义注解,那加载过程自然也要自定义一下,读取自定义的注解,配置化,加入容器,然后初始化,等着任务执行就好。 @Configuration public class ElasticJobConfig { @Resource private ApplicationContext applicationContext ; @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; @Value("${job-config.cron}") private String cron ; @Value("${job-config.shardCount}") private int shardCount ; @Value("${job-config.shardItem}") private String shardItem ; /** * 配置任务监听器 */ @Bean public ElasticJobListener elasticJobListener() { return new TaskJobListener(); } /** * 初始化配置任务 */ @PostConstruct public void initTaskJob() { Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class); Iterator iterator = jobMap.entrySet().iterator(); while (iterator.hasNext()) { // 自定义注解管理 Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next(); SimpleJob simpleJob = entry.getValue(); TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class); if (taskJobSign != null){ String cron = taskJobSign.cron() ; String jobName = taskJobSign.jobName() ; // 生成配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobName, cron, shardCount) .shardingItemParameters(shardItem).jobParameter(jobName).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder( simpleJobConfiguration).overwrite(true).build(); TaskJobListener taskJobListener = new TaskJobListener(); // 初始化任务 SpringJobScheduler jobScheduler = new SpringJobScheduler( simpleJob, zookeeperRegistryCenter, liteJobConfiguration, taskJobListener); jobScheduler.init(); } } } } 絮叨一句:不要疑问这些API是怎么知道,看下官方文档的案例,他们怎么使用这些核心API,这里就是照着写过来,就是多一步自定义注解类的加载过程。当然官方文档大致读一遍还是很有必要的。 补刀一句:如何快速学习一些组件的用法,首先找到官方文档,或者开源库Wiki,再不济ReadMe文档(如果都没有,酌情放弃,另寻其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉API,最后就是研究自己需要的功能模块,个人经验来看,该过程是弯路最少,坑最少的。 6、任务监听 用法非常简单,实现ElasticJobListener接口。 @Component public class TaskJobListener implements ElasticJobListener { private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class); private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+"===>开始..."); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+ "===>结束...[耗时:"+(endTime - beginTime)+"]"); } } 絮叨一句:before和after执行前后,中间执行目标方法,标准的AOP切面思想,所以底层水平决定了对上层框架的理解速度,那本《Java编程思想》上的灰尘是不是该擦擦? 三、动态添加 1、作业任务 有部分场景需要动态添加和管理定时任务,基于上面的加载流程,在自定义一些步骤就可以。 @Component public class GetTimeJob implements SimpleJob { private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ; private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ; @Override public void execute(ShardingContext shardingContext) { LOG.info("Job Name:"+shardingContext.getJobName()); LOG.info("Local Time:"+format.format(new Date())); } } 2、添加任务服务 这里就动态添加上面的任务。 @Service public class TaskJobService { @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; public void addTaskJob(final String jobName,final SimpleJob simpleJob, final String cron,final int shardCount,final String shardItem) { // 配置过程 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder( jobName, cron, shardCount) .shardingItemParameters(shardItem).build(); JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder( jobTypeConfiguration).overwrite(true).build(); TaskJobListener taskJobListener = new TaskJobListener(); // 加载执行 SpringJobScheduler jobScheduler = new SpringJobScheduler( simpleJob, zookeeperRegistryCenter, liteJobConfiguration, taskJobListener); jobScheduler.init(); } } 补刀一句:这里添加之后,任务就会定时执行,如何停止任务又是一个问题,可以在任务名上做一些配置,比如在数据库生成一条记录[1,job1,state],如果调度到state为停止状态的任务,直接截胡即可。 3、测试接口 @RestController public class TaskJobController { @Resource private TaskJobService taskJobService ; @RequestMapping("/addJob") public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName, @RequestParam("shardCount") Integer shardCount, @RequestParam("shardItem") String shardItem) { taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem); return "success"; } } 四、源代码地址 GitHub·地址 https://github.com/cicadasmile/middle-ware-parent GitEE·地址 https://gitee.com/cicadasmile/middle-ware-parent

优秀的个人博客,低调大师

SpringBoot MQ 系列】RabbitMq 核心知识点小结

【MQ 系列】RabbitMq 核心知识点小结 以下内容,部分取材于官方教程,部分来源网络博主的分享,如有兴趣了解更多详细的知识点,可以在本文最后的文章列表中获取原地址 RabbitMQ 是一个基于 AMQP 协议实现的企业级消息系统,想要顺畅的玩耍的前提是得先了解它,本文将主要介绍 rabbitmq 的一些基本知识点 特点 基本概念 消息投递消费的几种姿势 事务 集群 <!-- more --> I. 基本知识点 它是采用 Erlang 语言实现的 AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息,目前广泛应用于各类系统用于解耦、削峰 1.特点 首先得了解一下 rabbitmq 的特点,看看是否满足我们的系统需求(毕竟学习一个框架也是要不少时间的) 以下内容来自: MQ 和 RabbitMQ 作用特点 主要特点,大致可以归纳为以下几个 可靠性:通过支持消息持久化,支持事务,支持消费和传输的 ack 等来确保可靠性 路由机制:支持主流的订阅消费模式,如广播,订阅,headers 匹配等 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP,MQTT 等多种消息中间件协议。 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Jav a、Python、Ruby、PHP、C#、JavaScript 等。 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。 2. 基本概念 下图为 rabbitmq 的内部结构图 从上图也可以发现几个基本概念(Message, Publisher, Exchange, Binding, Queue, Channel, Consuer, Virtual host) 下面逐一进行说明 a. Message 具体的消息,包含消息头(即附属的配置信息)和消息体(即消息的实体内容) 由发布者,将消息推送到 Exchange,由消费者从 Queue 中获取 b. Publisher 消息生产者,负责将消息发布到交换器(Exchange) c. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列 d. Binding 绑定,用于给 Exchange 和 Queue 建立关系,从而决定将这个交换器中的哪些消息,发送到对应的 Queue e. Queue 消息队列,用来保存消息直到发送给消费者 它是消息的容器,也是消息的终点 一个消息可投入一个或多个队列 消息一直在队列里面,等待消费者连接到这个队列将其取走 f. Connection 连接,内部持有一些 channel,用于和 queue 打交道 g. Channel 信道(通道),MQ 与外部打交道都是通过 Channel 来的,发布消息、订阅队列还是接收消息,这些动作都是通过 Channel 完成; 简单来说就是消息通过 Channel 塞进队列或者流出队列 h. Consumer 消费者,从消息队列中获取消息的主体 i. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。 虚拟主机是共享相同的身份认证和加密环境的独立服务器域。 每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 可以理解为 db 中的数据库的概念,用于逻辑拆分 j. Broker 消息队列服务器实体 3. 消息投递消费 从前面的内部结构图可以知晓,消息由生产者发布到 Exchange,然后通过路由规则,分发到绑定 queue 上,供消费者获取消息 接下来我们看一下 Exchange 支持的四种策略 a. Direct 策略 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中 简单来讲,就是rounting key与binding key完全匹配 如果一个队列绑定到交换机要求路由键为dog 只转发routing key 标记为dog的消息, 不会转发dog.puppy,也不会转发“dog.guard”等等 它是完全匹配、单播的模式 举例说明 Exchange 和两个队列绑定在一起: Q1 的 bindingkey 是 orange Q2 的 binding key 是 black 和 green. 当 Producer 发布一个消息,其routing key是orange时, exchange 会把它放到 Q1 上, 如果是black或green就会到 Q2 上, 其余的 Message 被丢弃 注意 当有多个队列绑定到同一个 Exchange,且 binding key 相同时,这时消息会分发给所有满足条件的队列 b. Topic 策略 这个策略可以看成是 Direct 策略的升级版,通过routing key与 bingding key的模式匹配方式来分发消息 简单来讲,直接策略是完全精确匹配,而 topic 则支持正则匹配,满足某类指定规则的(如以 xxx 开头的路由键),可以将消息分发过去 # 匹配 0 个或多个单词 * 匹配不多不少一个单词 一个更直观的实例如下 Producer 发送消息时需要设置 routing_key, Q1 的 binding key 是*.orange.* Q2 是 *.*.rabbit 和 lazy.#: 发布一个routing key为test.orange.mm 消息,则会路由到 Q1; 注意: 如果是routng key是 test.orange则无法路由到 Q1, 因为 Q1 的规则是三个单词,中间一个为 orange,不满足这个规则的都无效 发布一个routing key为test.qq.rabbit或者lazy.qq的消息 都可以分发到 Q2;即路由 key 为三个单词,最后一个为 rabbit 或者不限制单词个数,主要第一个是 lazy 的消息,都可以分发过来 如果发布的是一个test.orange.rabbit消息,则 Q1 和 Q2 都可以满足 注意: 这时两个队列都会接受到这个消息 c. Fanout 策略 广播策略,忽略routing key 和 binding key,将消息分发给所有绑定在这个 exchange 上的 queue d. Headers 策略 这个实际上用得不多,它是根据 Message 的一些头部信息来分发过滤 Message,忽略 routing key 的属性,如果 Header 信息和 message 消息的头信息相匹配 II. 消息一致性问题 在进入 rabbitmq 如何保证一致性之前,我们先得理解,什么是消息一致性? 1. 一致性问题 数据的一致性是什么 按照我个人的粗浅理解,我认为的消息一致性,应该包含下面几个 生产者,确保消息发布成功 消息不会丢 顺序不会乱 消息不会重复(如重传,导致发布一次,却出现多个消息) 消费者,确保消息消费成功 有序消费 不重复消费 发送端 为了确保发布者推送的消息不会丢失,我们需要消息持久化 broker 持久化消息 为了确定消息正确接收 publisher 需要知道消息投递并成功持久化 2. 持久化 这里的持久化,主要是指将内存中的消息保存到磁盘,避免 mq 宕机导致的内存中消息丢失;然而单纯的持久化,只是保证一致性的其中一个要素,比如 publisher 将消息发送到 exchange,在 broker 持久化的工程中,宕机了导致持久化失败,而 publisher 并不知道持久化失败,这个时候就会出现数据丢失,为了解决这个问题,rabbitmq 提供了事务机制 3. 事务机制 事务机制能够解决生产者与 broker 之间消息确认的问题,只有消息成功被 broker 接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低 RabbitMQ 的消息吞吐量,不适用于需要发布大量消息的业务场景。 注意,事务是同步的 4. 消息确认机制 RabbitMQ 学习(六)——消息确认机制(Confirm 模式) 消息确认机制,可以区分为生产端和消费端 生产端 生产者将信道设置成 Confirm 模式,一旦信道进入 Confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(以 confirm.select 为基础从 1 开始计数), 一旦消息被投递到所有匹配的队列之后,Broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了, 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出, Broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号(此外 Broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理) Confirm 模式属性异步,publisher 发布一条消息之后,在等信道返回确认的同时,依然可以继续发送下一条消息,所以小概率会出现投递的消息顺序和 broker 中持久化消息顺序不一致的问题 一般从编程角度出发,Confirm 模式有三种姿势 普通 Confirm 模式:发送一条消息之后,等到服务器 confirm,然后再发布下一条消息(串行发布) 批量 Confirm 模式:发送一批消息之后,等到服务器 confirm,然后再发布下一批消息(如果失败,这一批消息全部重复,所以会有重复问题) 异步 Confirm 模式:提供一个回调方法,服务器 confirm 之后,触发回调方法,因此不会阻塞下一条消息的发送 消费端 ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有 ACK 反馈,RabbitMQ 会认为这个消息没有正常消费,会将消息重新放入队列中 如果在集群的情况下,RabbitMQ 会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务 消息永远不会从 RabbitMQ 中删除,只有当消费者正确发送 ACK 反馈,RabbitMQ 确认收到后,消息才会从 RabbitMQ 服务器的数据中删除 III. 集群 按照目前的发展趋势,一个不支持集群的中间件基本上是不会有市场的;rabbitmq 也是支持集群的,下面简单的介绍一下常见的 4 种集群架构模式 以下内容来自网上博文,详情请点击右边:RabbitMQ 的 4 种集群架构 1. 主备模式 这个属于常见的集群模式了,但又不太一样 主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点 2. 远程模式 远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。 Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。 如上图,有两个异地的 MQ 集群(可以是更多的集群),当用户在地区 1 这里下单了,系统发消息到 1 区的 MQ 服务器,发现 MQ 服务已超过设定的阈值,负载过高,这条消息就会被转到 地区 2 的 MQ 服务器上,由 2 区的去执行后面的业务逻辑,相当于分摊我们的服务压力。 3. 镜像模式 非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。 如上图,用 KeepAlived 做了 HA-Proxy 的高可用,然后有 3 个节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠 4. 多活模式 也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单 rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的 rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。 federation 插件是一个不需要构建 cluster ,而在 brokers 之间传输消息的高性能插件,federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的,如图上黄色的 rabbit node 3 可以与绿色的 node1、node2、node3 中的任意一个利用 federation 插件进行数据同步。 IV. 其他 0. 项目 工程:https://github.com/liuyueyi/spring-boot-demo 1. 相关博文 RabbitMQ Tutorials MQ 和 RabbitMQ 作用特点 RabbitMq 基础教程之基本概念 RabbitMQ 学习(六)——消息确认机制(Confirm 模式) RabbitMQ 的 4 种集群架构 Rabbitmq 是如何来保证事务的 rabbitmq 消息一致性问题 2. 一灰灰 Blog 尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现 bug 或者有更好的建议,欢迎批评指正,不吝感激 下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛 一灰灰 Blog 个人博客 https://blog.hhui.top 一灰灰 Blog-Spring 专题博客 http://spring.hhui.top

优秀的个人博客,低调大师

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

本文源码:GitHub·点这里 || GitEE·点这里 一、RocketMQ简介 1、架构图片 2、角色分类 (1)、Broker RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。 (2)、NameServer 消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。 热备份:NamServer可以部署多个,相互之间独立,其他角色同时向多个NameServer 机器上报状态信息。 心跳机制:NameServer 中的 Broker、 Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, NameServer会认为某个机器出故障不可用。 (3)、Producer 消息的生成者,最常用的producer类就是DefaultMQProducer。 (4)、Consumer 消息的消费者,常用Consumer类DefaultMQPushConsumer收到消息后自动调用传入的处理方法来处理,实时性高DefaultMQPullConsumer用户自主控制 ,灵活性更高。 3、通信机制 (1)、Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer更新Topic路由信息。 (2)、Producer发送消息时候,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息会从NameServer重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。 (3)、Consumer消费消息时候,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。 二、代码实现案例 1、项目结构图 版本描述 <spring-boot.version>2.1.3.RELEASE</spring-boot.version> <rocketmq.version>4.3.0</rocketmq.version> 2、配置文件 rocketmq: # 生产者配置 producer: isOnOff: on # 发送同一类消息的设置为同一个group,保证唯一 groupName: CicadaGroup # 服务地址 namesrvAddr: 127.0.0.1:9876 # 消息最大长度 默认1024*4(4M) maxMessageSize: 4096 # 发送消息超时时间,默认3000 sendMsgTimeout: 3000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 消费者配置 consumer: isOnOff: on # 官方建议:确保同一组中的每个消费者订阅相同的主题。 groupName: CicadaGroup # 服务地址 namesrvAddr: 127.0.0.1:9876 # 接收该 Topic 下所有 Tag topics: CicadaTopic~*; consumeThreadMin: 20 consumeThreadMax: 64 # 设置一次消费消息的条数,默认为1条 consumeMessageBatchMaxSize: 1 # 配置 Group Topic Tag rocket: group: rocketGroup topic: rocketTopic tag: rocketTag 3、生产者配置 /** * RocketMQ 生产者配置 */ @Configuration public class ProducerConfig { private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName if(this.maxMessageSize!=null){ producer.setMaxMessageSize(this.maxMessageSize); } if(this.sendMsgTimeout!=null){ producer.setSendMsgTimeout(this.sendMsgTimeout); } //如果发送消息失败,设置重试次数,默认为2次 if(this.retryTimesWhenSendFailed!=null){ producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } } 4、消费者配置 /** * RocketMQ 消费者配置 */ @Configuration public class ConsumerConfig { private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Resource private RocketMsgListener msgListener; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(msgListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } } 5、消息监听配置 /** * 消息消费监听 */ @Component public class RocketMsgListener implements MessageListenerConcurrently { private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ; @Resource private ParamConfigService paramConfigService ; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(list)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOG.info("接受到的消息为:"+new String(messageExt.getBody())); int reConsume = messageExt.getReconsumeTimes(); // 消息已经重试了3次,如果不需要再次消费,则返回成功 if(reConsume ==3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){ String tags = messageExt.getTags() ; switch (tags){ case "rocketTag": LOG.info("开户 tag == >>"+tags); break ; default: LOG.info("未匹配到Tag == >>"+tags); break; } } // 消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } 6、配置参数绑定 @Service public class ParamConfigService { @Value("${rocket.group}") public String rocketGroup ; @Value("${rocket.topic}") public String rocketTopic ; @Value("${rocket.tag}") public String rocketTag ; } 7、消息发送测试 @Service public class RocketMqServiceImpl implements RocketMqService { @Resource private DefaultMQProducer defaultMQProducer; @Resource private ParamConfigService paramConfigService ; @Override public SendResult openAccountMsg(String msgInfo) { // 可以不使用Config中的Group defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup); SendResult sendResult = null; try { Message sendMsg = new Message(paramConfigService.rocketTopic, paramConfigService.rocketTag, "open_account_key", msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } return sendResult ; } } 三、项目源码 GitHub·地址 https://github.com/cicadasmile/middle-ware-parent GitEE·地址 https://gitee.com/cicadasmile/middle-ware-parent

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册