前言
在日常的后端开发过程中,由于业务的变更或者系统的切换,我们经常会遇到需要配置多个数据源的情况。为了解决数据源的动态切换问题,可以引入@ds注解。@ds注解是一个用于切换数据源的注解,可以标注在方法或者类上。在使用@ds注解时,我们可以指定要使用的数据源的名称,也可以使用spel表达式来动态决定使用哪个数据源。通过使用@ds注解,我们可以在不修改代码的情况下轻松切换不同的数据源。这个注解的引入可以极大地简化了数据源切换的操作,提高了开发效率和灵活性。
@ds注解
使用介绍
1、引入jar包
<dependency>
<groupid>com.baomidou</groupid>
<artifactid>dynamic-datasource-spring-boot-starter</artifactid>
<version>3.4.1</version>
</dependency>
2、配置文件配置多数据源
注意:此处采用yaml格式进行配置,读者可根据实际情况采用对应格式配置数据源信息
spring:
datasource:
dynamic:
primary: master #设置默认的数据源或者数据源组,默认值即为master
strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
datasource:
master:
url:
username:
password:
driver-class-name:
slave:
url:
username:
password:
driver-class-name:
3、mapper层或者dao层配置@ds指定数据源
注意1:如果你的dao层继承了mybatis-plus提供的serviceimpl类,即dao层可以不经过显式mapper层直接调用sql,则在dao层上加@ds,否则,建议加在mapper层
注意2: 该注解可以用在类上,也可以用在方法上
// serviceimpl是mybatis-plus框架中扩展的service接口的实现类。
// 该包中包含了mybatis-plus的扩展service接口的实现类,
// 这些实现类提供了一些常用的数据库操作方法,如新增、删除、修改、查询等。通过继承这些实现类,
// 可以快速地编写service层的代码,减少重复的工作量。
@component
@ds("slave") // 不配置该注解,默认走master数据源
public class userdao extends serviceimpl<usermapper, user> {
@autowired
private usermapper usermapper;
}
@mapper
public interface usermapper extends basemapper<user>{
}
原理分析
jar包的结构
底层实现核心类是com.baomidou.dynamic.datasource.dynamicroutingdatasource
类属性分析
// 分组识别符
private static final string underline = "_";
// 用于存储所有的数据源,以数据源名称作为键
private final map<string, datasource> datasourcemap = new concurrenthashmap<>();
// 用于存储分组数据源,以分组名称作为键
private final map<string, groupdatasource> groupdatasources = new concurrenthashmap<>();
// 一个dynamicdatasourceprovider类型的列表,用于提供动态数据源的实现
@autowired
private list<dynamicdatasourceprovider> providers;
// 动态数据源的策略类。默认为loadbalancedynamicdatasourcestrategy
@setter
private class<? extends dynamicdatasourcestrategy> strategy = loadbalancedynamicdatasourcestrategy.class;
// 主数据源的标识。默认为"master"
@setter
private string primary = "master";
// 表示是否严格检查数据源的存在性。默认为false
@setter
private boolean strict = false;
// 表示是否启用p6spy数据源。默认为false
@setter
private boolean p6spy = false;
// 表示是否启用seata数据源代理。默认为false
@setter
private boolean seata = false;
1.项目初始化时,获取所有数据源信息——dynamicroutingdatasource.adddatasource
- 检查开启数据源配置是否开启——p6spy、seata
- 添加并分组数据源
- 检测默认数据源是否配置
@override
public void afterpropertiesset() throws exception {
// 检查开启了配置但没有相关依赖
checkenv();
// 添加并分组数据源
map<string, datasource> datasources = new hashmap<>();
for (dynamicdatasourceprovider provider : providers) {
datasources.putall(provider.loaddatasources());
}
for (map.entry<string, datasource> dsitem : datasources.entryset()) {
adddatasource(dsitem.getkey(), dsitem.getvalue());
}
// 检测默认数据源是否设置
if (groupdatasources.containskey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", datasources.size(), primary);
} else if (datasourcemap.containskey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", datasources.size(), primary);
} else {
log.warn("dynamic-datasource initial loaded [{}] datasource,please add your primary datasource or check your configuration", datasources.size());
}
}
- 将数据源加入到全部数据源map中
- 将数据源添加到分组中
- 关闭数据源
public synchronized void adddatasource(string ds, datasource datasource) {
datasource olddatasource = datasourcemap.put(ds, datasource);
// 新数据源添加到分组
this.addgroupdatasource(ds, datasource);
// 关闭老的数据源
if (olddatasource != null) {
closedatasource(ds, olddatasource);
}
log.info("dynamic-datasource - add a datasource named [{}] success", ds);
}
- 校验数据源名称是否包含分组标识符
- 按分组标识符进行分割,以分组标识符前的字符串作为分组名
- 将数据源添加到分组数据源中
private void addgroupdatasource(string ds, datasource datasource) {
if (ds.contains(underline)) {
string group = ds.split(underline)[0];
groupdatasource groupdatasource = groupdatasources.get(group);
if (groupdatasource == null) {
try {
groupdatasource = new groupdatasource(group, strategy.getdeclaredconstructor().newinstance());
groupdatasources.put(group, groupdatasource);
} catch (exception e) {
throw new runtimeexception("dynamic-datasource - add the datasource named " + ds + " error", e);
}
}
groupdatasource.adddatasource(ds, datasource);
}
}
2.进行数据库操作,执行对应方法时,会被 dynamicdatasourceannotationinterceptor拦截器拦截
- 该拦截器继承了methodinterceptor,会对所有方法进行拦截
- 拦截后,invoke方法中执行determinedatasource方法,扫描加了@ds注解的类或者方法,
- 调用dynamicdatasourcecontextholder.poll方法。
3.dynamicdatasourcecontextholder.poll方法将当前线程的数据源名加到对应的threadlocal线程本地中
public final class dynamicdatasourcecontextholder {
/**
* 为什么要用链表存储(准确的是栈)
* <pre>
* 为了支持嵌套切换,如abc三个service都是不同的数据源
* 其中a的某个业务要调b的方法,b的方法需要调用c的方法。一级一级调用切换,形成了链。
* 传统的只设置当前线程的方式不能满足此业务需求,必须使用栈,后进先出。
* </pre>
*/
private static final threadlocal<deque<string>> lookup_key_holder = new namedthreadlocal<deque<string>>("dynamic-datasource") {
@override
protected deque<string> initialvalue() {
return new arraydeque<>();
}
};
/**
* 设置当前线程数据源
* <p>
* 如非必要不要手动调用,调用后确保最终清除
* </p>
*
* @param ds 数据源名称
*/
public static string push(string ds) {
string datasourcestr = stringutils.isempty(ds) ? "" : ds;
lookup_key_holder.get().push(datasourcestr);
return datasourcestr;
}
}
到此线程中threadlocal就保存了需要切换的数据源名称了
4、数据操作完成后,方法返回第二步中的拦截器,执行dynamicdatasourcecontextholder.poll();清除掉此次threadlocal中的数据源,避免影响后续数据操作。
@dstranscation注解
spring自带事务@transactional的实现在一个事务里,只能有一个数据库链接,表现形式即只有第一个@ds注解会生效,之后的数据库操作都采用第一次使用到的数据库连接。
@dstranscation可以实现动态数据源切换下的事务
使用介绍
同上,一般加在业务层方法上
不能和@transcation混用
原理分析
1.核心方法dynamiclocaltransactionadvisor
- 如果有xid,直接反射调用原方法,说明会话已经创建。
- 如果没有xid,说明新会话,首先生成xid,绑到上下文上
- 执行原方法,如果有异常,修改状态为false
- 调用会话的notify方法,处理状态(关键)
- 删除会话上下文(关键)
@slf4j
public class dynamiclocaltransactionadvisor implements methodinterceptor {
@override
public object invoke(methodinvocation methodinvocation) throws throwable {
if (!stringutils.isempty(transactioncontext.getxid())) {
return methodinvocation.proceed();
}
boolean state = true;
object o;
string xid = uuid.randomuuid().tostring();
transactioncontext.bind(xid);
try {
o = methodinvocation.proceed();
} catch (exception e) {
state = false;
throw e;
} finally {
connectionfactory.notify(state);
transactioncontext.remove();
}
return o;
}
}
2.事务上下文类:用于记录当前事务的xid
public class transactioncontext {
private static final threadlocal<string> context_holder = new threadlocal<>();
/**
* gets xid.
*
* @return the xid
*/
public static string getxid() {
string xid = context_holder.get();
if (!stringutils.isempty(xid)) {
return xid;
}
return null;
}
/**
* unbind string.
*
* @return the string
*/
public static string unbind(string xid) {
context_holder.remove();
return xid;
}
/**
* bind string.
*
* @return the string
*/
public static string bind(string xid) {
context_holder.set(xid);
return xid;
}
/**
* remove
*/
public static void remove() {
context_holder.remove();
}
}
3. 会话工厂:用于管理用到的数据源链接
- 关闭使用到的数据源的自动提交能力【putconnection】
- 将使用到的数据源放到当前线程的threadlocal中【putconnection】
- 循环调用了所有数据库连接的notify方法。有一个false就都rollback了【notify】
public class connectionfactory {
private static final threadlocal<map<string, connectionproxy>> connection_holder =
new threadlocal<map<string, connectionproxy>>() {
@override
protected map<string, connectionproxy> initialvalue() {
return new concurrenthashmap<>();
}
};
public static void putconnection(string ds, connectionproxy connection) {
map<string, connectionproxy> concurrenthashmap = connection_holder.get();
if (!concurrenthashmap.containskey(ds)) {
try {
connection.setautocommit(false);
} catch (sqlexception e) {
e.printstacktrace();
}
concurrenthashmap.put(ds, connection);
}
}
public static connectionproxy getconnection(string ds) {
return connection_holder.get().get(ds);
}
public static void notify(boolean state) {
try {
map<string, connectionproxy> concurrenthashmap = connection_holder.get();
for (connectionproxy connectionproxy : concurrenthashmap.values()) {
connectionproxy.notify(state);
}
} finally {
connection_holder.remove();
}
}
}
4.抽象动态数据源的事务处理
- 如果获取不到xid,则非ds事务,直接返回数据库连接
- 如果获取到xid,则为ds事务
- 基于2,如果connectionfactory获取不到对应数据库连接,则调用getconnectionproxy方法
- 基于3,如果能获取到对应数据库连接,直接返回该连接
- 基于2,将新的数据库连接放到connectionproxy中
public abstract class abstractroutingdatasource extends abstractdatasource {
protected abstract datasource determinedatasource();
@override
public connection getconnection() throws sqlexception {
string xid = transactioncontext.getxid();
if (stringutils.isempty(xid)) {
return determinedatasource().getconnection();
} else {
string ds = dynamicdatasourcecontextholder.peek();
ds = stringutils.isempty(ds) ? "default" : ds;
connectionproxy connection = connectionfactory.getconnection(ds);
return connection == null ? getconnectionproxy(ds, determinedatasource().getconnection()) : connection;
}
}
@override
public connection getconnection(string username, string password) throws sqlexception {
string xid = transactioncontext.getxid();
if (stringutils.isempty(xid)) {
return determinedatasource().getconnection(username, password);
} else {
string ds = dynamicdatasourcecontextholder.peek();
ds = stringutils.isempty(ds) ? "default" : ds;
connectionproxy connection = connectionfactory.getconnection(ds);
return connection == null ? getconnectionproxy(ds, determinedatasource().getconnection(username, password))
: connection;
}
}
private connection getconnectionproxy(string ds, connection connection) {
connectionproxy connectionproxy = new connectionproxy(connection, ds);
connectionfactory.putconnection(ds, connectionproxy);
return connectionproxy;
}
//略
}
发表评论