1 背景
在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰
2 threadlocal+taskdecorator
用户工具类 userutils
/**
*使用threadlocal存储共享的数据变量,如登录的用户信息
*/
public class userutils {
private static final threadlocal<string> userlocal=new threadlocal<>();
public static string getuserid(){
return userlocal.get();
}
public static void setuserid(string userid){
userlocal.set(userid);
}
public static void clear(){
userlocal.remove();
}
}自定义customtaskdecorator
/**
* 线程池修饰类
*/
public class customtaskdecorator implements taskdecorator {
@override
public runnable decorate(runnable runnable) {
// 获取主线程中的请求信息(我们的用户信息也放在里面)
string robotid = userutils.getuserid();
system.out.println(robotid);
return () -> {
try {
// 将主线程的请求信息,设置到子线程中
userutils.setuserid(robotid);
// 执行子线程,这一步不要忘了
runnable.run();
} finally {
// 线程结束,清空这些信息,否则可能造成内存泄漏
userutils.clear();
}
};
}
}executorconfig
在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());
@bean(name = "asyncserviceexecutor")
public executor asyncserviceexecutor() {
log.info("start asyncserviceexecutor----------------");
//threadpooltaskexecutor executor = new threadpooltaskexecutor();
//使用可视化运行状态的线程池
threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
//配置核心线程数
executor.setcorepoolsize(corepoolsize);
//配置最大线程数
executor.setmaxpoolsize(maxpoolsize);
//配置队列大小
executor.setqueuecapacity(queuecapacity);
//配置线程池中的线程的名称前缀
executor.setthreadnameprefix(nameprefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
//增加线程池修饰类
executor.settaskdecorator(new customtaskdecorator());
//增加mdc的线程池修饰类
//executor.settaskdecorator(new mdctaskdecorator());
//执行初始化
executor.initialize();
log.info("end asyncserviceexecutor------------");
return executor;
}asyncserviceimpl
/**
* 使用threadlocal方式传递
* 带有返回值
* @throws interruptedexception
*/
@async("asyncserviceexecutor")
public completablefuture<string> executevalueasync2() throws interruptedexception {
log.info("start executevalueasync");
system.out.println("异步线程执行返回结果......+");
log.info("end executevalueasync");
return completablefuture.completedfuture(userutils.getuserid());
}
test2controller
/**
* 使用threadlocal+taskdecorator的方式
* @return
* @throws interruptedexception
* @throws executionexception
*/
@getmapping("/test2")
public string test2() throws interruptedexception, executionexception {
userutils.setuserid("123456");
completablefuture<string> completablefuture = asyncservice.executevalueasync2();
string s = completablefuture.get();
return s;
}3 requestcontextholder+taskdecorator
自定义customtaskdecorator
/**
* 线程池修饰类
*/
public class customtaskdecorator implements taskdecorator {
@override
public runnable decorate(runnable runnable) {
// 获取主线程中的请求信息(我们的用户信息也放在里面)
requestattributes attributes = requestcontextholder.getrequestattributes();
return () -> {
try {
// 将主线程的请求信息,设置到子线程中
requestcontextholder.setrequestattributes(attributes);
// 执行子线程,这一步不要忘了
runnable.run();
} finally {
// 线程结束,清空这些信息,否则可能造成内存泄漏
requestcontextholder.resetrequestattributes();
}
};
}
}executorconfig
在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());
@bean(name = "asyncserviceexecutor")
public executor asyncserviceexecutor() {
log.info("start asyncserviceexecutor----------------");
//threadpooltaskexecutor executor = new threadpooltaskexecutor();
//使用可视化运行状态的线程池
threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
//配置核心线程数
executor.setcorepoolsize(corepoolsize);
//配置最大线程数
executor.setmaxpoolsize(maxpoolsize);
//配置队列大小
executor.setqueuecapacity(queuecapacity);
//配置线程池中的线程的名称前缀
executor.setthreadnameprefix(nameprefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
//增加线程池修饰类
executor.settaskdecorator(new customtaskdecorator());
//增加mdc的线程池修饰类
//executor.settaskdecorator(new mdctaskdecorator());
//执行初始化
executor.initialize();
log.info("end asyncserviceexecutor------------");
return executor;
}asyncserviceimpl
/**
* 使用requestattributes获取主线程传递的数据
* @return
* @throws interruptedexception
*/
@async("asyncserviceexecutor")
public completablefuture<string> executevalueasync3() throws interruptedexception {
log.info("start executevalueasync");
system.out.println("异步线程执行返回结果......+");
requestattributes attributes = requestcontextholder.getrequestattributes();
object userid = attributes.getattribute("userid", 0);
log.info("end executevalueasync");
return completablefuture.completedfuture(userid.tostring());
}test2controller
/**
* requestcontextholder+taskdecorator的方式
* @return
* @throws interruptedexception
* @throws executionexception
*/
@getmapping("/test3")
public string test3() throws interruptedexception, executionexception {
requestattributes attributes = requestcontextholder.getrequestattributes();
attributes.setattribute("userid","123456",0);
completablefuture<string> completablefuture = asyncservice.executevalueasync3();
string s = completablefuture.get();
return s;
}4 mdc+taskdecorator
自定义mdctaskdecorator
/**
* 线程池修饰类
*/
public class mdctaskdecorator implements taskdecorator {
@override
public runnable decorate(runnable runnable) {
// 获取主线程中的请求信息(我们的用户信息也放在里面)
string userid = mdc.get("userid");
map<string, string> copyofcontextmap = mdc.getcopyofcontextmap();
system.out.println(copyofcontextmap);
return () -> {
try {
// 将主线程的请求信息,设置到子线程中
mdc.put("userid",userid);
// 执行子线程,这一步不要忘了
runnable.run();
} finally {
// 线程结束,清空这些信息,否则可能造成内存泄漏
mdc.clear();
}
};
}
}executorconfig
在原来的基础上增加 executor.settaskdecorator(new mdctaskdecorator());
@bean(name = "asyncserviceexecutor")
public executor asyncserviceexecutor() {
log.info("start asyncserviceexecutor----------------");
//threadpooltaskexecutor executor = new threadpooltaskexecutor();
//使用可视化运行状态的线程池
threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
//配置核心线程数
executor.setcorepoolsize(corepoolsize);
//配置最大线程数
executor.setmaxpoolsize(maxpoolsize);
//配置队列大小
executor.setqueuecapacity(queuecapacity);
//配置线程池中的线程的名称前缀
executor.setthreadnameprefix(nameprefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
//增加mdc的线程池修饰类
executor.settaskdecorator(new mdctaskdecorator());
//执行初始化
executor.initialize();
log.info("end asyncserviceexecutor------------");
return executor;
}asyncserviceimpl
/**
* 使用mdc获取主线程传递的数据
* @return
* @throws interruptedexception
*/
@async("asyncserviceexecutor")
public completablefuture<string> executevalueasync5() throws interruptedexception {
log.info("start executevalueasync");
system.out.println("异步线程执行返回结果......+");
log.info("end executevalueasync");
return completablefuture.completedfuture(mdc.get("userid"));
}test2controller
/**
* 使用mdc+taskdecorator方式
* 本质也是threadlocal+taskdecorator方式
* @return
* @throws interruptedexception
* @throws executionexception
*/
@getmapping("/test5")
public string test5() throws interruptedexception, executionexception {
mdc.put("userid","123456");
completablefuture<string> completablefuture = asyncservice.executevalueasync5();
string s = completablefuture.get();
return s;
}5 inheritablethreadlocal
测试代码
public class testthreadlocal {
public static threadlocal<string> threadlocal = new threadlocal<>();
public static void main(string[] args) {
//设置线程变量
threadlocal.set("hello world");
thread thread = new thread(new runnable() {
@override
public void run( ) {
//子线程输出线程变量的值
system.out.println("thread:"+threadlocal.get());
}
});
thread.start();
// 主线程输出线程变量的值
system.out.println("main:"+threadlocal.get());
}
}输出结果:
main:hello world
thread:null
从上面结果可以看出:同一个threadlocal变量在父线程中被设置后,在子线程中是获取不到的;
原因在子线程thread里面调用get方法时当前线程为thread线程,而这里调用set方法设置线程变量的是main线程,两者是不同的线程,自然子线程访问时返回null
为了解决上面的问题,inheritablethreadlocal应运而生,inheritablethreadlocal继承threadlocal,其提供一个特性,就是让子线程可以访问在父线程中设置的本地变量
将上面测试代码用inheritablethreadlocal修改
public class testinheritablethreadlocal {
public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>();
public static void main(string[] args) {
//设置线程变量
threadlocal.set("hello world");
thread thread = new thread(new runnable() {
@override
public void run( ) {
//子线程输出线程变量的值
system.out.println("thread:"+threadlocal.get());
}
});
thread.start();
// 主线程输出线程变量的值
system.out.println("main:"+threadlocal.get());
}
}输出结果:
main:hello world
thread:hello world
5.1 源码分析
public class inheritablethreadlocal<t> extends threadlocal<t> {
protected t childvalue(t parentvalue) {
return parentvalue;
}
threadlocalmap getmap(thread t) {
return t.inheritablethreadlocals;
}
void createmap(thread t, t firstvalue) {
t.inheritablethreadlocals = new threadlocalmap(this, firstvalue);
}
}inheritablethreadlocal 重写了childvalue,getmap,createmap三个方法
在inheritablethreadlocal中,变量inheritablethreadlocals 替代了threadlocals;
那么如何让子线程可以访问父线程的本地变量。这要从创建thread的代码说起,打开thread类的默认构造方法,代码如下:
public thread(runnable target) {
init(null, target, "thread-" + nextthreadnum(), 0);
}
private void init(threadgroup g, runnable target, string name,
long stacksize, accesscontrolcontext acc,
boolean inheritthreadlocals) {
if (name == null) {
throw new nullpointerexception("name cannot be null");
}
this.name = name;
//获取当前线程
thread parent = currentthread();
//如果父线程的 inheritablethreadlocals变量不为null
if (inheritthreadlocals && parent.inheritablethreadlocals != null)
//设置子线程inheritthreadlocals变量
this.inheritablethreadlocals =
threadlocal.createinheritedmap(parent.inheritablethreadlocals);
/* stash the specified stack size in case the vm cares */
this.stacksize = stacksize;
/* set thread id */
tid = nextthreadid();
}我们看下createinheritedmap代码:
this.inheritablethreadlocals = threadlocal.createinheritedmap(parent.inheritablethreadlocals);
在createinheritedmap内部使用父线程的inheritablethreadlocals变量作为构造方法创建了一个新的threadlocalmap变量,然后赋值给子线程的inheritablethreadlocals变量。
下面看看threadlocalmap的构造函数内部做了什么事情;
private threadlocalmap(threadlocalmap parentmap) {
entry[] parenttable = parentmap.table;
int len = parenttable.length;
setthreshold(len);
table = new entry[len];
for (int j = 0; j < len; j++) {
entry e = parenttable[j];
if (e != null) {
@suppresswarnings("unchecked")
threadlocal<object> key = (threadlocal<object>) e.get();
if (key != null) {
object value = key.childvalue(e.value);
entry c = new entry(key, value);
int h = key.threadlocalhashcode & (len - 1);
while (table[h] != null)
h = nextindex(h, len);
table[h] = c;
size++;
}
}
}
}inheritablethreadlocal 类通过重写下面代码
threadlocalmap getmap(thread t) {
return t.inheritablethreadlocals;
}
/**
* create the map associated with a threadlocal.
*
* @param t the current thread
* @param firstvalue value for the initial entry of the table.
*/
void createmap(thread t, t firstvalue) {
t.inheritablethreadlocals = new threadlocalmap(this, firstvalue);
}让本地变量保存到了具体的线程的inheritablethreadlocals变量里面,那么线程在通过inheritablethreadlocal类实例的set或者get方法设置变量时,就会创建当前线程的inheritablethreadlocals变量。
当父线程创建子线程时,构造方法会把父线程中的inheritablethreadlocals变量里面的本地变量赋值一份保存到子线程的inheritablethreadlocals变量里面
5.2 inheritablethreadlocal存在的问题
虽然inheritablethreadlocal可以解决在子线程中获取父线程的值的问题,但是在使用线程池的情况下,由于不同的任务有可能是同一个线程处理,因此这些任务取到的值有可能并不是父线程设置的值
测试目标:任务1和任务2 获取父线程值一样,为测试代码中的hello world
测试代码:
public class testinheritablethreadlocaissue {
public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>();
public static executorservice executorservice = executors.newsinglethreadexecutor();
public static void main(string[] args) throws exception {
//设置线程变量
threadlocal.set("hello world");
thread thread1 = new thread(new runnable() {
@override
public void run( ) {
//子线程输出线程变量的值
system.out.println("thread:"+threadlocal.get());
threadlocal.set("hello world 2");
}
},"task1");
thread thread2 = new thread(new runnable() {
@override
public void run( ) {
//子线程输出线程变量的值
system.out.println("thread:"+threadlocal.get());
threadlocal.set("hello world 2");
}
},"task2");
executorservice.submit(thread1).get();
executorservice.submit(thread2).get();
// 主线程输出线程变量的值
system.out.println("main:"+threadlocal.get());
}
}输出结果:
thread:hello world
thread:hello world 2
main:hello world
结果分析:
很明显,任务2获取的不是父线程设置的hello world ,而是线程1修改后的值。如果在线程池中使用,需要注意这种情况(可以备份备份父线程的值)
6 transmittablethreadlocal
解决线程池化值传递
阿里封装了一个工具,实现了在使用线程池等会池化复用线程的组件情况下,提供threadlocal值的传递功能,解决异步执行时上下文传递的问题
jdk的inheritablethreadlocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;
这时父子线程关系的threadlocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的threadlocal值传递到 任务执行时
https://github.com/alibaba/transmittable-thread-local
引入:
<dependency> <groupid>com.alibaba</groupid> <artifactid>transmittable-thread-local</artifactid> <version>2.11.5</version> </dependency>
需求场景:
- 1.分布式跟踪系统 或 全链路压测(即链路打标)
- 2.日志收集记录系统上下文
- 3.session级cache
- 4.应用容器或上层框架跨应用代码给下层sdk传递信息
测试代码:
1)父子线程信息传递
public static transmittablethreadlocal<string> threadlocal = new transmittablethreadlocal<>();
public static void main(string[] args) {
//设置线程变量
threadlocal.set("hello world");
thread thread = new thread(new runnable() {
@override
public void run( ) {
//子线程输出线程变量的值
system.out.println("thread:"+threadlocal.get());
}
});
thread.start();
// 主线程输出线程变量的值
system.out.println("main:"+threadlocal.get());
}
}输出结果:
main:hello world
thread:hello world
2)线程池中传递值,参考github:修饰线程池
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论