当前位置: 代码网 > it编程>编程语言>Java > Java多线程Thread及其原理详解

Java多线程Thread及其原理详解

2025年05月06日 Java 我要评论
1. 实现多线程的方式package com.jxz.threads;import lombok.sneakythrows;import lombok.extern.slf4j.slf4j;impor

1. 实现多线程的方式

package com.jxz.threads;

import lombok.sneakythrows;
import lombok.extern.slf4j.slf4j;
import org.junit.test;

import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.future;
import java.util.concurrent.futuretask;

/**
 * @author jiangxuzhao
 * @description
 * @date 2024/9/10
 */
@slf4j
public class threadcreatetest {
    @test
    @sneakythrows
    public void test1() {
        // 匿名类重写 thread#run
        thread thread1 = new thread() {
            @override
            public void run() {
                log.info("extend thread#run...");
            }
        };
        thread1.start();
        // 避免主线程直接结束
        thread.sleep(1000);
    }

    @test
    @sneakythrows
    public void test2() {
        // lambda 表达式定义实现 runnable target, thread#run 方法最终调用 target#run
        thread thread2 = new thread(() -> {
            log.info("implement 自定义变量 target 的 runnable#run...");
        });
        thread2.start();
        // 避免主线程直接结束
        thread.sleep(1000);
    }

    @test
    @sneakythrows
    public void test3() {
        // lambda 表达式定义实现 callable#call,可以在主线程中通过 future#get 阻塞获取结果 result
        futuretask<string> stringfuturetask = new futuretask<>(() -> {
            log.info("implement callable#call");
            return thread.currentthread().getname();
        });
        thread thread3 = new thread(stringfuturetask);
        thread3.start();
        // 阻塞获取结果,不用担心主线程直接结束
        log.info("thread3 futuretask callable output = {}", stringfuturetask.get());
    }

    @test
    @sneakythrows
    public void test4() {
        // 线程池实现异步多线程
        executorservice executorservice = executors.newfixedthreadpool(1);
        future<string> stringfuture = executorservice.submit(() -> {
            log.info("thread pool submit");
            return thread.currentthread().getname();
        });
        // 阻塞获取结果,不用担心主线程直接结束
        log.info("thead submit output = {}", stringfuture.get());
    }
}

2. thread 部分源码

2.1. native 方法注册

public
class thread implements runnable {
  	// 在 jdk 底层的 thread.c 文件中定义了各种方法
    private static native void registernatives();
  
  	// 确保 registernatives 是 <clinit> 中第一件做的事
    static {
        registernatives();
    }
}

thread#registernatives 作为本地方法,主要作用是注册一些本地方法供 thread 类使用,如 start0(), stop0() 等。

该方法被放在一个本地静态代码块中,并且该代码块被放在类中最靠前的位置,确保当 thread 类被加载到 jvm 中时,调用 第一时间就会注册所有的本地方法。

所有的本地方法都是定义在 jdk 源码的 thread.c 文件中的,它定义了各个操作系统平台都要用到的关于线程的基本操作。

可以专门去下载 openjdk 1.8 的源码一探究竟:

或者直接阅读 openjdk8 在线的源码:

https://hg.openjdk.org/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/native/java/lang/thread.c

2.2. thread 中的成员变量

针对其中常见的几个变量做了中文注释

// 当前线程的名称
private volatile string name;
private int            priority;
private thread         threadq;
private long           eetop;

/* whether or not to single_step this thread. */
private boolean     single_step;

// 当前线程是否在后台运行
/* whether or not the thread is a daemon thread. */
private boolean     daemon = false;

/* jvm state */
private boolean     stillborn = false;

// init 构造方法中传入的执行任务,当其不为空时,会执行此任务
/* what will be run. */
private runnable target;

// 当前线程所在的线程组
/* the group of this thread */
private threadgroup group;

// 当前线程的类加载器
/* the context classloader for this thread */
private classloader contextclassloader;

/* the inherited accesscontrolcontext of this thread */
private accesscontrolcontext inheritedaccesscontrolcontext;

// 被用来定义 "thread-" + nextthreadnum() 的线程名,自增的序号在线程池打印日志中很常见
// 静态变量 threadinitnumber 在 static synchronized 方法中自增,这个方法被调用时在 thread.class 类上加 synchronized 锁,保证单台 jvm 虚拟机上都通过 thread.class 并发创建线程 init 时,线程自增序号的并发安全
/* for autonumbering anonymous threads. */
private static int threadinitnumber;
private static synchronized int nextthreadnum() {
    return threadinitnumber++;
}

