当前位置: 代码网 > it编程>编程语言>Java > Java多线程父线程向子线程传值问题及解决

Java多线程父线程向子线程传值问题及解决

2025年02月15日 Java 我要评论
1 背景在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰2 threadlocal+taskdecorator用户工具类

1 背景

在实际开发过程中我们需要父子之间传递一些数据,比如用户信息,日志异步生成数据传递等,该文章从5种解决方案解决父子之间数据传递困扰

2 threadlocal+taskdecorator

用户工具类 userutils

/**
 *使用threadlocal存储共享的数据变量,如登录的用户信息
 */
public class userutils {
    private static  final  threadlocal<string> userlocal=new threadlocal<>();
 
    public static  string getuserid(){
        return userlocal.get();
    }
    public static void setuserid(string userid){
        userlocal.set(userid);
    }
 
    public static void clear(){
        userlocal.remove();
    }
 
}

自定义customtaskdecorator

/**
 * 线程池修饰类
 */
public class customtaskdecorator implements taskdecorator {
    @override
    public runnable decorate(runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        string robotid = userutils.getuserid();
        system.out.println(robotid);
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                userutils.setuserid(robotid);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                userutils.clear();
            }
        };
    }
}

executorconfig

在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());

@bean(name = "asyncserviceexecutor")
    public executor asyncserviceexecutor() {
        log.info("start asyncserviceexecutor----------------");
        //threadpooltaskexecutor executor = new threadpooltaskexecutor();
        //使用可视化运行状态的线程池
        threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
        //配置核心线程数
        executor.setcorepoolsize(corepoolsize);
        //配置最大线程数
        executor.setmaxpoolsize(maxpoolsize);
        //配置队列大小
        executor.setqueuecapacity(queuecapacity);
        //配置线程池中的线程的名称前缀
        executor.setthreadnameprefix(nameprefix);
 
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
 
        //增加线程池修饰类
        executor.settaskdecorator(new customtaskdecorator());
        //增加mdc的线程池修饰类
        //executor.settaskdecorator(new mdctaskdecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncserviceexecutor------------");
        return executor;
    }

asyncserviceimpl

    /**
     * 使用threadlocal方式传递
     * 带有返回值
     * @throws interruptedexception
     */
    @async("asyncserviceexecutor")
    public completablefuture<string> executevalueasync2() throws interruptedexception {
        log.info("start executevalueasync");
        system.out.println("异步线程执行返回结果......+");
        log.info("end executevalueasync");
        return completablefuture.completedfuture(userutils.getuserid());
    }
 

test2controller

    /**
     * 使用threadlocal+taskdecorator的方式
     * @return
     * @throws interruptedexception
     * @throws executionexception
     */
    @getmapping("/test2")
    public string test2() throws interruptedexception, executionexception {
        userutils.setuserid("123456");
        completablefuture<string> completablefuture = asyncservice.executevalueasync2();
        string s = completablefuture.get();
        return s;
    }

3 requestcontextholder+taskdecorator

自定义customtaskdecorator

/**
 * 线程池修饰类
 */
public class customtaskdecorator implements taskdecorator {
    @override
    public runnable decorate(runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        requestattributes attributes = requestcontextholder.getrequestattributes();
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                requestcontextholder.setrequestattributes(attributes);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                requestcontextholder.resetrequestattributes();
            }
        };
    }
}

executorconfig

在原来的基础上增加 executor.settaskdecorator(new customtaskdecorator());

@bean(name = "asyncserviceexecutor")
    public executor asyncserviceexecutor() {
        log.info("start asyncserviceexecutor----------------");
        //threadpooltaskexecutor executor = new threadpooltaskexecutor();
        //使用可视化运行状态的线程池
        threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
        //配置核心线程数
        executor.setcorepoolsize(corepoolsize);
        //配置最大线程数
        executor.setmaxpoolsize(maxpoolsize);
        //配置队列大小
        executor.setqueuecapacity(queuecapacity);
        //配置线程池中的线程的名称前缀
        executor.setthreadnameprefix(nameprefix);
 
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
 
        //增加线程池修饰类
        executor.settaskdecorator(new customtaskdecorator());
        //增加mdc的线程池修饰类
        //executor.settaskdecorator(new mdctaskdecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncserviceexecutor------------");
        return executor;
    }

