1 背景
在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰
2 threadlocal+taskdecorator
用户工具类 userutils
/** *使用threadlocal存储共享的数据变量,如登录的用户信息 */ public class userutils { private static final threadlocal<string> userlocal=new threadlocal<>(); public static string getuserid(){ return userlocal.get(); } public static void setuserid(string userid){ userlocal.set(userid); } public static void clear(){ userlocal.remove(); } }
自定义customtaskdecorator
/** * 线程池修饰类 */ public class customtaskdecorator implements taskdecorator { @override public runnable decorate(runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) string robotid = userutils.getuserid(); system.out.println(robotid); return () -> { try { // 将主线程的请求信息,设置到子线程中 userutils.setuserid(robotid); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 userutils.clear(); } }; } }
executorconfig
在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());
@bean(name = "asyncserviceexecutor") public executor asyncserviceexecutor() { log.info("start asyncserviceexecutor----------------"); //threadpooltaskexecutor executor = new threadpooltaskexecutor(); //使用可视化运行状态的线程池 threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(corepoolsize); //配置最大线程数 executor.setmaxpoolsize(maxpoolsize); //配置队列大小 executor.setqueuecapacity(queuecapacity); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix(nameprefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //增加线程池修饰类 executor.settaskdecorator(new customtaskdecorator()); //增加mdc的线程池修饰类 //executor.settaskdecorator(new mdctaskdecorator()); //执行初始化 executor.initialize(); log.info("end asyncserviceexecutor------------"); return executor; }
asyncserviceimpl
/** * 使用threadlocal方式传递 * 带有返回值 * @throws interruptedexception */ @async("asyncserviceexecutor") public completablefuture<string> executevalueasync2() throws interruptedexception { log.info("start executevalueasync"); system.out.println("异步线程执行返回结果......+"); log.info("end executevalueasync"); return completablefuture.completedfuture(userutils.getuserid()); }
test2controller
/** * 使用threadlocal+taskdecorator的方式 * @return * @throws interruptedexception * @throws executionexception */ @getmapping("/test2") public string test2() throws interruptedexception, executionexception { userutils.setuserid("123456"); completablefuture<string> completablefuture = asyncservice.executevalueasync2(); string s = completablefuture.get(); return s; }
3 requestcontextholder+taskdecorator
自定义customtaskdecorator
/** * 线程池修饰类 */ public class customtaskdecorator implements taskdecorator { @override public runnable decorate(runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) requestattributes attributes = requestcontextholder.getrequestattributes(); return () -> { try { // 将主线程的请求信息,设置到子线程中 requestcontextholder.setrequestattributes(attributes); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 requestcontextholder.resetrequestattributes(); } }; } }
executorconfig
在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());
@bean(name = "asyncserviceexecutor") public executor asyncserviceexecutor() { log.info("start asyncserviceexecutor----------------"); //threadpooltaskexecutor executor = new threadpooltaskexecutor(); //使用可视化运行状态的线程池 threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(corepoolsize); //配置最大线程数 executor.setmaxpoolsize(maxpoolsize); //配置队列大小 executor.setqueuecapacity(queuecapacity); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix(nameprefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //增加线程池修饰类 executor.settaskdecorator(new customtaskdecorator()); //增加mdc的线程池修饰类 //executor.settaskdecorator(new mdctaskdecorator()); //执行初始化 executor.initialize(); log.info("end asyncserviceexecutor------------"); return executor; }
asyncserviceimpl
/** * 使用requestattributes获取主线程传递的数据 * @return * @throws interruptedexception */ @async("asyncserviceexecutor") public completablefuture<string> executevalueasync3() throws interruptedexception { log.info("start executevalueasync"); system.out.println("异步线程执行返回结果......+"); requestattributes attributes = requestcontextholder.getrequestattributes(); object userid = attributes.getattribute("userid", 0); log.info("end executevalueasync"); return completablefuture.completedfuture(userid.tostring()); }
test2controller
/** * requestcontextholder+taskdecorator的方式 * @return * @throws interruptedexception * @throws executionexception */ @getmapping("/test3") public string test3() throws interruptedexception, executionexception { requestattributes attributes = requestcontextholder.getrequestattributes(); attributes.setattribute("userid","123456",0); completablefuture<string> completablefuture = asyncservice.executevalueasync3(); string s = completablefuture.get(); return s; }
4 mdc+taskdecorator
自定义mdctaskdecorator
/** * 线程池修饰类 */ public class mdctaskdecorator implements taskdecorator { @override public runnable decorate(runnable runnable) { // 获取主线程中的请求信息(我们的用户信息也放在里面) string userid = mdc.get("userid"); map<string, string> copyofcontextmap = mdc.getcopyofcontextmap(); system.out.println(copyofcontextmap); return () -> { try { // 将主线程的请求信息,设置到子线程中 mdc.put("userid",userid); // 执行子线程,这一步不要忘了 runnable.run(); } finally { // 线程结束,清空这些信息,否则可能造成内存泄漏 mdc.clear(); } }; } }
executorconfig
在原来的基础上增加 executor.settaskdecorator(new mdctaskdecorator());
@bean(name = "asyncserviceexecutor") public executor asyncserviceexecutor() { log.info("start asyncserviceexecutor----------------"); //threadpooltaskexecutor executor = new threadpooltaskexecutor(); //使用可视化运行状态的线程池 threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(corepoolsize); //配置最大线程数 executor.setmaxpoolsize(maxpoolsize); //配置队列大小 executor.setqueuecapacity(queuecapacity); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix(nameprefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //增加mdc的线程池修饰类 executor.settaskdecorator(new mdctaskdecorator()); //执行初始化 executor.initialize(); log.info("end asyncserviceexecutor------------"); return executor; }
asyncserviceimpl
/** * 使用mdc获取主线程传递的数据 * @return * @throws interruptedexception */ @async("asyncserviceexecutor") public completablefuture<string> executevalueasync5() throws interruptedexception { log.info("start executevalueasync"); system.out.println("异步线程执行返回结果......+"); log.info("end executevalueasync"); return completablefuture.completedfuture(mdc.get("userid")); }
test2controller
/** * 使用mdc+taskdecorator方式 * 本质也是threadlocal+taskdecorator方式 * @return * @throws interruptedexception * @throws executionexception */ @getmapping("/test5") public string test5() throws interruptedexception, executionexception { mdc.put("userid","123456"); completablefuture<string> completablefuture = asyncservice.executevalueasync5(); string s = completablefuture.get(); return s; }
5 inheritablethreadlocal
测试代码
public class testthreadlocal { public static threadlocal<string> threadlocal = new threadlocal<>(); public static void main(string[] args) { //设置线程变量 threadlocal.set("hello world"); thread thread = new thread(new runnable() { @override public void run( ) { //子线程输出线程变量的值 system.out.println("thread:"+threadlocal.get()); } }); thread.start(); // 主线程输出线程变量的值 system.out.println("main:"+threadlocal.get()); } }
输出结果:
main:hello world
thread:null
从上面结果可以看出:同一个threadlocal变量在父线程中被设置后,在子线程中是获取不到的;
原因在子线程thread里面调用get方法时当前线程为thread线程,而这里调用set方法设置线程变量的是main线程,两者是不同的线程,自然子线程访问时返回null
为了解决上面的问题,inheritablethreadlocal应运而生,inheritablethreadlocal继承threadlocal,其提供一个特性,就是让子线程可以访问在父线程中设置的本地变量
将上面测试代码用inheritablethreadlocal修改
public class testinheritablethreadlocal { public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>(); public static void main(string[] args) { //设置线程变量 threadlocal.set("hello world"); thread thread = new thread(new runnable() { @override public void run( ) { //子线程输出线程变量的值 system.out.println("thread:"+threadlocal.get()); } }); thread.start(); // 主线程输出线程变量的值 system.out.println("main:"+threadlocal.get()); } }
输出结果:
main:hello world
thread:hello world
5.1 源码分析
public class inheritablethreadlocal<t> extends threadlocal<t> { protected t childvalue(t parentvalue) { return parentvalue; } threadlocalmap getmap(thread t) { return t.inheritablethreadlocals; } void createmap(thread t, t firstvalue) { t.inheritablethreadlocals = new threadlocalmap(this, firstvalue); } }
inheritablethreadlocal 重写了childvalue,getmap,createmap三个方法
在inheritablethreadlocal中,变量inheritablethreadlocals 替代了threadlocals;
那么如何让子线程可以访问父线程的本地变量。这要从创建thread的代码说起,打开thread类的默认构造方法,代码如下:
public thread(runnable target) { init(null, target, "thread-" + nextthreadnum(), 0); } private void init(threadgroup g, runnable target, string name, long stacksize, accesscontrolcontext acc, boolean inheritthreadlocals) { if (name == null) { throw new nullpointerexception("name cannot be null"); } this.name = name; //获取当前线程 thread parent = currentthread(); //如果父线程的 inheritablethreadlocals变量不为null if (inheritthreadlocals && parent.inheritablethreadlocals != null) //设置子线程inheritthreadlocals变量 this.inheritablethreadlocals = threadlocal.createinheritedmap(parent.inheritablethreadlocals); /* stash the specified stack size in case the vm cares */ this.stacksize = stacksize; /* set thread id */ tid = nextthreadid(); }
我们看下createinheritedmap代码:
this.inheritablethreadlocals = threadlocal.createinheritedmap(parent.inheritablethreadlocals);
在createinheritedmap内部使用父线程的inheritablethreadlocals变量作为构造方法创建了一个新的threadlocalmap变量,然后赋值给子线程的inheritablethreadlocals变量。
下面看看threadlocalmap的构造函数内部做了什么事情;
private threadlocalmap(threadlocalmap parentmap) { entry[] parenttable = parentmap.table; int len = parenttable.length; setthreshold(len); table = new entry[len]; for (int j = 0; j < len; j++) { entry e = parenttable[j]; if (e != null) { @suppresswarnings("unchecked") threadlocal<object> key = (threadlocal<object>) e.get(); if (key != null) { object value = key.childvalue(e.value); entry c = new entry(key, value); int h = key.threadlocalhashcode & (len - 1); while (table[h] != null) h = nextindex(h, len); table[h] = c; size++; } } } }
inheritablethreadlocal 类通过重写下面代码
threadlocalmap getmap(thread t) { return t.inheritablethreadlocals; } /** * create the map associated with a threadlocal. * * @param t the current thread * @param firstvalue value for the initial entry of the table. */ void createmap(thread t, t firstvalue) { t.inheritablethreadlocals = new threadlocalmap(this, firstvalue); }
让本地变量保存到了具体的线程的inheritablethreadlocals变量里面,那么线程在通过inheritablethreadlocal类实例的set或者get方法设置变量时,就会创建当前线程的inheritablethreadlocals变量。
当父线程创建子线程时,构造方法会把父线程中的inheritablethreadlocals变量里面的本地变量赋值一份保存到子线程的inheritablethreadlocals变量里面
5.2 inheritablethreadlocal存在的问题
虽然inheritablethreadlocal可以解决在子线程中获取父线程的值的问题,但是在使用线程池的情况下,由于不同的任务有可能是同一个线程处理,因此这些任务取到的值有可能并不是父线程设置的值
测试目标:任务1和任务2 获取父线程值一样,为测试代码中的hello world
测试代码:
public class testinheritablethreadlocaissue { public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>(); public static executorservice executorservice = executors.newsinglethreadexecutor(); public static void main(string[] args) throws exception { //设置线程变量 threadlocal.set("hello world"); thread thread1 = new thread(new runnable() { @override public void run( ) { //子线程输出线程变量的值 system.out.println("thread:"+threadlocal.get()); threadlocal.set("hello world 2"); } },"task1"); thread thread2 = new thread(new runnable() { @override public void run( ) { //子线程输出线程变量的值 system.out.println("thread:"+threadlocal.get()); threadlocal.set("hello world 2"); } },"task2"); executorservice.submit(thread1).get(); executorservice.submit(thread2).get(); // 主线程输出线程变量的值 system.out.println("main:"+threadlocal.get()); } }
输出结果:
thread:hello world
thread:hello world 2
main:hello world
结果分析:
很明显,任务2获取的不是父线程设置的hello world ,而是线程1修改后的值。如果在线程池中使用,需要注意这种情况(可以备份备份父线程的值)
6 transmittablethreadlocal
解决线程池化值传递
阿里封装了一个工具,实现了在使用线程池等会池化复用线程的组件情况下,提供threadlocal值的传递功能,解决异步执行时上下文传递的问题
jdk的inheritablethreadlocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;
这时父子线程关系的threadlocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的threadlocal值传递到 任务执行时
https://github.com/alibaba/transmittable-thread-local
引入:
<dependency> <groupid>com.alibaba</groupid> <artifactid>transmittable-thread-local</artifactid> <version>2.11.5</version> </dependency>
需求场景:
- 1.分布式跟踪系统 或 全链路压测(即链路打标)
- 2.日志收集记录系统上下文
- 3.session级cache
- 4.应用容器或上层框架跨应用代码给下层sdk传递信息
测试代码:
1)父子线程信息传递
public static transmittablethreadlocal<string> threadlocal = new transmittablethreadlocal<>(); public static void main(string[] args) { //设置线程变量 threadlocal.set("hello world"); thread thread = new thread(new runnable() { @override public void run( ) { //子线程输出线程变量的值 system.out.println("thread:"+threadlocal.get()); } }); thread.start(); // 主线程输出线程变量的值 system.out.println("main:"+threadlocal.get()); } }
输出结果:
main:hello world
thread:hello world
2)线程池中传递值,参考github:修饰线程池
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论