// 每个线程都维护一个 threadlocalmap,这个在保障线程安全的 threadlocal 中经常出现
/* threadlocal values pertaining to this thread. this map is maintained
 * by the threadlocal class. */
threadlocal.threadlocalmap threadlocals = null;

/*
 * inheritablethreadlocal values pertaining to this thread. this map is
 * maintained by the inheritablethreadlocal class.
 */
threadlocal.threadlocalmap inheritablethreadlocals = null;

/*
 * the requested stack size for this thread, or 0 if the creator did
 * not specify a stack size.  it is up to the vm to do whatever it
 * likes with this number; some vms will ignore it.
 */
private long stacksize;

/*
 * jvm-private state that persists after native thread termination.
 */
private long nativeparkeventpointer;

/*
 * thread id
 */
private long tid;

/* for generating thread id */
private static long threadseqnumber;

/* java thread status for tools,
 * initialized to indicate thread 'not yet started'
 */

private volatile int threadstatus = 0;


private static synchronized long nextthreadid() {
    return ++threadseqnumber;
}

/**
 * the argument supplied to the current call to
 * java.util.concurrent.locks.locksupport.park.
 * set by (private) java.util.concurrent.locks.locksupport.setblocker
 * accessed using java.util.concurrent.locks.locksupport.getblocker
 */
volatile object parkblocker;

/* the object in which this thread is blocked in an interruptible i/o
 * operation, if any.  the blocker's interrupt method should be invoked
 * after setting this thread's interrupt status.
 */
private volatile interruptible blocker;
private final object blockerlock = new object();

/* set the blocker field; invoked via sun.misc.sharedsecrets from java.nio code
 */
void blockedon(interruptible b) {
    synchronized (blockerlock) {
        blocker = b;
    }
}

/**
 * the minimum priority that a thread can have.
 */
public final static int min_priority = 1;

/**
 * the default priority that is assigned to a thread.
 */
public final static int norm_priority = 5;

/**
 * the maximum priority that a thread can have.
 */
public final static int max_priority = 10;

2.3. thread 构造方法与初始化

构造方法:

thread 具有多个重载的构造函数,内部都是调用 thread#init() 方法初始化,我们常用的就是传入 thread(runnable target) 以及 thread(runnable target, string name)

public thread() {
    init(null, null, "thread-" + nextthreadnum(), 0);
}

public thread(runnable target) {
    init(null, target, "thread-" + nextthreadnum(), 0);
}


thread(runnable target, accesscontrolcontext acc) {
    init(null, target, "thread-" + nextthreadnum(), 0, acc, false);
}

public thread(threadgroup group, runnable target) {
    init(group, target, "thread-" + nextthreadnum(), 0);
}

public thread(string name) {
    init(null, null, name, 0);
}

public thread(threadgroup group, string name) {
    init(group, null, name, 0);
}

public thread(runnable target, string name) {
    init(null, target, name, 0);
}

public thread(threadgroup group, runnable target, string name) {
    init(group, target, name, 0);
}

public thread(threadgroup group, runnable target, string name,
              long stacksize) {
    init(group, target, name, stacksize);
}

init 初始化方法:

主要完成成员变量赋值的操作,包括 runnable target 变量的赋值。后面可以看到,如果在构造器中就传入这个 runnable,thread#run 就会执行这个 runnable.

private void init(threadgroup g, runnable target, string name,
                  long stacksize, accesscontrolcontext acc,
                  boolean inheritthreadlocals) {
    if (name == null) {
        throw new nullpointerexception("name cannot be null");
    }

    this.name = name;

    thread parent = currentthread();
    securitymanager security = system.getsecuritymanager();
    if (g == null) {
        /* determine if it's an applet or not */

        /* if there is a security manager, ask the security manager
           what to do. */
        if (security != null) {
            g = security.getthreadgroup();
        }

        /* if the security doesn't have a strong opinion of the matter
           use the parent thread group. */
        if (g == null) {
            g = parent.getthreadgroup();
        }
    }

    /* checkaccess regardless of whether or not threadgroup is
       explicitly passed in. */
    g.checkaccess();

    /*
     * do we have the required permissions?
     */
    if (security != null) {
        if (isccloverridden(getclass())) {
            security.checkpermission(subclass_implementation_permission);
        }
    }

    g.addunstarted();

    this.group = g;
    this.daemon = parent.isdaemon();
    this.priority = parent.getpriority();
    if (security == null || isccloverridden(parent.getclass()))
        this.contextclassloader = parent.getcontextclassloader();
    else
        this.contextclassloader = parent.contextclassloader;
    this.inheritedaccesscontrolcontext =
            acc != null ? acc : accesscontroller.getcontext();
  
  	// 就是上面成员变量中的 target,在这里赋值
    this.target = target;
    setpriority(priority);
    if (inheritthreadlocals && parent.inheritablethreadlocals != null)
        this.inheritablethreadlocals =
            threadlocal.createinheritedmap(parent.inheritablethreadlocals);
    /* stash the specified stack size in case the vm cares */
    this.stacksize = stacksize;

    /* set thread id */
    tid = nextthreadid();
}