asyncserviceimpl

     /**
     * 使用requestattributes获取主线程传递的数据
     * @return
     * @throws interruptedexception
     */
    @async("asyncserviceexecutor")
    public completablefuture<string> executevalueasync3() throws interruptedexception {
        log.info("start executevalueasync");
        system.out.println("异步线程执行返回结果......+");
        requestattributes attributes = requestcontextholder.getrequestattributes();
        object userid = attributes.getattribute("userid", 0);
        log.info("end executevalueasync");
        return completablefuture.completedfuture(userid.tostring());
    }

test2controller

    /**
     * requestcontextholder+taskdecorator的方式
     * @return
     * @throws interruptedexception
     * @throws executionexception
     */
    @getmapping("/test3")
    public string test3() throws interruptedexception, executionexception {
        requestattributes attributes = requestcontextholder.getrequestattributes();
        attributes.setattribute("userid","123456",0);
        completablefuture<string> completablefuture = asyncservice.executevalueasync3();
        string s = completablefuture.get();
        return s;
    }

4 mdc+taskdecorator

自定义mdctaskdecorator

/**
 * 线程池修饰类
 */
public class mdctaskdecorator implements taskdecorator {
    @override
    public runnable decorate(runnable runnable) {
        // 获取主线程中的请求信息(我们的用户信息也放在里面)
        string userid = mdc.get("userid");
        map<string, string> copyofcontextmap = mdc.getcopyofcontextmap();
        system.out.println(copyofcontextmap);
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                mdc.put("userid",userid);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                mdc.clear();
            }
        };
    }
}

executorconfig

在原来的基础上增加 executor.settaskdecorator(new mdctaskdecorator());

@bean(name = "asyncserviceexecutor")
    public executor asyncserviceexecutor() {
        log.info("start asyncserviceexecutor----------------");
        //threadpooltaskexecutor executor = new threadpooltaskexecutor();
        //使用可视化运行状态的线程池
        threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
        //配置核心线程数
        executor.setcorepoolsize(corepoolsize);
        //配置最大线程数
        executor.setmaxpoolsize(maxpoolsize);
        //配置队列大小
        executor.setqueuecapacity(queuecapacity);
        //配置线程池中的线程的名称前缀
        executor.setthreadnameprefix(nameprefix);
 
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
 
        //增加mdc的线程池修饰类
        executor.settaskdecorator(new mdctaskdecorator());
        //执行初始化
        executor.initialize();
        log.info("end asyncserviceexecutor------------");
        return executor;
    }

asyncserviceimpl

         /**
     * 使用mdc获取主线程传递的数据
     * @return
     * @throws interruptedexception
     */
    @async("asyncserviceexecutor")
    public completablefuture<string> executevalueasync5() throws interruptedexception {
        log.info("start executevalueasync");
        system.out.println("异步线程执行返回结果......+");
        log.info("end executevalueasync");
        return completablefuture.completedfuture(mdc.get("userid"));
    }

test2controller

     /**
     * 使用mdc+taskdecorator方式
     * 本质也是threadlocal+taskdecorator方式
     * @return
     * @throws interruptedexception
     * @throws executionexception
     */
    @getmapping("/test5")
    public string test5() throws interruptedexception, executionexception {
        mdc.put("userid","123456");
        completablefuture<string> completablefuture = asyncservice.executevalueasync5();
        string s = completablefuture.get();
        return s;
    }

5 inheritablethreadlocal

测试代码

