您现在的位置是:首页 > 文章详情

Spring Boot 多数据源,整合 Atomikos 实现分布式事务

日期:2019-10-19点击:549

前言

由于最近的项目需要整合两个数据库,有些业务逻辑也涉及到两个数据库同时插入、更新的操作;所以就涉及到跨数据库的数据一致性问题。于是基于 Spring Boot 整合了 Atomikos 的一个项目 demo。
项目源码地址:https://github.com/WongMinHo/spring-boot-api-starter

介绍

  • 分布式事务:

分布式事务,可以理解为:由于分布式而引起的事务不一致的问题。随着项目做大,模块拆分,数据库拆分。一次包含增删改操作数据库涉及到了更新两个不同物理节点的数据库,这样的数据库事务只能保证自己处理的部分的事务,但是整个的事务就不能保证一致性。

  • JTA:

JTA(java Transaction API)是 JavaEE 13 个开发规范之一,java 事务API,允许应用程序执行分布式事务处理——在两个或多个网络计算机资源上访问并且更新数据。JDBC 驱动程序的 JTA 支持极大地增强了数据访问能力。事务就是保证数据的有效性,数据的一致性。

  • Atomikos:

Atomikos 是一个为 Java 平台提供增值服务的并且开源类事务管理器,主要用于处理跨数据库事务;在 Spring Boot 的文档也推荐更多人使用 Atomikos

实现案例

场景:两个数据库,分别是minhow_firstminhow_second;包含 mh_user 用户表、 mh_customer 客户表。

项目结构:
directory-structure