2.4. thread 线程状态与操作系统状态

public enum state {
    /**
     * thread state for a thread which has not yet started.
     */
  	// 初始化状态
    new,

    /**
     * thread state for a runnable thread.  a thread in the runnable
     * state is executing in the java virtual machine but it may
     * be waiting for other resources from the operating system
     * such as processor.
     */
  	// 可运行状态,可运行状态可以包括:运行中状态和就绪状态。
    runnable,

    /**
     * thread state for a thread blocked waiting for a monitor lock.
     * a thread in the blocked state is waiting for a monitor lock
     * to enter a synchronized block/method or
     * reenter a synchronized block/method after calling
     * {@link object#wait() object.wait}.
     */
  	// 线程阻塞状态
    blocked,

    /**
     * thread state for a waiting thread.
     * a thread is in the waiting state due to calling one of the
     * following methods:
     * <ul>
     *   <li>{@link object#wait() object.wait} with no timeout</li>
     *   <li>{@link #join() thread.join} with no timeout</li>
     *   <li>{@link locksupport#park() locksupport.park}</li>
     * </ul>
     *
     * <p>a thread in the waiting state is waiting for another thread to
     * perform a particular action.
     *
     * for example, a thread that has called <tt>object.wait()</tt>
     * on an object is waiting for another thread to call
     * <tt>object.notify()</tt> or <tt>object.notifyall()</tt> on
     * that object. a thread that has called <tt>thread.join()</tt>
     * is waiting for a specified thread to terminate.
     */
  	// 等待状态
    waiting,

    /**
     * thread state for a waiting thread with a specified waiting time.
     * a thread is in the timed waiting state due to calling one of
     * the following methods with a specified positive waiting time:
     * <ul>
     *   <li>{@link #sleep thread.sleep}</li>
     *   <li>{@link object#wait(long) object.wait} with timeout</li>
     *   <li>{@link #join(long) thread.join} with timeout</li>
     *   <li>{@link locksupport#parknanos locksupport.parknanos}</li>
     *   <li>{@link locksupport#parkuntil locksupport.parkuntil}</li>
     * </ul>
     */
  	// 超时等待状态
    timed_waiting,

    /**
     * thread state for a terminated thread.
     * the thread has completed execution.
     */
  	// 线程终止状态
    terminated;
}
  • new: 初始状态,线程被构建,但是还没有调用 thread#start() 方法
  • runnable: 可运行状态,包括运行中和就绪状态。从源码的注释中可以看出来,就绪状态就是线程在 jvm 中有资格运行,但是由于操作系统调度的原因尚未执行,可能线程在等待操作系统释放资源,比方说处理器资源。
  • blocked: 阻塞状态,处于这个状态的线程等待别的线程释放 monitor 锁以进入 synchronized 块;或者调用 object#wait() 方法释放锁进入等待队列后(此时是 waiting 状态),被其他线程 notify() 唤醒时不能立刻从上次 wait 的地方恢复执行,再次进入 synchronized 块还需要和别的线程竞争锁。
  • 总结来说,线程因为获取不到锁而无法进入同步代码块时,处于 blocked 阻塞状态。
  • waiting: 等待状态,处于该状态的线程需要其他线程对其进行通知或者中断等操作,从而进入下一个状态。
  • timed_waiting: 超时等待状态,相比于 waiting 状态持续等待,该状态可以在一定时间后自行返回
  • terminated: 终止状态,当前线程执行完毕

下面就用一张图表示了java线程各种状态的流转,其中夹杂着操作系统线程的状态定义,其中标红的部分表示 java 状态