public class testthreadlocal {
	public static threadlocal<string> threadlocal = new threadlocal<>();
	public static void main(string[] args) {
		 //设置线程变量
        threadlocal.set("hello world");
        thread thread = new thread(new runnable() {
            @override
            public void run( ) {
                //子线程输出线程变量的值
                system.out.println("thread:"+threadlocal.get());
            }
        });
        thread.start();
        // 主线程输出线程变量的值
        system.out.println("main:"+threadlocal.get());
	}
}

输出结果:

main:hello world

thread:null

从上面结果可以看出:同一个threadlocal变量在父线程中被设置后,在子线程中是获取不到的;

原因在子线程thread里面调用get方法时当前线程为thread线程,而这里调用set方法设置线程变量的是main线程,两者是不同的线程,自然子线程访问时返回null

为了解决上面的问题,inheritablethreadlocal应运而生,inheritablethreadlocal继承threadlocal,其提供一个特性,就是让子线程可以访问在父线程中设置的本地变量

将上面测试代码用inheritablethreadlocal修改

public class testinheritablethreadlocal {
	
	public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>();
	
	public static void main(string[] args) {
		 //设置线程变量
        threadlocal.set("hello world");
        thread thread = new thread(new runnable() {
            @override
            public void run( ) {
                //子线程输出线程变量的值
                system.out.println("thread:"+threadlocal.get());
            }
        });
        thread.start();
        // 主线程输出线程变量的值
        system.out.println("main:"+threadlocal.get());
	}
}

输出结果:

main:hello world

thread:hello world

5.1 源码分析

public class inheritablethreadlocal<t> extends threadlocal<t> {
    protected t childvalue(t parentvalue) {
        return parentvalue;
    }
    threadlocalmap getmap(thread t) {
       return t.inheritablethreadlocals;
    }
    void createmap(thread t, t firstvalue) {
        t.inheritablethreadlocals = new threadlocalmap(this, firstvalue);
    }
}

inheritablethreadlocal 重写了childvalue,getmap,createmap三个方法

在inheritablethreadlocal中,变量inheritablethreadlocals 替代了threadlocals;

那么如何让子线程可以访问父线程的本地变量。这要从创建thread的代码说起,打开thread类的默认构造方法,代码如下:

  public thread(runnable target) {
        init(null, target, "thread-" + nextthreadnum(), 0);
    }
 private void init(threadgroup g, runnable target, string name,
                      long stacksize, accesscontrolcontext acc,
                      boolean inheritthreadlocals) {
        if (name == null) {
            throw new nullpointerexception("name cannot be null");
        }
        this.name = name;
        //获取当前线程
        thread parent = currentthread();
       //如果父线程的 inheritablethreadlocals变量不为null
        if (inheritthreadlocals && parent.inheritablethreadlocals != null)
        //设置子线程inheritthreadlocals变量
            this.inheritablethreadlocals =
threadlocal.createinheritedmap(parent.inheritablethreadlocals);
        /* stash the specified stack size in case the vm cares */
        this.stacksize = stacksize;
        /* set thread id */
        tid = nextthreadid();
    }

我们看下createinheritedmap代码:

this.inheritablethreadlocals =            threadlocal.createinheritedmap(parent.inheritablethreadlocals);

在createinheritedmap内部使用父线程的inheritablethreadlocals变量作为构造方法创建了一个新的threadlocalmap变量,然后赋值给子线程的inheritablethreadlocals变量。

下面看看threadlocalmap的构造函数内部做了什么事情;

