当前位置: 代码网 > it编程>数据库>MsSqlserver > 多数据源管理——@DS和@DSTranscation使用介绍和原理分析

多数据源管理——@DS和@DSTranscation使用介绍和原理分析

2024年08月03日 MsSqlserver 我要评论
@DS和@DSTranscation的使用和原理分析

前言

在日常的后端开发过程中,由于业务的变更或者系统的切换,我们经常会遇到需要配置多个数据源的情况。为了解决数据源的动态切换问题,可以引入@ds注解。@ds注解是一个用于切换数据源的注解,可以标注在方法或者类上。在使用@ds注解时,我们可以指定要使用的数据源的名称,也可以使用spel表达式来动态决定使用哪个数据源。通过使用@ds注解,我们可以在不修改代码的情况下轻松切换不同的数据源。这个注解的引入可以极大地简化了数据源切换的操作,提高了开发效率和灵活性。

@ds注解

使用介绍

1、引入jar包
<dependency>
    <groupid>com.baomidou</groupid>
    <artifactid>dynamic-datasource-spring-boot-starter</artifactid>
    <version>3.4.1</version>
</dependency>
2、配置文件配置多数据源

注意:此处采用yaml格式进行配置,读者可根据实际情况采用对应格式配置数据源信息

spring:
  datasource:
    dynamic:
      primary: master #设置默认的数据源或者数据源组,默认值即为master
      strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
      datasource:
        master:
          url:
          username:
          password:
          driver-class-name:
        slave:
          url:
          username:
          password:
          driver-class-name:
3、mapper层或者dao层配置@ds指定数据源

注意1:如果你的dao层继承了mybatis-plus提供的serviceimpl类,即dao层可以不经过显式mapper层直接调用sql,则在dao层上加@ds,否则,建议加在mapper层
注意2: 该注解可以用在类上,也可以用在方法上

// serviceimpl是mybatis-plus框架中扩展的service接口的实现类。
// 该包中包含了mybatis-plus的扩展service接口的实现类,
// 这些实现类提供了一些常用的数据库操作方法,如新增、删除、修改、查询等。通过继承这些实现类,
// 可以快速地编写service层的代码,减少重复的工作量。
@component
@ds("slave")  // 不配置该注解,默认走master数据源
public class userdao extends serviceimpl<usermapper, user> {
	@autowired
    private usermapper usermapper;
}
@mapper
public interface usermapper extends basemapper<user>{
}

原理分析

jar包的结构

在这里插入图片描述

底层实现核心类是com.baomidou.dynamic.datasource.dynamicroutingdatasource

类属性分析

// 分组识别符
private static final string underline = "_";
// 用于存储所有的数据源,以数据源名称作为键
private final map<string, datasource> datasourcemap = new concurrenthashmap<>();

// 用于存储分组数据源,以分组名称作为键
private final map<string, groupdatasource> groupdatasources = new concurrenthashmap<>();

// 一个dynamicdatasourceprovider类型的列表,用于提供动态数据源的实现
@autowired
private list<dynamicdatasourceprovider> providers;

// 动态数据源的策略类。默认为loadbalancedynamicdatasourcestrategy
@setter
private class<? extends dynamicdatasourcestrategy> strategy = loadbalancedynamicdatasourcestrategy.class;

// 主数据源的标识。默认为"master"
@setter
private string primary = "master";

// 表示是否严格检查数据源的存在性。默认为false
@setter
private boolean strict = false;

// 表示是否启用p6spy数据源。默认为false
@setter
private boolean p6spy = false;

// 表示是否启用seata数据源代理。默认为false
@setter
private boolean seata = false;
1.项目初始化时,获取所有数据源信息——dynamicroutingdatasource.adddatasource
  1. 检查开启数据源配置是否开启——p6spy、seata
  2. 添加并分组数据源
  3. 检测默认数据源是否配置
@override
public void afterpropertiesset() throws exception {
	// 检查开启了配置但没有相关依赖
	checkenv();
	// 添加并分组数据源
	map<string, datasource> datasources = new hashmap<>();
	for (dynamicdatasourceprovider provider : providers) {
	    datasources.putall(provider.loaddatasources());
	}
	for (map.entry<string, datasource> dsitem : datasources.entryset()) {
	    adddatasource(dsitem.getkey(), dsitem.getvalue());
	}
	// 检测默认数据源是否设置
	if (groupdatasources.containskey(primary)) {
	    log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", datasources.size(), primary);
	} else if (datasourcemap.containskey(primary)) {
	    log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", datasources.size(), primary);
	} else {
	    log.warn("dynamic-datasource initial loaded [{}] datasource,please add your primary datasource or check your configuration", datasources.size());
	}
}
  1. 将数据源加入到全部数据源map中
  2. 将数据源添加到分组中
  3. 关闭数据源