对比操作系统线程状态,包括 new、terminated、ready、running、waiting,除去初始化 new 和 terminated 终止状态,一个线程运行中的状态只有:

  • ready: 线程已创建,等待系统调度分配 cpu 资源
  • running: 线程获得了 cpu 使用权,正在运算
  • waiting: 线程等待(或者说挂起),让出 cpu 资源给其他线程使用

其对应关系我理解如下:

其中 java 线程状态 runnable 包括操作系统状态的运行 running 和就绪 ready,操作系统的 waiting 包含了 blocked 阻塞挂起状态。

2.4. start() 与 run() 方法

新线程构造之后,只有调用 start() 才能让 jvm 创建线程并进入运行状态,thread#start() 源码如下,主要包含几大步骤:

  • 判断线程状态是否为 new 初始化
  • 加入线程组
  • 调用 native 方法 start0() 通知底层 jvm 启动一个线程,start0() 就是前面 registernatives() 本地方法注册的一个启动方法
  • 如果启动失败,把线程从线程组中删除
public synchronized void start() {
    /**
     * this method is not invoked for the main method thread or "system"
     * group threads created/set up by the vm. any new functionality added
     * to this method in the future may have to also be added to the vm.
     *
     * a zero status value corresponds to state "new".
     */
  	// 1. 判断线程状态是否为 new 初始化,否则直接抛出异常
    if (threadstatus != 0)
        throw new illegalthreadstateexception();

    /* notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
  	// 2. 加入线程组
    group.add(this);
	
  	// 线程是否已经启动标志位,启动后设置为 true
    boolean started = false;
    try {
      	// 3. 调用本地方法启动线程
        start0();
      	// 启动后设置标志位为 true
        started = true;
    } finally {
        try {
          	// 4. 如果启动失败,把线程从线程组中移除
            if (!started) {
                group.threadstartfailed(this);
            }
        } catch (throwable ignore) {
            /* do nothing. if start0 threw a throwable then
              it will be passed up the call stack */
        }
    }
}

// jvm 真正启动线程的本地方法
private native void start0();

从 start() 源码中可以看出来以下几点:

  1. start() 加上 synchronized 关键词,单个 thread 实例在这个 jvm 进程中运行是同步的,因此不会出现并发问题。同步检查该线程的状态,如果不是初始化状态则抛出异常。
  2. start() 方法并没有直接调用我们定义的 run() 方法,是因为 thread#start() 底层调用 thread#start0(),start0() 的本地方法逻辑中会调用 run() 方法
  3. 直接调用 thread#run() 方法或者 runnable#run() 方法不会创建新线程执行任务,而是在主线程直接串行执行,如果要创建新线程执行任务,需要调用 thread#start() 方法

调用逻辑图如下:

thread#run() 源码如下:

// 自定义重写 thread#run() 或者传入 runnable,最终都会调用该线程的 run() 方法逻辑
// 如果传入了 runnable 就会走进这个方法运行 target.run(),有点装饰器模式的感觉
@override
public void run() {
    if (target != null) {
        target.run();
    }
}

至于为何最终 start0() 还是调用了 thread#run(),这就需要去看 jdk 源码了,我刚好也硬着头皮去挖了下:

首先看到 thread.c 文件中 registernatives 里面注册的这些本地方法,start0() 会去调用 jvm_startthread

在 jvm.cpp 文件中找出 jvm_startthread 方法,其底层调用 new javathread()方法

最终该方法真的会去 thread.cpp 里调用创建操作系统线程的方法 os::create_thread

new javathread() 方法里面会引用 jvm.cpp 文件中的 thread_entry 方法,这个方法最终就会调用 vmsymbols::run_method_name(),看起来是个虚拟机内注册的方法

全局检索一下,其实就是在 vmsymbols.hpp 头文件中定义的许多通用方法和变量,run 方法刚好是其中定义的一个,也就是 thread#run()。

还可以看到许多其他常见的方法,比方说类的初始化方法,是 jvm 第一次加载 class 文件时调用,包括静态变量初始化语句和静态块执行。参考init和clinit何时调用

2.5. sleep() 方法

thread#sleep() 方法会让当前线程休眠一段时间,单位为毫秒,由于是 static 方法,所以是让直接调用 thread.sleep() 的休眠,这里需要注意的是:

调用 sleep() 方法使线程休眠以后,不会释放自己占有的锁。

