spring 注解试事物源码解析
spring 注解试事物源码解析
- 基于xml注解式事务入口
public class TxNamespaceHandler extends NamespaceHandlerSupport { static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager"; static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager"; static String getTransactionManagerName(Element element) { return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ? element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME); } @Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); } }
基于注解的事务启动入口
@Configuration public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration { // 事务增强器 @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() { BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource()); advisor.setAdvice(transactionInterceptor()); advisor.setOrder(this.enableTx.<Integer>getNumber("order")); return advisor; } // 事务的属性资源 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); } // 事务的拦截器 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor() { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource()); if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; } }
事务bean的增强器
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor { private TransactionAttributeSource transactionAttributeSource; private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() { @Override protected TransactionAttributeSource getTransactionAttributeSource() { return transactionAttributeSource; } }; // 设置事务的属性资源 public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) { this.transactionAttributeSource = transactionAttributeSource; } /** * Set the {@link ClassFilter} to use for this pointcut. * Default is {@link ClassFilter#TRUE}. */ public void setClassFilter(ClassFilter classFilter) { this.pointcut.setClassFilter(classFilter); } @Override public Pointcut getPointcut() { return this.pointcut; } }
事务拦截器
TransactionInterceptor.java
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
TransactionAspectSupport.java
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 获取事务对应的属性 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); // 获取事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 创建TransactionInfo TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 执行被增强的方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 清除事务信息 cleanupTransactionInfo(txInfo); } // 提交事务 commitTransactionAfterReturning(txInfo); return retVal; } else { // 编程式事务处理 try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result: It might indicate a Throwable to rethrow. if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
获取对应的事物属性
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // 没有名称 就用 joinpointIdentification 当名称 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 获取一个事务状态 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
获取一个事务状态
AbstractPlatformTransactionManager.java
@Override public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 判断当前的线程是否存在事务,判断依据是当前线程记录的连接不为空且连接中(connectionHolder) // 中的transcationActive属性不为空 if (isExistingTransaction(transaction)) { // 当前线程已经存在事务 return handleExistingTransaction(definition, transaction, debugEnabled); } // 事务超时设置验证 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 如果当前线程不存在事务,但是PropagationBehavior==TransactionDefinition.PROPAGATION_MANDATORY 则抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 其他三个情况则新建事务 // 先挂起一个空事务 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 构造transaction,包括设置connectionHolder、隔离级别、timeout // 如果是新连接,绑定到当前线程 doBegin(transaction, definition); // 新同步事务设置,针对当前线程设置 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
获取一个事务
DataSourceTransactionManager.java
@Override protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); // 设置是否允许保存点 txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 如果当前线程已经记录数据库连接则使用已有的连接 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); txObject.setConnectionHolder(conHolder, false); return txObject; }
处理已存在的事务
/** * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } // 新事务连接 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } // 嵌入式事务的处理 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // 如果没有可以使用的保存点的方式控制事务回滚,那么嵌入式事务的建立初始建立保存点 DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // 有些情况不能使用保存点,如果jat,那么建立新事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
构造事务,并且配置隔离级别和timeout
@Override protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 设置隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // 更改自动提交配置,有spring控制提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 设置当前线程是否存在事务的依据 txObject.getConnectionHolder().setTransactionActive(true); //设置timeout int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // 将当前获取到的连接绑定到当前线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
新同步事务的设置
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
准备事务信息
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // 记录事务状态 txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return // false. We created it only to preserve the integrity of // the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo; }
回滚处理
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { // 当抛出异常时首先判断当前是否存在事务, if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } // 这里判断是否回滚的依据是抛出的异常是否runtimeException活着error, 默认实现 if (txInfo.transactionAttribute.rollbackOn(ex)) { try { //处理回滚信息 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by rollback error", ex); throw err; } } else { // 如果不满足条件则抛出异常也会提交 try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by commit error", ex); throw err; } } } }
DefaultTransactionAttribute.java
@Override public boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error); }
AbstractPlatformTransactionManager.java
@Override public final void rollback(TransactionStatus status) throws TransactionException { // 如果事务完成再次回滚会抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus); }
如果有保存点则回到保存点
private void processRollback(DefaultTransactionStatus status) { try { try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } // 如果有保存点则回到保存点 status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 如果是新事务则回滚 doRollback(status); } else if (status.hasTransaction()) { // 有事务标识又不属于上面两种情况的,多属于jta,只能做标记,在提交的时候回退 if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { cleanupAfterCompletion(status); } }
AbstractTransactionStatus.java
public void rollbackToHeldSavepoint() throws TransactionException { if (!hasSavepoint()) { throw new TransactionUsageException( "Cannot roll back to savepoint - no savepoint associated with current transaction"); } // 回滚到保存点 getSavepointManager().rollbackToSavepoint(getSavepoint()); // 释放掉这个保存点 getSavepointManager().releaseSavepoint(getSavepoint()); setSavepoint(null); }
JdbcTransactionObjectSupport.java
@Override public void rollbackToSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); try { conHolder.getConnection().rollback((Savepoint) savepoint); } catch (Throwable ex) { throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex); } } /** * This implementation releases the given JDBC 3.0 Savepoint. * @see java.sql.Connection#releaseSavepoint */ @Override public void releaseSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); try { conHolder.getConnection().releaseSavepoint((Savepoint) savepoint); } catch (Throwable ex) { logger.debug("Could not explicitly release JDBC savepoint", ex); } }
新事务则直接回退
@Override protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); } }
回退后信息清理
private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 设置事务完成状态 status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } // 结束之前挂起的事务 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } }
TransactionSynchronizationManager.java
public static void clear() { clearSynchronization(); setCurrentTransactionName(null); setCurrentTransactionReadOnly(false); setCurrentTransactionIsolationLevel(null); setActualTransactionActive(false); }
@Override protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 解绑数据源 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); } // 释放数据库连接 Connection con = txObject.getConnectionHolder().getConnection(); try { // 恢复数据库的自动提交属性 if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } // 重制数据库连接 DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } //如果当前事务时独立的新创建的事务则在完成事务时释放数据库连接 DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); }
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException { // 如果在事务执行前有挂起,那么在这里恢复这个事务 if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } @Override protected void doResume(Object transaction, Object suspendedResources) { ConnectionHolder conHolder = (ConnectionHolder) suspendedResources; TransactionSynchronizationManager.bindResource(this.dataSource, conHolder); }
事务的提交
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
@Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 如果有标记为回滚的状态则回滚 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus); }
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } // 如果存在保存点则清除保存点 status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 如果是新事务则提交 doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { // 终于可以愉快的提交这个事务了 con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
双亲委派模型
类加载流程 类加载流程,先加载Bootstrap ClassLoader 启动类加载即最顶层的加载类。这部分由C++ 编写。继续再次加载Extention ClassLoader 扩展类加载器由Bootstrap ClassLoader加载,加载进入内存。继续再次加载Application ClassLoader 应用类加载器,即系统即在此,加载当前应用下的类最后再次加载ClassLoader类 双亲委派模型 该模型是先检查指定名称的类是否已经加载过,如果加载进入内存,不加载直接返回,如果没有加载过,判断是否有父类加载器,如果拥有父类加载器,那么将会直接将权利移交给父类,由父类代理当前类进行加载该类。或者是调用C++的bootstrap类加载器来加载该类最后如果三者都没有找到类,那么直接调用当前类加载器的findClass方法来完成类加载。即,如果需要需要加载自定义的类,那么就需要重写findClass方法。 即,如果需要将当前类加载进入,那么就需要重写findClass方法,若未找到这几种类,则会自动调用findClass方法。 调用过程 先加载父类,若父类未加载,继续调用父类,直到...
- 下一篇
高并发下log4j的性能瓶颈
开篇 近期由于业务需要进行业务迁移,期间因为误设置log4j的日志级别,导致系统性能整体下降,具体表现在QPS下降明显,系统RT上升。迁移期间由于各类系统环境较原来有较大差别,因为在排查过程中也走了一些弯路,总结起来避免他人再次踩坑。 问题背景 问题的背景其实很简单,线上系统的log4j的日志级别由warn调整为info,导致大量的调用log4j的日志接口,导致系统的rt上升伴随着qps下降,具体的影响效果可以看下图。 需要额外解释的就是log4j的这个问题需要在qps较大的情况下才会复现,按照我遇到的情况基本上需要在2k/s以上会比较明显(数据不一定非常准确),只是为了说明需要在大量请求的情况才会触发synchronized的问题。 当然问题的本质在于大量调用log4j的日志接口,导致竞争synchronized影响性能,影响的时间应该也是在ms级别,这其实在另外一个方面说明了只求非常极致追求性能问题才会细致的去分析这些问题。 优化效果 优化后qps的提升效果图,这边是以分钟进行统计的。 优化后系统的响应时间效果图。 现象分析 整个分析过程其实并不是那么具有条理性,只是现在...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装