public synchronized void adddatasource(string ds, datasource datasource) {
    datasource olddatasource = datasourcemap.put(ds, datasource);
    // 新数据源添加到分组
    this.addgroupdatasource(ds, datasource);
    // 关闭老的数据源
    if (olddatasource != null) {
        closedatasource(ds, olddatasource);
    }
    log.info("dynamic-datasource - add a datasource named [{}] success", ds);
}
  1. 校验数据源名称是否包含分组标识符
  2. 按分组标识符进行分割,以分组标识符前的字符串作为分组名
  3. 将数据源添加到分组数据源中
private void addgroupdatasource(string ds, datasource datasource) {
    if (ds.contains(underline)) {
        string group = ds.split(underline)[0];
        groupdatasource groupdatasource = groupdatasources.get(group);
        if (groupdatasource == null) {
            try {
                groupdatasource = new groupdatasource(group, strategy.getdeclaredconstructor().newinstance());
                groupdatasources.put(group, groupdatasource);
            } catch (exception e) {
                throw new runtimeexception("dynamic-datasource - add the datasource named " + ds + " error", e);
            }
        }
        groupdatasource.adddatasource(ds, datasource);
    }
}
2.进行数据库操作,执行对应方法时,会被 dynamicdatasourceannotationinterceptor拦截器拦截
  1. 该拦截器继承了methodinterceptor,会对所有方法进行拦截
  2. 拦截后,invoke方法中执行determinedatasource方法,扫描加了@ds注解的类或者方法,
  3. 调用dynamicdatasourcecontextholder.poll方法。

在这里插入图片描述
在这里插入图片描述

3.dynamicdatasourcecontextholder.poll方法将当前线程的数据源名加到对应的threadlocal线程本地中
public final class dynamicdatasourcecontextholder {

    /**
     * 为什么要用链表存储(准确的是栈)
     * <pre>
     * 为了支持嵌套切换,如abc三个service都是不同的数据源
     * 其中a的某个业务要调b的方法,b的方法需要调用c的方法。一级一级调用切换,形成了链。
     * 传统的只设置当前线程的方式不能满足此业务需求,必须使用栈,后进先出。
     * </pre>
     */
    private static final threadlocal<deque<string>> lookup_key_holder = new namedthreadlocal<deque<string>>("dynamic-datasource") {
@override
        protected deque<string> initialvalue() {
            return new arraydeque<>();
        }
    };
/**
     * 设置当前线程数据源
     * <p>
     * 如非必要不要手动调用,调用后确保最终清除
     * </p>
     *
     * @param ds 数据源名称
     */
    public static string push(string ds) {
        string datasourcestr = stringutils.isempty(ds) ? "" : ds;
        lookup_key_holder.get().push(datasourcestr);
        return datasourcestr;
    }
}

到此线程中threadlocal就保存了需要切换的数据源名称了

4、数据操作完成后,方法返回第二步中的拦截器,执行dynamicdatasourcecontextholder.poll();清除掉此次threadlocal中的数据源,避免影响后续数据操作。

@dstranscation注解

spring自带事务@transactional的实现在一个事务里,只能有一个数据库链接,表现形式即只有第一个@ds注解会生效,之后的数据库操作都采用第一次使用到的数据库连接。
@dstranscation可以实现动态数据源切换下的事务

使用介绍

同上,一般加在业务层方法上
不能和@transcation混用

原理分析

在这里插入图片描述

1.核心方法dynamiclocaltransactionadvisor
  1. 如果有xid,直接反射调用原方法,说明会话已经创建。
  2. 如果没有xid,说明新会话,首先生成xid,绑到上下文上
  3. 执行原方法,如果有异常,修改状态为false
  4. 调用会话的notify方法,处理状态(关键)
  5. 删除会话上下文(关键)
@slf4j
public class dynamiclocaltransactionadvisor implements methodinterceptor {