// 本地方法,真正让线程休眠的方法
public static native void sleep(long millis) throws interruptedexception;

/**
 * causes the currently executing thread to sleep (temporarily cease
 * execution) for the specified number of milliseconds plus the specified
 * number of nanoseconds, subject to the precision and accuracy of system
 * timers and schedulers. the thread does not lose ownership of any
 * monitors.
 *
 * @param  millis
 *         the length of time to sleep in milliseconds
 *
 * @param  nanos
 *         {@code 0-999999} additional nanoseconds to sleep
 *
 * @throws  illegalargumentexception
 *          if the value of {@code millis} is negative, or the value of
 *          {@code nanos} is not in the range {@code 0-999999}
 *
 * @throws  interruptedexception
 *          if any thread has interrupted the current thread. the
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public static void sleep(long millis, int nanos)
throws interruptedexception {
    if (millis < 0) {
        throw new illegalargumentexception("timeout value is negative");
    }

    if (nanos < 0 || nanos > 999999) {
        throw new illegalargumentexception(
                            "nanosecond timeout value out of range");
    }

    if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
        millis++;
    }
		
  	// 调用本地方法
    sleep(millis);
}

2.6. join() 方法

这个方法是目前我觉得 thread 里面最难理解的方法了,涉及到 synchronized 锁、wait、notify 原理,以及线程调用主体之间的辨析,参考 【java】thread类中的join()方法原理,我的理解如下:

首先看下 thread#join() 方法的源码:

非静态方法,是类中的普通方法,比方说 main 线程调用 threada.join(),就是 main 线程会等待 threada 执行完成

// 调用方法,比方说 main 线程调用 threada.join(),就是 main 线程会等待 threada 执行完成
public final void join() throws interruptedexception {
    join(0);
}

public final synchronized void join(long millis)
throws interruptedexception {
    long base = system.currenttimemillis();
    long now = 0;

    if (millis < 0) {
        throw new illegalargumentexception("timeout value is negative");
    }

    if (millis == 0) {
      	// 该分支是无限期等待 threada 结束,其实内部最后是在 threada 结束时被 notify 
        while (isalive()) {
            wait(0);
        }
    } else {
      	// 该分支时等待有限的时间,如果 threada 在 delay 时间以后还未结束,等待线程也返回了
        while (isalive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = system.currenttimemillis() - base;
        }
    }
}

重点关注下其中会出现的两个 wait():

首先我们知道,object#wait() 需要放在 synchronized 代码块中执行,即获取到锁以后再释放掉锁。

这个 synchronized 锁就是在 thread#join() 方法上,成员方法上加了 synchronized 说明就是 synchronized(this), 假设 main 线程调用 threada.join(),那么这个 this 就是指调用 threada.join() 的 threada 对象本身,最终效果就是,调用方 main 线程持有了 threada 对象的 monitor 锁,被记录在 threada 对象头上。

有了 object#wait() 就需要有对应的 object#notify() 将其唤醒,这又得看到 jvm 源码里面去了

在 openjdk/hotspot/src/share/vm/runtime/thread.cpp 的 javathread::exit 方法中,这其实是线程退出时会执行的方法,有个 ensure_join() 方法

ensure_join() 方法的源码如下:

上面的 this 就是指 threada,就是下面方法入参中的 thread。可以看出来,当线程 threada 执行完成准备退出时,jvm 会自动唤醒等待在 threada 对象上的线程,在我们的例子中就是主线程。

总结如下:

thread.join() 方法底层原理是 synchronized 方法 + wait/notify。主线程调用 threada.join() 方法,通过 synchronized 关键字获取到 threada 的对象锁,内部再通过 object#wait() 方法等待,这里的执行方和调用方都是主线程,最终当 threada 线程退出的时候,jvm 会自动 notify 唤醒等待在 threada 上的线程,也就是主线程。

2.7. interrupt() 方法

thread#interrupt 是中断被调用线程的方法,它通过设置线程的中断标志位来中断被调用线程,通常调用会抛出 java.lang.interruptedexception 异常。

这种中断线程的方法比较安全,能够使正在执行的任务继续能够执行完,而不像 stop() 方法那样强制关闭。

public void interrupt() {
    if (this != thread.currentthread())
        checkaccess();

    synchronized (blockerlock) {
        interruptible b = blocker;
        if (b != null) {
            interrupt0();           // just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
  	// 调用本地方法中断线程
    interrupt0();
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com