基于Spring事务的可靠异步调用实践 | 京东物流技术团队
SpringTxAsync组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。
通过使用SpringTxAsync组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次618活动以及两次双11活动,该组件已经在我们的所有应用中稳定运行并成功应用于各种业务场景。
该组件的主要功能是实现可靠的异步调用。在异步任务的执行过程中,我们能够确保任务的可靠性,即使在出现异常情况或重要机器重启等不确定因素时,仍能保证任务的正常执行,并且能够满足我们的业务需求。
SpringTxAsync组件的成功应用为我们的仓储平台(WMS6)提供了稳定可靠的异步调用支持。在接下来的内容中,我们将详细介绍该组件的设计原理和技术特点,以帮助读者更好地理解和应用该组件。
异步调用的场景
异步的本质就是一种Fire-And-Forget模式,它在编程中常用于两种场景的应用:
- 提升请求的响应速度,减少不必要的同步等待时间。通过将某些操作以异步方式执行,可以避免在请求处理过程中发生长时间的阻塞等待,从而提高系统的吞吐量和响应性能。
- 实现逻辑的解耦,将纯实时逻辑与非线性非实时逻辑进行解耦。在事件驱动的场景中,异步机制常被广泛应用。通过异步方式处理事件,可以将实时逻辑与非实时逻辑分离,提高系统的可维护性和扩展性。
异步编程模式的使用可以带来多种好处,例如提升系统性能、改善用户体验、增强系统的稳定性和可扩展性等。
简单举例说明一下:
用户注册完成后,需要发送注册成功的邮件通知。
// transaction begin User user = UserService.create(UserCreateRequest reqeust)//本地事务 NotificationService.sendRegisterEmail(user)// 本地调用 // do other business operations //transaction commit //NotificationService.sendRegisterEmail(user) 实现 @TransactionAsync public void sendRegisterEmail(User user){ EmailService.send(...);//远程调用 }
可选技术方案
- 消息中间件: 通过向消息中间件发送消息,消费者监听并消费消息来执行异步任务。然而,由于消息中间件的一致性保证较为困难(许多消息中间件没有可靠的发送端,有些中间件虽然有,但使用较为复杂)。
- 多线程: 可以通过Fork线程或直接使用Spring的@Async注解来实现异步。但需要注意,在使用Spring的JDBC事务时,一旦fork出新线程,就会脱离原事务范畴,丧失JDBC事务的可靠性和一致性。
- 进程内事件通知: Guava Event和Spring Event本质上都是实现了发布-订阅模式,可以用于实现异步调用。Guava Event提供了同步和异步的事件发布,其实现是基于队列和线程池。Spring Event配置了Spring Async以支持异步操作,同时还提供了@TransactionEventListener,在当前事务的不同阶段发布事件。然而,底层本质上仍然是fork了线程,因此会失去事务特性。
- Outbox Pattern: Outbox Pattern是一种常见的微服务场景下基于Guaranteed Delivery模式的技术范式,旨在实现可靠的消息传递。主要用于解决应用与外部应用之间交互的可靠性和一致性问题,可看作是分布式事务的一种简单场景。
类似如下场景:
- 用户下单成功后给用户发一个email
- 用户注册成功后,发布一个事件(Message)到消息中件间
- 在DDD开发模式下中处理聚合根之间的领域事件通知
整体示意图如下:
我们的核心需求
异步任务的可靠性,与本地事务的一致性。
可靠性
可靠性是指在创建完成异步任务并成功提交本地事务后,确保异步任务一定会被执行,无论是否出现异常情况或机器重启等意外因素,始终能够保证达到At-Least-Once语义。
一致性
一致性指的是异步方法内部业务逻辑与调用异步方法时的事务上下文保持一致。当本地事务提交时,异步任务执行;而当本地事务回滚时,异步任务则不会执行。
基于以上两点需求,我们做了能满足上述场景的OutboxPattern实现。
SpringTxAsync方案
核心逻辑
为了对使用@TransactionAsync标注为异步的方法进行拦截,我们可以采用AOP方式。
在拦截过程中,将Invocation封装成一个异步任务,并将其持久化到数据库的异步任务表中,该操作需要在当前事务域内完成。
对于已经保存在本地表中的异步任务的处理,我们可以考虑两种实现方案。
方案1
首先,异步任务表被视为只读表,只会插入任务记录,不会进行修改操作。对于任务的处理,我们可以通过监听表的binlog来实现。当收到insert事件时,将其转换成消息队列(mq)消息,在应用中有一个监听器(listener)负责消费这些消息。
方案2
为了有效追踪异步任务的生命周期,我们对异步任务进行状态化管理。
当有新任务生成时,我们将其插入到异步任务表中。在当前事务提交后,任务会立即提交到线程池中执行,而不是从数据库中获取任务。只有发生异常的异步任务,才会定时从数据库中获取任务来处理。 这种方案可以确保异步任务能够及时地被提交和执行,同时在异常情况下也能够保证任务的处理。
两种方案的取舍:我们选择第二种方案
- 减少依赖的中间件,降低组件的整体复杂度以及团队接入组件后运维成本
- 降低任务数据的调用链长度,从binlog到mq这中间增加了很多不确定性
异步任务状态机
异步任务的生命周期共有五个状态:READY(就绪)、RUNNING(运行中)、EXCEPTION(异常)、SUCCESS(成功)、FAIL(失败)
- 当新的异步任务生成时,我们将其插入到异步任务表中,并将状态设置为READY(0)。
- 在当前事务提交后,任务将被立即提交到线程池中进行执行,而不是从数据库中获取任务。这种方式确保了任务能够及时执行,并与事务保持一致性。
- 在任务的执行过程中,如果发生异常情况,我们可以将任务的状态修改为EXCEPTION(2),并单独处理这些异常任务。这样,我们可以定时从数据库中获取这些异常任务,并进行重新处理,以确保任务能够顺利完成。
- 同时,我们还需要确定任务的成功与失败状态。当任务执行成功时,我们将其状态设置为SUCCESS(4),表示任务已成功完成。
- 而当任务多次执行异常时,达到某个预设的阈值,我们将其状态设置为FAIL(3),以标记任务的失败状态,失败的任务不再重试。
- 为了处理可能的机器重启、掉电等场景,我们引入了额外的定时器处理对READY和RUNNING状态下的超时情况进行监控,如果在这两个状态发生超时,那么直接转入EXCEPTION或FAIL状态。
核心代码
异步方法注解,用来声明异步任务
/** * 标记在Spring 容器管理的bean的public方法上,以实现本地事务级别的可靠异步调用。 * * <p>对于加注了该注解的方法: * <p>1. 如果当前处理事务中,则对异步方法的调用是在事务提交之后进行的,这样是为了保证异步调用与本地事务的一致性, * 假如事务回滚,则异步调用不会执行。 * <p>2. 如果当前调用不在事务中,则可靠性退化为与{@link org.springframework.scheduling.annotation.Async}一致。 * * <p>对于异步逻辑的调用,能保证At Least Once的语义。 * <p>在极端场景下,比如down机,或线程池被打爆的情况下,是通过重度来保证调用的可靠性的,所以有可能对异步逻辑的执行大于一次,所以要求异步逻辑本身是幂等的。 * * <p>注意:在同一个bean内部调用时不起作用,这是因为Spring AOP的Proxy代理机制导致。 * */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface TransactionAsync { public static int UN_DEFINED = -1; /*** * -1 as not set. default 5 min * @return timeout value, in seconds * */ long timeout() default UN_DEFINED; /** * 用来指定当前异步任务执行的线程池,这里指定线程池的Bean名称,运行时会从Spring容器中根据名称查询。 * 如果未指定,则使用一个默认线程池,全局共享一个。 * 不同的异步任务优先级高低不同,如果想要隔离,可以给不同任务指定不同的线程池。 * 注意:如果要自定义线程池,要使用Spring的ThreadPoolTaskExecutor,这个线程池的实现,支持运行时调整线程数。 * @return executor bean name */ String executor() default ""; /** * -1 as not set. default 3 times * * @return max retry attempt */ int maxAttempt() default UN_DEFINED; }
拦截@TransactionAsync,在AOP中做切面逻辑。伪代码如下:
TransactionAsync.AOP.invoke(MethodInvocation) { AsyncInvocation asyncInvocation = 根据当前方法以及注解里的属性(支持SpringEL)确定落库的各个字段值 wms_async_task.insert(asyncInvocation, status=READY) // 当前事务里塞入一个insert,如果无事务auto_commit ThreadPoolTaskExecutor executor = determineAsyncExecutor(method) // 根据方法获取异步执行的线程池 if isInTransaction() { // 注册事务提交后的hook TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { void afterCommit() { // 仍然在当前业务线程里,但事务外 executor.submit(asyncInvocation) // 转为异步线程,原事务数据变化可见 } }) } else { executor.submit(asyncInvocation) // redo log writen, invoke task best effort } }
方案完备性
错误重试
为了处理异步任务发生异常的情况,我们引入了重试机制。默认情况下,异步任务将进行3次重试,并且可以在注解中自定义重试次数。重试的间隔采用指数级递增的方式,直到达到最大重试间隔300秒后,将以固定间隔进行重试。当任务达到最大重试次数后,任务将被标记为DeadTask,同时触发报警并需要人工介入处理。
重试机制的实现依赖于Spring框架的本地调度器@Schedule。我们通过@Schedule注解定义一个定时任务,该任务会从异步任务表中根据任务的状态获取任务,并调用相应的处理方法。需要注意的是,使用乐观锁来管理任务的状态,以避免多个本地Spring Schedule任务执行相同的失败任务。
通过以上的重试机制,我们能够及时处理异步任务发生异常的情况,并尽可能地进行重试,以提高任务的成功率。在达到最大重试次数后,及时报警并进行人工介入,以确保任务的及时处理和系统的稳定运行。
超时保护
有三种类型的超时:等待超时、执行超时和失败超时。
- 等待超时:当任务进入就绪状态(Ready)后,会等待一段时间(waitTimeout),等待被线程池调度执行。如果超过等待超时阈值,则任务被标记为失败任务,并进入重试队列。另外,如果任务提交后,发生了机器重启,任务将一直保持在就绪状态,直到等待超时。
- 执行超时:当任务处于运行中状态(Running),如果执行时间超过限定的超时时间(executeTimeout),任务将被标记为失败任务,并进入重试队列。同样,如果由于机器重启等原因导致任务一直处于运行状态,任务将一直卡在运行状态,直到执行超时。
- 失败超时:为了处理已经标记为失败的任务,我们还引入了失败超时机制。一旦任务被标记为失败,会设定一个失败超时时间(根据失败次数进行计算,采用指数递增策略)。在这个失败超时时间到期后,任务将被定时任务捞取出来,进行重试操作。 通过对等待超时、执行超时和失败超时进行明确的描述,我们能够更清晰地理解不同超时情况下的任务处理逻辑,并有效地进行超时相关的任务重试和管理。
队列管理
一旦异步任务被创建并持久化到数据库,当创建异步任务所在的事务提交后,异步任务会立即提交给线程池进行处理。由于仓储平台的大部分应用都是长时间运行的任务,所以我们默认配置了较大的执行线程池,并使用阻塞队列作为缓冲。 线程池默认采用ThreadPoolTaskExecutor来执行任务,队列使用LinkedBlockingQueue来实现有界队列。当任务提交到队列无法进入时,采用丢弃策略(DiscardPolicy)并触发报警。需要注意的是,被丢弃的任务并不会真正丢失,而是会被当做等待超时的处理方式。
隔离性
通过使用线程池来实现任务的隔离是一种有效的方法。在声明异步任务时,可以在注解中明确指定所使用的线程池,以实现任务的隔离。举例来说,对于不同的优先级任务,可以使用不同的线程池来处理;而对于不同特征的任务,则可以配置不同的线程池;另外,还可以将短时任务与长时任务分别隔离开,分别放入不同的线程池中,以保证任务能够得到有效地处理。
伸缩性
在某些情况下,一个节点可能会产生大量的异步任务,而这些任务只能在该节点上执行,无法分配给其他节点执行。这会导致该节点负载过重,而其他节点却处于闲置状态。
为了解决这个问题,我们需要合理设计异步任务的粒度。如果一个节点产生的异步任务过于庞大,可以考虑将其拆分为更小的任务,并分配给其他节点执行。这样做的好处是,能够有效地平衡各个节点的负载,提高系统的性能和可伸缩性。
此外,还需要考虑创建异步任务本身的请求是否能够进行负载均衡。如果一个节点的异步任务请求过于频繁,可以通过一些负载均衡的策略来分担请求压力,例如使用负载均衡器或者分布式任务调度器来进行任务的分发和调度。
因此,在设计异步任务时,需要综合考虑任务粒度和请求负载均衡两个方面,以实现合理的任务分配和节点负载均衡,从而保证整个系统的稳定性和可伸缩性。
可配置项
提供了大量的可选配置项以满足不同的业务场景
核心配置选项如下:
- 默认最大重试次数
- 默认的失败重试间隔
- 默认执行超时时间
- 默认异步任务线程池相关核心参数
扩展点
通过采用SPI(Service Provider Interface)方式,系统提供了一些可扩展的接入点,以增强异步任务执行过程中的灵活性。这些接入点可以根据具体业务场景,自定义增加相应的逻辑。 以下是几个重要的扩展点和其作用:
- 重试异常回调(RetryExceptionCallback): 当异步任务执行过程中遇到需要重试的异常时,可以通过该回调接口进行相应的处理逻辑。
- 异步任务调用回调(InvocationCallback): 在异步任务完成后,可以通过调用该回调接口来处理任务执行成功或异常的情况。
- 特殊数据源提供者(DataSourceWrapper): 用于处理分库数据源,通过该接口可以自定义管理特殊的数据源。
- 异步任务主键生成器策略提供者(SnowflakeWorkerIdStrategyProvider): 用于自定义异步任务主键生成策略,以满足具体业务需求。
- 线程池自定义拒绝策略提供者(RejectedExecutionHandlerProvider): 用于定义线程池的拒绝策略,当任务无法被接受时,可以根据具体需求进行自定义处理。
- 定时调度异常处理器提供者(ScheduleErrorHandlerProvider): 用于处理定时调度任务执行过程中的异常情况,可以根据具体需要进行相应的处理策略。
通过以上提供的扩展点,系统提供了灵活性和扩展性,便于根据业务特定需求进行定制化开发。使用者可以根据自身业务场景和需求,选择相应的扩展点,并编写逻辑来满足具体的业务要求。这种SPI扩展机制使得系统更具可定制性和代码的可维护性。
管理API
- 任务的查询
- 任务的重置
监控点
通过UMP(京东自研监控平台) SDK上报到UMP平台已经内置的监控如下所示,还可以通过预留的SPI增加自定义监控。
- 任务执行异常监控
- 线程池拒绝线程监控
- 任务积压监控
- 组件内部异常监控
参考资料
OutboxPattern http://www.kamilgrzybek.com/design/the-outbox-pattern/
作者:京东物流 张涛
来源:京东云开发者社区 自猿其说Tech 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
从好玩到好用:程序员用AI提效的那些事儿 | 京东云技术团队
本片内容是【AI思维空间】ChatGPT纵横编程世界,点亮智慧火花的续作,主要记录组内开发小伙伴儿们在开发过程中的实际应用案例,记录典型案例,尽量不要和其他人重复,以解决开发过程中的实际问题为主,设计、方案、编码、测试、集成、部署等等; 目的:贡献最佳实践,分享心得,共同成长! Prompt提问万能公式 案例1 基于ChatGPT进行资源排期 1、首先进行拆分,分为周一到周五和周六周日 2、引导chatGPT进行排班 案例2 让chatGpt帮忙看下正则表达式的含义 在看代码的过程中,发现有个地方使用了正则表达式进行规则匹配,但是没有注释标明规则含义,所以使用chatGpt帮忙看下规则含义。 chatGpt不仅返回的了匹配的规则含义,还详细介绍了每个字符具体含义,very nice。 案例3 基于chatGpt写分段写入csv文件的程序 给chatgpt提要求,要求10000行数据放在一个csv文件中,并将文件名称进行编号处理。 变更条件,事先不知道总行数有多少条,让chatGpt重新写一段程序实现。 案例4:基于ChatGPT辅助开发 告诉ChatGPT,我想开发一个Ja...
- 下一篇
慢SQL的致胜法宝 | 京东物流技术团队
大促备战,最大的隐患项之一就是慢SQL,对于服务平稳运行带来的破坏性最大,也是日常工作中经常带来整个应用抖动的最大隐患,在日常开发中如何避免出现慢SQL,出现了慢SQL应该按照什么思路去解决是我们必须要知道的。本文主要介绍对于慢SQL的排查、解决思路,通过一个个实际的例子深入分析总结,以便更快更准确的定位并解决问题。 解决步骤 step1、观察SQL 出于一些历史原因有的SQL查询可能非常复杂,需要同时关联非常多的表,使用一些复杂的函数、子查询,这样的SQL在项目初期由于数据量比较少,不会对数据库造成较大的压力,但是随着时间的积累以及业务的发展,这些SQL慢慢就会转变为慢SQL,对数据库的性能产生一定的影响。 对于这样的SQL,建议先了解业务场景,梳理关联关系,尝试将SQL拆解为几个简单的小SQL,在内存中关联组合。 step2、分析问题 大家在分析慢SQL时最常用的工具肯定是explain语句,如下是explain语句的执行输出。 一般情况下我们最需要关注的指标有type、possible_keys、key、rows、extra几项。 type为连接类型,有如下几种取值,性能从好到坏...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Windows10,CentOS7,CentOS8安装Nodejs环境
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS8编译安装MySQL8.0.19
- CentOS关闭SELinux安全模块
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7