「七剑下天山」
一:@EnableTransactionManagement
三:@TransactionEventListener
六:TransactionSynchronizationManager
七:TransactionAwareDataSourceProxy
@EnableTransactionManagement
1.1 作用
此注解是 Spring 支持注解事务配置的标志。表明 Spring 开启注解事务配置的支持。是注解驱动开发事务配置的必备注解。
1.2 源码
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(TransactionManagementConfigurationSelector.class)public @interface EnableTransactionManagement { /** * 指定基于目标类代理还是基于接口代理。 * 默认采用JDK官方的基于接口代理。 * @return */ boolean proxyTargetClass() default false;
/** * 指定事务通知是如何执行的。默认是通过代理方式执行的。 * 如果是同一个类中调用的话,请采用AdviceMode.ASPECTJ * @return */ AdviceMode mode() default AdviceMode.PROXY;
/** * 指示在特定连接点应用多个通知时事务处理的执行顺序。 * 默认值是:最低优先级(Integer.MAX_VALUE) * @return */ int order() default Ordered.LOWEST_PRECEDENCE;}
1.3 源码分析
2.1 作用
此注解是 Spring 注解配置事务的核心注解,无论是注解驱动开发还是注解和 XML 混合开发,只有涉及配置事务采用注解的方式,都需要使用此注解。
通过源码我们看到,该注解可以出现在接口上,类上和方法上。分别表明:
优先级:方法上 > 类上 > 接口上。
2.2 源码
@Target({ElementType.METHOD, ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface Transactional { /** * 指定事务管理器的唯一标识 */ @AliasFor("transactionManager") String value() default "";
/** * 指定事务管理器的唯一标识 */ @AliasFor("value") String transactionManager() default "";
/** * 指定事务的传播行为 */ Propagation propagation() default Propagation.REQUIRED;
/** * 指定事务的隔离级别 */ Isolation isolation() default Isolation.DEFAULT;
/** * 指定事务的超时时间 */ int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
/** * 指定事务是否只读 */ boolean readOnly() default false;
/** * 通过指定异常类的字节码,限定事务在特定情况下回滚 */ Class<? extends Throwable>[] rollbackFor() default {};
/** * 通过指定异常类的全限定类名,限定事务在特定情况下回滚 */ String[] rollbackForClassName() default {};
/** * 通过指定异常类的字节码,限定事务在特定情况下不回滚 */ Class<? extends Throwable>[] noRollbackFor() default {};
/** * 通过指定异常类的全限定类名,限定事务在特定情况下不回滚 */ String[] noRollbackForClassName() default {};}
2.3 源码分析
2.3.1 在 @EnableTransactionManagement 注解中,有一个导入器:
TransactionManagementConfigurationSelector,导入器中在 AdiviceMode 为默认值 PROXY 时,往
容器中注入了两个bean对象,AutoProxyRegistrar 和 ProxyTransactionManagementConfiguration。
// 通过 @Import注解,Spring 定义了一个事务管理配置类的导入器。@Import(TransactionManagementConfigurationSelector.class) public @interface EnableTransactionManagement { //其余代码略 }
2.3.2 ProxyTransactionManagementConfiguration 类中的一个方法:
@Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { //创建了一个注解事务的属性解析对象 return new AnnotationTransactionAttributeSource(); }
2.3.3 AnnotationTransactionAttributeSource 类的实例化
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource implements Serializable { /** * 默认构造函数 */ public AnnotationTransactionAttributeSource() { this(true); }
/** * 带参构造(表明是否为public方法) */ public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) { this.publicMethodsOnly = publicMethodsOnly; //判断是不是jta或者是ejp if (jta12Present || ejb3Present) { this.annotationParsers = new LinkedHashSet<>(4); this.annotationParsers.add(new SpringTransactionAnnotationParser()); //jta if (jta12Present) { this.annotationParsers.add(new JtaTransactionAnnotationParser()); } //ejp if (ejb3Present) { this.annotationParsers.add(new Ejb3TransactionAnnotationParser()); } } else { //当都不是的时候,构建一个SpringTransactionAnnotationParser(事务注解解析器) this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser()); } }}
2.3.4 SpringTransactionAnnotationParser 的注解解析:
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable { @Override @Nullable public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) { AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false); if (attributes != null) { return parseTransactionAnnotation(attributes); } else { return null; } }
public TransactionAttribute parseTransactionAnnotation(Transactional ann) { return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false)); }
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; } }
@TransactionEventListener
3.1 作用
它是 spring 在 4.2 版本之后加入的注解。用于配置一个事务的事件监听器。使我们在事务提交和回滚前后可以做一些额外的功能。
例如:对事务执行监控,执行中同步做一些操作等等。
3.2 使用示例
自定义事件类:
public class MyApplicationEvent extends ApplicationEvent { public MyApplicationEvent(Object source) { super(source); }}
自定义监听类:
@Component@Slf4jpublic class MyTransactionEventListener { @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void doSomething(MyApplicationEvent event) { Map map = (Map) event.getSource(); log.info("事务提交后执行===>转出账户:" + map.get("sourceName") + ",转入账户:" + map.get("targetName") + ",转账金额:" + map.get("money")); }
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) public void otherSomething(MyApplicationEvent event) { Map map = (Map) event.getSource(); log.info("事务回滚后执行===>转出账户:" + map.get("sourceName") + ",转入账户:" + map.get("targetName") + ",转账金额:" + map.get("money")); }}
业务层:
@Servicepublic class AccountServiceImpl implements AccountService { @Autowired private AccountDao accountDao; @Autowired private ApplicationEventPublisher applicationEventPublisher;
@Override @Transactional(rollbackFor = Exception.class) public void transfer(String sourceName, String targetName, Double money) { try { //1.根据名称查询转出账户 Account source = accountDao.findByName(sourceName); //2.根据名称查询转入账户 Account target = accountDao.findByName(targetName); //3.转出账户减钱 source.setMoney(source.getMoney() - money); //4.转入账户加钱 target.setMoney(target.getMoney() + money); accountDao.update(source); // 模拟转账异常 int i = 1 / 0; //6.更新转入账户 accountDao.update(target); } finally { Map<String, Object> map = new HashMap<>(); map.put("sourceName", sourceName); map.put("targetName", targetName); map.put("money", money); // 发布自定义事件(此处可以发布多个事件,如邮件通知、短信通知等,利用自定义监听器实现代码解耦) applicationEventPublisher.publishEvent(new MyApplicationEvent(map)); } }}
3.3 源码
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@EventListenerpublic @interface TransactionalEventListener { /** * 指定事务监听器的执行是在何时。 * 取值有: * 事务提交之前 * 事务提交之后(默认值) * 事务回滚之后 * 事务执行完成之后 */ TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
/** * 若没有事务的时候,对应的event是否已经执行 * 默认值为false表示没事务就不执行了 */ boolean fallbackExecution() default false;
/** * 指定事件类的字节码 */ @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {};
/** * 它和value属性的作用是一样的 */ @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {};
/** * 用于指定执行事件处理器的条件。取值是基于Spring的el表达式编写的。 */ String condition() default "";}
3.4 源码分析
3.4.1 在 IoC 容器加载时,执行 AbstractApplicationContext 的 refresh() 方法,一共十二个步骤,在执行到
// Instantiate all remaining (non-lazy-init) singletons. finishBeanFactoryInitialization(beanFactory);//触发其他单例bean的加载
3.4.2 AbstractApplicationContext 的 finishBeanFactoryInitialization 方法执行时,初始化剩余的单例 bean 对象。
/** * 初始化剩余单例bean对象的方法 */protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) { // 方法中的其他代码略 // 初始化剩余单例bean对象.调用的是DefaultListableBeanFactory类中的preInstantiateSingletons方法。 beanFactory.preInstantiateSingletons();}
3.4.3 DefaultListableBeanFactory 类中的 preInstantiateSingletons 方法中执行了 afterSingletonsInstantiated() 方法。此方法是SmartInitializingSingleton 接口中声明的。具体实现类包含:EventListenerMethodProcessor 事件监听器方法处理器。
3.4.4 EventListenerMethodProcessor 中的 afterSingletonsInstantiated 会被执行,该方法中包含处理 bean 的方法:processBean。
3.4.5 在 processBean 方法中调用了创建事件监听器的方法 createApplicationListener。该方法是 EventListenerFactory 接口中声明的方法。
3.4.6 TransactionalEventListenerFactory 类实现了 EventListenerFactory 接口,并重写了 createApplicationListener 方法。
/** * 重写接口中的创建监听器方法 */@Overridepublic ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);}
3.4.7 ApplicationListenerMethodTransactionalAdapter 的实例化。至此,解析 TransactionalEventListener 注解的过程完成。
/** * 构造函数 */public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) { super(beanName, targetClass, method); TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class); if (ann == null) { throw new IllegalStateException("No TransactionalEventListener annotation found on method: "+method); } this.annotation = ann;}
此类是用于编程式事务的模板对象。源码分析如下:
public class TransactionTemplate extends DefaultTransactionDefinition implements TransactionOperations, InitializingBean { /** * 定义日志组件. */ protected final Log logger = LogFactory.getLog(getClass()); /** * 定义事务管理器对象 */ @Nullable private PlatformTransactionManager transactionManager;
/*** 默认构造函数 */ public TransactionTemplate() { }
/*** 通过事务管理器构建事务模板对象 */ public TransactionTemplate(PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; }
/*** 通过事务管理器和事务定义信息构建模板对象 */ public TransactionTemplate(PlatformTransactionManager transactionManager, TransactionDefinition transactionDefinition) { super(transactionDefinition); this.transactionManager = transactionManager; }
/*** 当使用默认构造函数构建事务模板对象时,可以通过此方法注入事务管理器 */ public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; }
/*** 获取使用的事务管理器对象 */ @Nullable public PlatformTransactionManager getTransactionManager() { return this.transactionManager; }
/*** 判断事务管理器是否为空 */ @Override public void afterPropertiesSet() { if (this.transactionManager == null) { throw new IllegalArgumentException("Property 'transactionManager' is required"); } }
/*** 编程事务控制的核心方法,重写的是TransactionOperations接口中的方法 */ @Override @Nullable public <T> T execute(TransactionCallback<T> action) throws TransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); // 判断当前事务管理器是否为CallbackPreferringPlatformTransactionManager类型, 如果是的话,直接使用该接口实现类中的execute方法执行。而无需继续让 PlatformTransactionManager的实现类控制事务,当前坐标环境下它只有一个实现类:WebSphereUowTransactionManager。 if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); } else { // 需要借助PlatformTransactionManager的实现类控制事务。 TransactionStatus status = this.transactionManager.getTransaction(this); T result; try { // 执行TransactionCallback中的doInTransaction方法,此处又是策略模式。 // spring只提供了一个接口(还有一个抽象实现类),而具体需要事务支持的业务代 码由使用者提供。 result = action.doInTransaction(status); } catch (RuntimeException | Error ex) { // 当doInTransaction执行有异常时事务回滚 rollbackOnException(status, ex); throw ex; } catch (Throwable ex) { // 当doInTransaction执行有无法预知的异常时,事务回滚。 rollbackOnException(status, ex); throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } // 没有异常的话,事务提交 this.transactionManager.commit(status); // 返回执行结果(有可能是结果集,也有可能是影响数据库记录的行数) return result; } }
/** 出现异常事务回滚 */ private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); logger.debug("Initiating transaction rollback on application exception", ex); try { // 执行事务管理器中提供的回滚方法。 this.transactionManager.rollback(status); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } }
/*** 重写equals方法 */ @Override public boolean equals(Object other) { return (this == other || (super.equals(other) && (!(other instanceof TransactionTemplate) || getTransactionManager() == ((TransactionTemplate) other).getTransactionManager()))); }}
Spring 中数据源的工具类。里面定义着获取连接的方法。
public abstract class DataSourceUtils { public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException { try { return doGetConnection(dataSource); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: ", ex); } catch (IllegalStateException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage()); } }
public static Connection doGetConnection(DataSource dataSource) throws SQLException { Assert.notNull(dataSource, "No DataSource specified"); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(fetchConnection(dataSource)); } return conHolder.getConnection(); } logger.debug("Fetching JDBC Connection from DataSource"); Connection con = fetchConnection(dataSource); if (TransactionSynchronizationManager.isSynchronizationActive()) { try { ConnectionHolder holderToUse = conHolder; if (holderToUse == null) { holderToUse = new ConnectionHolder(con); } else { holderToUse.setConnection(con); } holderToUse.requested(); TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource)); holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) { TransactionSynchronizationManager.bindResource(dataSource, holderToUse); } } } catch(RuntimeException ex) { releaseConnection(con, dataSource); throw ex; } return con; }
private static Connection fetchConnection(DataSource dataSource) throws SQLException { Connection con = dataSource.getConnection(); if (con == null) { throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource); } return con; } }
TransactionSynchronizationManager
事务的同步管理器类,实现连接和线程绑定从而控制事务的核心类。
它是个抽象类,但是没有任何子类 因为它所有的方法都是静态的。
public abstract class TransactionSynchronizationManager { private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
public static boolean isSynchronizationActive() { return (synchronizations.get() != null); }
public static void initSynchronization() throws IllegalStateException { if (isSynchronizationActive()) { throw new IllegalStateException("Cannot activate transaction synchronization - already active"); } logger.trace("Initializing transaction synchronization"); synchronizations.set(new LinkedHashSet<>()); }
public static void clear() { synchronizations.remove(); currentTransactionName.remove(); currentTransactionReadOnly.remove(); currentTransactionIsolationLevel.remove(); actualTransactionActive.remove(); } }
TransactionAwareDataSourceProxy
这是 Spring 提供的一个数据源代理类,它继承了 DelegatingDataSource 类。
因为数据连接泄露是个很头疼的问题,Spring 框架也提供了很多种办法来避免这个问题。
比如使用 XXXTemplate,当然其背后是 DataSourceUtils。
同时还有另外一种办法,使用 TransactionAwareDataSourceProxy。
通过 TransactionAwareDataSourceProxy 对数据源代理后,数据源对象就有了事务上下文感知的能力了。
当然看源码会发现,其实它还是使用的 DataSourceUtils。
public class TransactionAwareDataSourceProxy extends DelegatingDataSource { /*** 暴露出来的获取连接的方法 */ @Override public Connection getConnection() throws SQLException { return getTransactionAwareConnectionProxy(obtainTargetDataSource()); }
/*** 使用JDK的动态代理创建连接的代理对象 */ protected Connection getTransactionAwareConnectionProxy(DataSource targetDataSource) { return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(), new Class<?>[]{ConnectionProxy.class}, new TransactionAwareInvocationHandler(targetDataSource)); }
/*** InvocationHandler的具体实现(增强的部分) */ private class TransactionAwareInvocationHandler implements InvocationHandler { private final DataSource targetDataSource; @Nullable private Connection target; private boolean closed = false;
public TransactionAwareInvocationHandler(DataSource targetDataSource) { this.targetDataSource = targetDataSource; }
@Override @Nullable public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // Invocation on ConnectionProxy interface coming in... if (method.getName().equals("equals")) { // Only considered as equal when proxies are identical. return proxy == args[0]; } else if (method.getName().equals("hashCode")) { // Use hashCode of Connection proxy. return System.identityHashCode(proxy); } else if (method.getName().equals("toString")) { // Allow for differentiating between the proxy and the raw Connection. StringBuilder sb = new StringBuilder("Transaction-aware proxy for target Connection "); if (this.target != null) { sb.append("[").append(this.target.toString()).append("]"); } else { sb.append(" from DataSource [").append(this.targetDataSource).append("]"); } return sb.toString(); } else if (method.getName().equals("unwrap")) { if (((Class<?>) args[0]).isInstance(proxy)) { return proxy; } } else if (method.getName().equals("isWrapperFor")) { if (((Class<?>) args[0]).isInstance(proxy)) { return true; } } else if (method.getName().equals("close")) { // 释放资源仍然使用的是DataSourceUtils中的方法 DataSourceUtils.doReleaseConnection(this.target, this.targetDataSource); this.closed = true; return null; } else if (method.getName().equals("isClosed")) { return this.closed; } if (this.target == null) { if (this.closed) { throw new SQLException("Connection handle already closed"); } if (shouldObtainFixedConnection(this.targetDataSource)) { this.target = DataSourceUtils.doGetConnection(this.targetDataSource); } } Connection actualTarget = this.target; if (actualTarget == null) { //获取连接使用的也是DataSourceUtils中的方法 actualTarget = DataSourceUtils.doGetConnection(this.targetDataSource); } if (method.getName().equals("getTargetConnection")) { return actualTarget; }// Invoke method on target Connection. try { Object retVal = method.invoke(actualTarget, args); // If return value is a Statement, apply transaction timeout. // Applies to createStatement, prepareStatement, prepareCall. if (retVal instanceof Statement) { DataSourceUtils.applyTransactionTimeout((Statement) retVal, this.targetDataSource); } return retVal; } catch (InvocationTargetException ex) { throw ex.getTargetException(); } finally { if (actualTarget != this.target) { DataSourceUtils.doReleaseConnection(actualTarget, this.targetDataSource); } } } } //其余代码略 }
# 精彩推荐 #
分布式系统「全链路日志追踪」实战之 RestTemplate & Feign
分布式任务调度框架 Elastic-Job 之动态任务发布实现详解
小白都能看得懂的服务调用链路追踪设计与实现
事务看完这篇你只能算入门
[三步法] 可视化分析定位线上 JVM 问题
从 Java 代码如何运行聊到 JVM 和对象的创建-分配-定位-布局-垃圾回收
记一次生产频繁出现 Full GC 的 GC日志图文详解
从源码到实战之 Spring 中的 JdbcTemplate 及策略模式自定义 JdbcTemplate 实现
![]()
"在看"吗,赶快分享和收藏吧
![]()