pom.xml 依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.minhow</groupId> <artifactId>spring-boot-api-starter</artifactId> <version>1.0</version> <name>spring-boot-api-starter</name> <description>Spring Boot Seed Project</description> <properties> <java.version>1.8</java.version> <mybatis-plus.version>3.2.0</mybatis-plus.version> <mybatis-plus-generator.version>3.2.0</mybatis-plus-generator.version> <guava.version>27.1-jre</guava.version> <common-lang3.version>3.9</common-lang3.version> <fastjson.version>1.2.60</fastjson.version> <druid.version>1.1.20</druid.version> <jjwt.version>0.9.1</jjwt.version> <velocity-engine.version>2.0</velocity-engine.version> <mysql-connector.version>8.0.11</mysql-connector.version> <lombok.version>1.18.10</lombok.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- jta-atomikos 分布式事务管理 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <optional>true</optional> </dependency> <!-- mysql connector--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector.version}</version> </dependency> <!-- mybatis-plus--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>${mybatis-plus.version}</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>${mybatis-plus-generator.version}</version> </dependency> <!-- 模板引擎 --> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>${velocity-engine.version}</version> </dependency> <!-- Redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- Alibaba --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <!-- jjwt --> <dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>${jjwt.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.yml 数据源配置:
# 本地环境配置文件 spring: datasource: druid: first: #数据源1 driver-class-name: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.xa.DruidXADataSource url: jdbc:mysql://localhost:3306/minhow_first?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&characterEncoding=utf8 username: root password: root #初始化时建立物理连接的个数 initial-size: 5 #池中最大连接数 max-active: 20 #最小空闲连接 min-idle: 1 #获取连接时最大等待时间,单位毫秒 max-wait: 60000 #有两个含义: #1) Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。 #2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明 time-between-eviction-runs-millis: 60000 #连接保持空闲而不被驱逐的最小时间,单位是毫秒 min-evictable-idle-time-millis: 300000 #使用该SQL语句检查链接是否可用。如果validationQuery=null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。 validationQuery: SELECT 1 FROM DUAL #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 test-while-idle: true #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 test-on-borrow: false #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 test-on-return: false # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filters: stat,wall,slf4j # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 second: #数据源2 driver-class-name: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.xa.DruidXADataSource url: jdbc:mysql://localhost:3306/minhow_second?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&characterEncoding=utf8 username: root password: root #初始化时建立物理连接的个数 initial-size: 5 #池中最大连接数 max-active: 20 #最小空闲连接 min-idle: 1 #获取连接时最大等待时间,单位毫秒 max-wait: 60000 #有两个含义: #1) Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。 #2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明 time-between-eviction-runs-millis: 60000 #连接保持空闲而不被驱逐的最小时间,单位是毫秒 min-evictable-idle-time-millis: 300000 #使用该SQL语句检查链接是否可用。如果validationQuery=null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。 validationQuery: SELECT 1 FROM DUAL #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 test-while-idle: true #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 test-on-borrow: false #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 test-on-return: false # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filters: stat,wall,slf4j # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
创建两个数据库和数据表sql:
#创建第一个数据库和数据表 CREATE DATABASE minhow_first; -- ---------------------------- -- Table structure for mh_user -- ---------------------------- USE minhow_first; DROP TABLE IF EXISTS `mh_user`; CREATE TABLE `mh_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(191) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '姓名', `password` varchar(191) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '密码', `customer_num` int(11) DEFAULT '0' COMMENT '客户数', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; -- ---------------------------- -- Records of mh_user -- ---------------------------- INSERT INTO `mh_user` VALUES (1, 'minhow', NULL, 0); #创建第二个数据库和数据表 CREATE DATABASE minhow_second; -- ---------------------------- -- Table structure for mh_customer -- ---------------------------- USE minhow_second; DROP TABLE IF EXISTS `mh_customer`; CREATE TABLE `mh_customer` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL COMMENT '用户id', `name` varchar(191) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '姓名', `phone` varchar(11) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '手机号', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
第一个数据源FirstDataSourceProperties配置:
package com.minhow.springbootapistarter.config.datasource; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @author MinHow * @date 2018/3/4 7:13 下午 */ @Data @Component @ConfigurationProperties(prefix = "spring.datasource.druid.first") public class FirstDataSourceProperties { private String url; private String username; private String password; private String driverClassName; private String type; private Integer initialSize; private Integer minIdle; private Integer maxActive; private Integer maxWait; private Integer timeBetweenEvictionRunsMillis; private Integer minEvictableIdleTimeMillis; private String validationQuery; private Boolean testWhileIdle; private String testOnBorrow; private String testOnReturn; private String poolPreparedStatements; private String filters; private String connectionProperties; private String initConnectionSqls; }
第二个数据源SecondDataSourceProperties配置:
package com.minhow.springbootapistarter.config.datasource; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @author MinHow * @date 2018/3/4 7:13 下午 */ @Data @Component @ConfigurationProperties(prefix = "spring.datasource.druid.second") public class SecondDataSourceProperties { private String url; private String username; private String password; private String driverClassName; private String type; private Integer initialSize; private Integer minIdle; private Integer maxActive; private Integer maxWait; private Integer timeBetweenEvictionRunsMillis; private Integer minEvictableIdleTimeMillis; private String validationQuery; private Boolean testWhileIdle; private String testOnBorrow; private String testOnReturn; private String poolPreparedStatements; private String filters; private String connectionProperties; private String initConnectionSqls; }
第一个数据源FirstDataSourceConfiguration配置:

注意:如果使用Druid的分布式驱动,暂不支持MySql8.0+

package com.minhow.springbootapistarter.config.datasource; import com.alibaba.druid.pool.xa.DruidXADataSource; import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.minhow.springbootapistarter.common.constant.DBConstants; import com.mysql.cj.jdbc.MysqlXADataSource; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * @author MinHow * @date 2018/3/4 7:20 下午 */ @Configuration @MapperScan(basePackages = DBConstants.FIRST_MAPPER, sqlSessionFactoryRef = DBConstants.FIRST_SQL_SESSION_FACTORY) @Slf4j public class FirstDataSourceConfiguration { @Autowired private FirstDataSourceProperties firstDataSourceProperties; /** * 配置第一个数据源 * @return */ @Primary @Bean(DBConstants.FIRST_DATA_SOURCE) public DataSource firstDataSource() { // 使用Druid的分布式驱动,暂时发现不支持MySql8以上的版本 // DruidXADataSource druidXADataSource = new DruidXADataSource(); // BeanUtils.copyProperties(firstDataSourceProperties, druidXADataSource); //使用mysql的分布式驱动,支持MySql5.*、MySql8.* 以上版本 MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(firstDataSourceProperties.getUrl()); mysqlXaDataSource.setPassword(firstDataSourceProperties.getPassword()); mysqlXaDataSource.setUser(firstDataSourceProperties.getUsername()); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName(DBConstants.FIRST_DATA_SOURCE); xaDataSource.setPoolSize(firstDataSourceProperties.getInitialSize()); xaDataSource.setMinPoolSize(firstDataSourceProperties.getMinIdle()); xaDataSource.setMaxPoolSize(firstDataSourceProperties.getMaxActive()); xaDataSource.setMaxIdleTime(firstDataSourceProperties.getMinIdle()); xaDataSource.setMaxLifetime(firstDataSourceProperties.getMinEvictableIdleTimeMillis()); xaDataSource.setConcurrentConnectionValidation(firstDataSourceProperties.getTestWhileIdle()); xaDataSource.setTestQuery(firstDataSourceProperties.getValidationQuery()); return xaDataSource; } /** * 创建第一个SqlSessionFactory * @param firstDataSource * @return * @throws Exception */ @Primary @Bean(DBConstants.FIRST_SQL_SESSION_FACTORY) public SqlSessionFactory firstSqlSessionFactory(@Qualifier(DBConstants.FIRST_DATA_SOURCE) DataSource firstDataSource) throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(firstDataSource); //设置mapper位置 bean.setTypeAliasesPackage(DBConstants.FIRST_MAPPER); //设置mapper.xml文件的路径 bean.setMapperLocations( new PathMatchingResourcePatternResolver().getResources(DBConstants.FIRST_MAPPER_XML)); return bean.getObject(); } }
第二个数据源SecondDataSourceConfiguration配置:

注意:如果使用Druid的分布式驱动,暂不支持MySql8.0+

package com.minhow.springbootapistarter.config.datasource; import com.alibaba.druid.pool.xa.DruidXADataSource; import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.minhow.springbootapistarter.common.constant.DBConstants; import com.mysql.cj.jdbc.MysqlXADataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * @author MinHow * @date 2018/3/4 7:20 下午 */ @Configuration @MapperScan(basePackages = DBConstants.SECOND_MAPPER, sqlSessionFactoryRef = DBConstants.SECOND_SQL_SESSION_FACTORY) public class SecondDataSourceConfiguration { @Autowired private SecondDataSourceProperties secondDataSourceProperties; /** * 配置第二个数据源 * @return */ @Bean(DBConstants.SECOND_DATA_SOURCE) public DataSource secondDataSource() { // 使用Druid的分布式驱动,暂时发现不支持mysql8以上的版本 // DruidXADataSource druidXADataSource = new DruidXADataSource(); // BeanUtils.copyProperties(secondDataSourceProperties, druidXADataSource); //使用mysql的分布式驱动,支持mysql5.*、mysql8.* 以上版本 MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(secondDataSourceProperties.getUrl()); mysqlXaDataSource.setPassword(secondDataSourceProperties.getPassword()); mysqlXaDataSource.setUser(secondDataSourceProperties.getUsername()); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName(DBConstants.SECOND_DATA_SOURCE); xaDataSource.setPoolSize(secondDataSourceProperties.getInitialSize()); xaDataSource.setMinPoolSize(secondDataSourceProperties.getMinIdle()); xaDataSource.setMaxPoolSize(secondDataSourceProperties.getMaxActive()); xaDataSource.setMaxIdleTime(secondDataSourceProperties.getMinIdle()); xaDataSource.setMaxLifetime(secondDataSourceProperties.getMinEvictableIdleTimeMillis()); xaDataSource.setConcurrentConnectionValidation(secondDataSourceProperties.getTestWhileIdle()); xaDataSource.setTestQuery(secondDataSourceProperties.getValidationQuery()); return xaDataSource; } /** * 创建第二个SqlSessionFactory * @param secondDataSource * @return * @throws Exception */ @Bean(DBConstants.SECOND_SQL_SESSION_FACTORY) public SqlSessionFactory secondSqlSessionFactory(@Qualifier(DBConstants.SECOND_DATA_SOURCE) DataSource secondDataSource) throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(secondDataSource); //设置mapper位置 bean.setTypeAliasesPackage(DBConstants.SECOND_MAPPER); //设置mapper.xml文件的路径 bean.setMapperLocations( new PathMatchingResourcePatternResolver().getResources(DBConstants.SECOND_MAPPER_XML)); return bean.getObject(); } }
Atomikos配置:
package com.minhow.springbootapistarter.config.datasource; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.jta.JtaTransactionManager; import javax.transaction.TransactionManager; import javax.transaction.UserTransaction; /** * 事务管理 * @author jacker * @date 2019/8/13 3:41 PM */ @Configuration @EnableTransactionManagement public class TransactionManagerConfig { @Bean(name = "userTransaction") public UserTransaction userTransaction() throws Throwable { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; } @Bean(name = "atomikosTransactionManager") public TransactionManager atomikosTransactionManager() throws Throwable { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } @Bean(name = "transactionManager") @DependsOn({"userTransaction", "atomikosTransactionManager"}) public PlatformTransactionManager transactionManager() throws Throwable { return new JtaTransactionManager(userTransaction(), atomikosTransactionManager()); } }

通过 @EnableTransactionManagement 来启用事务管理,该注解会自动查找满足条件的PlatformTransactionManager;更详细的配置方法可以参见 Atomikos Spring Integration
还有 Dao 和 Mapper 的代码就不贴了,详情请看项目源码。
至此为止,配置就完成了,之后只需要在事务控制的地方加上 @Transactional 注解即可。

案例:

业务流程:在 mh_customer 客户表新增记录,mh_user 用户表客户数增加1,代码如下:

package com.minhow.springbootapistarter.service.second.impl; import com.minhow.springbootapistarter.common.enums.ResultEnum; import com.minhow.springbootapistarter.common.exception.BusinessException; import com.minhow.springbootapistarter.common.response.Result; import com.minhow.springbootapistarter.pojo.dto.StoreCustomerDTO; import com.minhow.springbootapistarter.pojo.entity.first.User; import com.minhow.springbootapistarter.pojo.entity.second.Customer; import com.minhow.springbootapistarter.dao.second.mapper.CustomerMapper; import com.minhow.springbootapistarter.service.first.UserService; import com.minhow.springbootapistarter.service.second.CustomerService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * <p> * 服务实现类 * </p> * * @author MinHow * @since 2019-10-05 */ @Service public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService { @Autowired private UserService userService; /** * 新增客户 - 演示多数据源分布式事务 * @param storeCustomerDTO * @return */ @Override @Transactional(rollbackFor = BusinessException.class) public Result store(StoreCustomerDTO storeCustomerDTO) { User user = userService.lambdaQuery() .select(User::getId, User::getCustomerNum) .eq(User::getId, storeCustomerDTO.getUserId()) .one(); if (user == null) { return Result.fail(4001, "用户不存在"); } Customer customer = new Customer(); customer.setName(storeCustomerDTO.getCustomerName()) .setPhone(storeCustomerDTO.getCustomerPhone()) .setUserId(storeCustomerDTO.getUserId()); //添加客户 boolean customerStatus = this.save(customer); //更新用户客户数 boolean userStatus = userService.lambdaUpdate() .set(User::getCustomerNum, user.getCustomerNum() + 1) .eq(User::getId, storeCustomerDTO.getUserId()) .update(); //不符合条件,两个数据库表数据回滚 if (! customerStatus || ! userStatus) { throw new BusinessException(ResultEnum.BUSINESS_ERROR); } return Result.ok(); } }

通过修改不同条件,测试事务回滚和不回滚的结果,就能测试分布式事务是否得到支持。

原文链接:https://yq.aliyun.com/articles/721427
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章