private threadlocalmap(threadlocalmap parentmap) {
            entry[] parenttable = parentmap.table;
            int len = parenttable.length;
            setthreshold(len);
            table = new entry[len];
            for (int j = 0; j < len; j++) {
                entry e = parenttable[j];
                if (e != null) {
                    @suppresswarnings("unchecked")
                    threadlocal<object> key = (threadlocal<object>) e.get();
                    if (key != null) {
                        object value = key.childvalue(e.value);
                        entry c = new entry(key, value);
                        int h = key.threadlocalhashcode & (len - 1);
                        while (table[h] != null)
                            h = nextindex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

inheritablethreadlocal 类通过重写下面代码

 threadlocalmap getmap(thread t) {
       return t.inheritablethreadlocals;
    }
    /**
     * create the map associated with a threadlocal.
     *
     * @param t the current thread
     * @param firstvalue value for the initial entry of the table.
     */
    void createmap(thread t, t firstvalue) {
        t.inheritablethreadlocals = new threadlocalmap(this, firstvalue);
    }

让本地变量保存到了具体的线程的inheritablethreadlocals变量里面,那么线程在通过inheritablethreadlocal类实例的set或者get方法设置变量时,就会创建当前线程的inheritablethreadlocals变量。

当父线程创建子线程时,构造方法会把父线程中的inheritablethreadlocals变量里面的本地变量赋值一份保存到子线程的inheritablethreadlocals变量里面

5.2 inheritablethreadlocal存在的问题

虽然inheritablethreadlocal可以解决在子线程中获取父线程的值的问题,但是在使用线程池的情况下,由于不同的任务有可能是同一个线程处理,因此这些任务取到的值有可能并不是父线程设置的值

测试目标:任务1和任务2 获取父线程值一样,为测试代码中的hello world

测试代码:

public class testinheritablethreadlocaissue {
	
	public static inheritablethreadlocal<string> threadlocal = new inheritablethreadlocal<>();
	public static executorservice executorservice = executors.newsinglethreadexecutor();
	
	public static void main(string[] args) throws exception {
		 //设置线程变量
        threadlocal.set("hello world");
        thread thread1 = new thread(new runnable() {
            @override
            public void run( ) {
                //子线程输出线程变量的值
                system.out.println("thread:"+threadlocal.get());
                threadlocal.set("hello world 2");
            }
        },"task1");
        thread thread2 = new thread(new runnable() {
            @override
            public void run( ) {
                //子线程输出线程变量的值
                system.out.println("thread:"+threadlocal.get());
                threadlocal.set("hello world 2");
            }
        },"task2");
        executorservice.submit(thread1).get();
        executorservice.submit(thread2).get();
        
        // 主线程输出线程变量的值
        system.out.println("main:"+threadlocal.get());
	}
}

输出结果:

thread:hello world

thread:hello world 2

main:hello world

结果分析:

很明显,任务2获取的不是父线程设置的hello world ,而是线程1修改后的值。如果在线程池中使用,需要注意这种情况(可以备份备份父线程的值)

6 transmittablethreadlocal

解决线程池化值传递

阿里封装了一个工具,实现了在使用线程池等会池化复用线程的组件情况下,提供threadlocal值的传递功能,解决异步执行时上下文传递的问题

jdk的inheritablethreadlocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;

这时父子线程关系的threadlocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的threadlocal值传递到 任务执行时

https://github.com/alibaba/transmittable-thread-local

引入:

<dependency>
	<groupid>com.alibaba</groupid>
	<artifactid>transmittable-thread-local</artifactid>
	<version>2.11.5</version>
</dependency>

需求场景:

  • 1.分布式跟踪系统 或 全链路压测(即链路打标)
  • 2.日志收集记录系统上下文
  • 3.session级cache
  • 4.应用容器或上层框架跨应用代码给下层sdk传递信息

测试代码:

1)父子线程信息传递

public static transmittablethreadlocal<string> threadlocal = new transmittablethreadlocal<>();
	
	public static void main(string[] args) {
		 //设置线程变量
        threadlocal.set("hello world");
        thread thread = new thread(new runnable() {
            @override
            public void run( ) {
                //子线程输出线程变量的值
                system.out.println("thread:"+threadlocal.get());
            }
        });
        thread.start();
        // 主线程输出线程变量的值
        system.out.println("main:"+threadlocal.get());
	}
}

输出结果:

main:hello world

thread:hello world

2)线程池中传递值,参考github:修饰线程池

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com