    @override
    public object invoke(methodinvocation methodinvocation) throws throwable {
        if (!stringutils.isempty(transactioncontext.getxid())) {
            return methodinvocation.proceed();
        }
        boolean state = true;
        object o;
        string xid = uuid.randomuuid().tostring();
        transactioncontext.bind(xid);
        try {
            o = methodinvocation.proceed();
        } catch (exception e) {
            state = false;
            throw e;
        } finally {
            connectionfactory.notify(state);
            transactioncontext.remove();
        }
        return o;
    }
}
2.事务上下文类:用于记录当前事务的xid
public class transactioncontext {

    private static final threadlocal<string> context_holder = new threadlocal<>();

    /**
     * gets xid.
     *
     * @return the xid
     */
    public static string getxid() {
        string xid = context_holder.get();
        if (!stringutils.isempty(xid)) {
            return xid;
        }
        return null;
    }

    /**
     * unbind string.
     *
     * @return the string
     */
    public static string unbind(string xid) {
        context_holder.remove();
        return xid;
    }

    /**
     * bind string.
     *
     * @return the string
     */
    public static string bind(string xid) {
        context_holder.set(xid);
        return xid;
    }

    /**
     * remove
     */
    public static void remove() {
        context_holder.remove();
    }

}

3. 会话工厂:用于管理用到的数据源链接
  1. 关闭使用到的数据源的自动提交能力【putconnection】
  2. 将使用到的数据源放到当前线程的threadlocal中【putconnection】
  3. 循环调用了所有数据库连接的notify方法。有一个false就都rollback了【notify】
public class connectionfactory {

    private static final threadlocal<map<string, connectionproxy>> connection_holder =
            new threadlocal<map<string, connectionproxy>>() {
                @override
                protected map<string, connectionproxy> initialvalue() {
                    return new concurrenthashmap<>();
                }
            };

    public static void putconnection(string ds, connectionproxy connection) {
        map<string, connectionproxy> concurrenthashmap = connection_holder.get();
        if (!concurrenthashmap.containskey(ds)) {
            try {
                connection.setautocommit(false);
            } catch (sqlexception e) {
                e.printstacktrace();
            }
            concurrenthashmap.put(ds, connection);
        }
    }

    public static connectionproxy getconnection(string ds) {
        return connection_holder.get().get(ds);
    }

    public static void notify(boolean state) {
        try {
            map<string, connectionproxy> concurrenthashmap = connection_holder.get();
            for (connectionproxy connectionproxy : concurrenthashmap.values()) {
                connectionproxy.notify(state);
            }
        } finally {
            connection_holder.remove();
        }
    }

}

4.抽象动态数据源的事务处理
  1. 如果获取不到xid,则非ds事务,直接返回数据库连接
  2. 如果获取到xid,则为ds事务
  3. 基于2,如果connectionfactory获取不到对应数据库连接,则调用getconnectionproxy方法
  4. 基于3,如果能获取到对应数据库连接,直接返回该连接
  5. 基于2,将新的数据库连接放到connectionproxy中
public abstract class abstractroutingdatasource extends abstractdatasource {

    protected abstract datasource determinedatasource();

    @override
    public connection getconnection() throws sqlexception {
        string xid = transactioncontext.getxid();
        if (stringutils.isempty(xid)) {
            return determinedatasource().getconnection();
        } else {
            string ds = dynamicdatasourcecontextholder.peek();
            ds = stringutils.isempty(ds) ? "default" : ds;
            connectionproxy connection = connectionfactory.getconnection(ds);
            return connection == null ? getconnectionproxy(ds, determinedatasource().getconnection()) : connection;
        }
    }

    @override
    public connection getconnection(string username, string password) throws sqlexception {
        string xid = transactioncontext.getxid();
        if (stringutils.isempty(xid)) {
            return determinedatasource().getconnection(username, password);
        } else {
            string ds = dynamicdatasourcecontextholder.peek();
            ds = stringutils.isempty(ds) ? "default" : ds;
            connectionproxy connection = connectionfactory.getconnection(ds);
            return connection == null ? getconnectionproxy(ds, determinedatasource().getconnection(username, password))
                    : connection;
        }
    }

    private connection getconnectionproxy(string ds, connection connection) {
        connectionproxy connectionproxy = new connectionproxy(connection, ds);
        connectionfactory.putconnection(ds, connectionproxy);
        return connectionproxy;
    }
   //略
}

参考文档:
@dstransactional注解原理
多数据源管理:掌握@ds注解的威力

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com