- 转载自芋道源码
文章目录
一、AOP实现步骤
一句话:使用自定义注解(切点)+interceptor(增强Advice)构成织入。
1.定义注解 DSTransactional
代码如下(示例):
ipackage com.baomidou.dynamic.datasource.annotation;import java.lang.annotation.*;@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface DSTransactional {}
2.定义拦截器(增强)DynamicLocalTransactionAdvisor
代码如下(示例):
package com.baomidou.dynamic.datasource.aop;import com.baomidou.dynamic.datasource.tx.ConnectionFactory;import com.baomidou.dynamic.datasource.tx.TransactionContext;import lombok.extern.slf4j.Slf4j;import org.aopalliance.intercept.MethodInterceptor;import org.aopalliance.intercept.MethodInvocation;import org.springframework.util.StringUtils;import java.util.UUID;@Slf4jpublic class DynamicLocalTransactionAdvisor implements MethodInterceptor { @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { if (!StringUtils.isEmpty(TransactionContext.getXID())) { return methodInvocation.proceed(); } boolean state = true; Object o; String xid = UUID.randomUUID().toString(); TransactionContext.bind(xid); try { o = methodInvocation.proceed(); } catch (Exception e) { state = false; throw e; } finally { ConnectionFactory.notify(state); TransactionContext.remove(); } return o; }}
3.AspectJ定义注解为切点,并配置织入(关键代码)
public class DynamicDataSourceAutoConfiguration implements InitializingBean { @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false", matchIfMissing = true) @Bean public Advisor dynamicTransactionAdvisor() { AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut(); pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)"); return new DefaultPointcutAdvisor(pointcut, new DynamicLocalTransactionAdvisor()); }}
二.connection代理实现
Spring自带事务@Transactional的实现在一个事务里,只能有一个数据库connection,在动态多数据源里的现象就是只有第一个数据源,后面切的都失效了。
所以要想实现动态多数据源下的统一提交和回滚,就不能用Spring自带的。
PS:在很多数据库相关项目里,connection这个词是有歧义的,可能有的含义包括:事务、会话、数据库连接。
spring自带事务明显默认了一个事务会话就是一个数据库链接这种老思想。
1.从增强实现开始
从DynamicLocalTransactionAdvisor增强的invoke方法来看具体逻辑:
首先用到了TransactionContext,一个基于ThreadLocal的账本,记录了当前事务的xid。看下面代码注释。
public class DynamicLocalTransactionAdvisor implements MethodInterceptor { @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { // 1-1. 如果有xid,直接反射调用原方法,说明会话已经创建。 if (!StringUtils.isEmpty(TransactionContext.getXID())) { return methodInvocation.proceed(); } // 1-2. 如果没有xid,说明新会话,首先生成xid,绑到上下文上。 boolean state = true; Object o; String xid = UUID.randomUUID().toString(); TransactionContext.bind(xid); try { o = methodInvocation.proceed(); } catch (Exception e) { // 1-3. 执行原方法,如果有异常,修改状态为false state = false; throw e; } finally { // 1-4. 调用会话的notify方法,处理状态 ConnectionFactory.notify(state); // 1-5. 删除会话上下文 TransactionContext.remove(); } return o; }}
2. TransactionContext 事务上下文
public class TransactionContext { //记录了当前事务的xid private static final ThreadLocal CONTEXT_HOLDER = new ThreadLocal<>(); public static String getXID() { String xid = CONTEXT_HOLDER.get(); if (!StringUtils.isEmpty(xid)) { return xid; } return null; } public static String unbind(String xid) { CONTEXT_HOLDER.remove(); return xid; } public static String bind(String xid) { CONTEXT_HOLDER.set(xid); return xid; } public static void remove() { CONTEXT_HOLDER.remove(); }}
3. ConnectionFactory 会话工厂(代表会话)
再来看1-4用到的ConnectionFactory,也是一个基于ThreadLocal的账本,记录了该【会话】中用到的所有【连接】。
public class ConnectionFactory { private static final ThreadLocal
4. ConnectionProxy 数据库连接代理
数据库连接
public class ConnectionProxy implements Connection { private Connection connection; private String ds; public ConnectionProxy(Connection connection, String ds) { this.connection = connection; this.ds = ds; } // 4-1:前面303调用的方法 public void notify(Boolean commit) { try { if (commit) { connection.commit(); // 状态为true,则提交 } else { connection.rollback(); // 状态为false,则提交 } connection.close(); } catch (Exception e) { log.error(e.getLocalizedMessage(), e); } } @Override public void commit() throws SQLException { // connection.commit(); } // ....略}
5. AbstractRoutingDataSource 的改造:让动态数据库连接获取connection时登记到会话账本
public abstract class AbstractRoutingDataSource extends AbstractDataSource { protected abstract DataSource determineDataSource(); @Override public Connection getConnection() throws SQLException { String xid = TransactionContext.getXID(); if (StringUtils.isEmpty(xid)) { // 非DSTransaction return determineDataSource().getConnection(); } else { // DSTransaction String ds = DynamicDataSourceContextHolder.peek(); ConnectionProxy connection = ConnectionFactory.getConnection(ds); return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; } } @Override public Connection getConnection(String username, String password) throws SQLException { String xid = TransactionContext.getXID(); if (StringUtils.isEmpty(xid)) { // 非DSTransaction return determineDataSource().getConnection(username, password); } else { // DSTransaction String ds = DynamicDataSourceContextHolder.peek(); ConnectionProxy connection = ConnectionFactory.getConnection(ds); return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)) : connection; } } private Connection getConnectionProxy(String ds, Connection connection) { ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds); // 调用了3-1 ConnectionFactory.putConnection(ds, connectionProxy); return connectionProxy; } // ... 略}
使用注意
不可与Transactional混用
2.目前只支持统一提交和回滚,更复杂的请用seata
3.3.4版本之前不加@DS注解会报错
来源地址:https://blog.csdn.net/wangchengqi1997/article/details/127821521