当前位置: 代码网 > it编程>编程语言>Java > JAVA多线程之JDK中的各种锁详解(看这一篇就够了)

JAVA多线程之JDK中的各种锁详解(看这一篇就够了)

2024年07月18日 Java 我要评论
1.概论1.1.实现锁的要素java中的锁都是可重入的锁,因为不可重入的试用的时候很容易造成死锁。这个道理很好想明白:当一个线程已经持有一个锁,并在持有该锁的过程中再次尝试获取同一把锁时,如果没有重入

1.概论

1.1.实现锁的要素

java中的锁都是可重入的锁,因为不可重入的试用的时候很容易造成死锁。这个道理很好想明白:

当一个线程已经持有一个锁,并在持有该锁的过程中再次尝试获取同一把锁时,如果没有重入机制,第二次请求会被阻塞,因为锁已经被自己持有。这会导致线程自我死锁,因为它在等待自己释放的锁。

可重入是指获取锁的线程可以继续重复的获得此锁。其实我们想都能想到要实现一把锁需要些什么,首先肯定是:

  • 标志位,也叫信号量,标记锁的状态和重入次数,这样才能完成持有锁和释放锁。

接下来要考虑的是拒接策略,当前锁被持有期间,后续的请求线程该怎么处理,当然可以直接拒绝,java的选择委婉点,选择了允许这些线程躺在锁上阻塞等待锁被释放。要实现让线程躺在锁上等待,我们想想无非要:

  • 需要支持对一个线程的阻塞、唤醒

  • 需要记录当前哪个线程持有锁

  • 需要一个队列维护所有阻塞在当前锁上的线程

ok,以上四点就是java锁的核心,总结起来就是信号量+队列,分别用来记录持有者和等待者。

1.2.阻塞、唤醒操作

首先我们来看看阻塞和唤醒的操作,在jdk中提供了一个unsafe类,该类中提供了阻塞或唤醒线程的一对操作 原语——park/unpark:

public native void unpark(object var1);
public native void park(boolean var1, long var2);

这对原语最终会调用操作系统的程序接口执行线程操作。

1.2.阻塞队列

拿来维护所有阻塞在当前锁上的线程的队列能是个普通队列吗?很显然不是,它的操作必须是线程安全的是吧,所以这个队列用阻塞队列实现才合适。什么是阻塞队列:

阻塞队列提供了线程安全的元素插入和移除操作,并且在特定条件下会阻塞线程,直到满足操作条件。

说到jdk中的阻塞队列,其核心就是abstractqueuedsynchronizer,简称aqs,由双向链表实现的一个元素操作绝对安全的队列,用来在锁的实现中维护阻塞在锁上的线程上的队列的这个角色。

来看看aqs的源码:

它有指向前后节点的指针、有一个标志位state、还有一个提供线程操作原原语(阻塞、唤醒)的unsafe类。

所以其实aqs就长这样:

点进源码可以看到其随便一个方法都是线程安全的:

由于本文不是专门聊aqs这里就不扩展了,反正知道aqs是一个线程安全的阻塞队列就对了。

1.3.lock接口和sync类

java中所有锁的顶级父接口,用来规范定义一把锁应该有那些行为职责:

public interface lock {
    void lock();
    void lockinterruptibly() throws interruptedexception;
    boolean trylock(long time, timeunit unit) throws interruptedexception;
    void unlock();
    condition newcondition();
}

java中所有锁的实现都是依托aqs去作为阻塞队列,每个锁内部都会实现一个sync内部类,在自身sync内部以不同的策略去操作aqs实现不同种类的锁。

abstract static class sync extends abstractqueuedsynchronizer {......}

2.各种锁

2.1.互斥锁

2.1.1.概论

reentrantlock,互斥锁,reentrantlock本身没有任何代码逻辑,依靠内部类sync干活儿:

public class reentrantlock implements lock, serializable {
    private final reentrantlock.sync sync;
    public void lock() {
        this.sync.lock();
    }
    public void unlock() {
        this.sync.release(1);
    }
    ......
}

reentrantlock的sync继承了aqs

abstract static class sync extends abstractqueuedsynchronizer {......}

sync是抽象类,有两个实现:

  • nonfairsync,公平锁

  • fairsync,非公平锁

实例化reentrantlock的实例时,根据传入的标志位可以创建公平和公平的实现

public class reentrantlock implements lock, java.io.serializable{
public reentrantlock() {
        sync = new nonfairsync();
    }
​
    public reentrantlock(boolean fair) {
        sync = fair ? new fairsync() : new nonfairsync();
    }
    ......
}
}

2.1.2.源码

1.lock()

公平锁的lock():

static final class fairsync extends sync {
        final void lock() {
            acquire(1);//进来直接排队
        }

非公平锁的lock():

static final class nonfairsync extends sync {
        final void lock() {
            if (compareandsetstate(0, 1))//进来直接抢锁
                setexclusiveownerthread(thread.currentthread());//将锁的持有者设置为当前线程
            else
                acquire(1);//没抢过再去排队
        }
    }

acquire()是aqs的模板方法:

tryacquire,尝试再去获取一次锁,公平锁依然是排队抢,去看看阻塞队列是否为空;非公平锁依然是直接抢。

acquirequeued,将线程放入阻塞队列。

public final void acquire(int arg) {
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }

acquirequeued(..)是lock()最关键的一部分,addwaiter(..)把thread对象加入阻塞队列,acquirequeued(..)完成对线程的阻塞。

final boolean acquirequeued(final node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final node p = node.predecessor();
                if (p == head && tryacquire(arg)) {//如果发现自己在队头就去拿锁
                    sethead(node);
                    p.next = null; // help gc
                    failed = false;
                    return interrupted;
                }
                if (shouldparkafterfailedacquire(p, node) &&
                    parkandcheckinterrupt())//调用原语,阻塞自己
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelacquire(node);
        }
    }

acquirequeued(..)函数有一个返回值,表示什么意思 呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线 程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。所以才有了以下逻辑:

public final void acquire(int arg) {
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }public final void acquire(int arg) {
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }

当 acquirequeued(..) 返回 true 时,会调用 selfinterrupt (),自己给自己发送中断信号,也就是自己把自己的中断标志位设 为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中 断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在loc k代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响 应该中断信号。

2.unlock()

unlock的逻辑很简单,每次unlock,state-1,直到state=0时,将锁的拥有者置null,释放锁。由于只有锁的持有线程才能操作lock,所以unlock()不需要用cas,操作时直接判断一下是不是锁的持有线程在操作即可。

public void unlock() {
        sync.release(1);
    }
public final boolean release(int arg) {
        if (tryrelease(arg)) {//释放锁
            node h = head;
            if (h != null && h.waitstatus != 0)
                unparksuccessor(h);//唤醒阻塞队列中的后继者
            return true;
        }
        return false;
    }

释放锁:

protected final boolean tryrelease(int releases) {
            int c = getstate() - releases;//每次unlock,state减1
            if (thread.currentthread() != getexclusiveownerthread())//判断是不是锁的持有线程
                throw new illegalmonitorstateexception();
            boolean free = false;
            if (c == 0) {//state为0表示该锁没有被持有
                free = true;
                setexclusiveownerthread(null);//将锁的持有者置null
            }
            setstate(c);
            return free;
        }

唤醒后继者:

private void unparksuccessor(node node) {
        int ws = node.waitstatus;
        if (ws < 0)
            compareandsetwaitstatus(node, ws, 0);
        node s = node.next;
        if (s == null || s.waitstatus > 0) {
            s = null;
            for (node t = tail; t != null && t != node; t = t.prev)
                if (t.waitstatus <= 0)
                    s = t;
        }
        if (s != null)
            locksupport.unpark(s.thread);
    }

2.2.读写锁

读写锁是一个实现读写互斥的锁,读写锁包含一个读锁、一个写锁:

public interface readwritelock{
    lock readlock();
    lock writelock();
}

读写锁的使用就是直接调用对应锁进行锁定和解锁:

readwritelock rwlock=new reetrantreadwritelock();
lock rlock=rwlock.readlock();
rlock.lock();
rlock.unlock();
lock wlock=rwlock.writelock();
wlock.lock();
wlock.unlock();

读写锁的sync内部类对读锁和写锁采用同一个int型的信号量的高16位和低16位分别表示读写锁的状态和重入次数,这样一次cas就能统一处理进行读写互斥操作:

abstract static class sync extends abstractqueuedsynchronizer {
        static final int shared_shift   = 16;
        static final int shared_unit    = (1 << shared_shift);
        static final int max_count      = (1 << shared_shift) - 1;
        static final int exclusive_mask = (1 << shared_shift) - 1;
        static int sharedcount(int c)    { return c >>> shared_shift; }
        static int exclusivecount(int c) { return c & exclusive_mask; }
}

2.3.condition

2.3.1.概论

condition用于更加细粒度的控制锁上面的线程阻塞、唤醒。

以下以一个经典的生产、消费者问题为例:

队列空的时候进来的消费者线程阻塞,有数据放进来后唤醒阻塞的消费者线程。

队列满的时候进来的生产者线程阻塞,有空位后唤醒阻塞的生产者线程。

锁粒度的实现:

public void enqueue(){
    synchronized(queue){
        while(queue.full()){
            queue.wait();
        }
        //入队列
        ......
        //通知消费者,队列中有数据了
        queue.notify();
    }
}
​
public void dequeue(){
    synchronized(queue){
        while(queue.empty()){
            queue.wait();
        }
        //出队列
        ......
        //通知生产者,队列中有空位了,可以继续放数据
        queue.notify();
    }
}

可以发现,唤醒的时候把阻塞的生产消费线程一起唤醒了。

条件粒度的实现:

private final lock lock = new reentrantlock();
private final condition notfull  = lock.newcondition(); // 用于等待队列不满
private final condition notempty = lock.newcondition(); // 用于等待队列非空

public void enqueue(object item) {
    try {
        while (queue.isfull()) {
            notfull.await(); // 等待队列不满
        }
        // 入队列操作
        // ...
        
        // 入队后,通知等待的消费者
        notempty.signal();
    } catch (interruptedexception e) {
        thread.currentthread().interrupt(); // 保持中断状态
        // 处理中断逻辑
    } finally {
        queue.unlock();
    }
}

public void dequeue() {
    try {
        while (queue.isempty()) {
            notempty.await(); // 等待队列非空
        }
        // 出队列操作
        // ...
        
        // 出队后,通知等待的生产者
        notfull.signal();
    } catch (interruptedexception e) {
        thread.currentthread().interrupt(); // 保持中断状态
        // 处理中断逻辑
    } finally {
        queue.unlock();
    }
}

2.3.2.底层实现

condition由lock产生,因此lock中持有condition:

public interface lock {
    ......
    condition newcondition();
}

承担功能的其实就是syn中的conditionobject,也就是aqs中的conditionobject:

final conditionobject newcondition() {
            return new conditionobject(this);
        }

一个condition上面阻塞着多个线程,所以每个condition内部都有一个队列,用来记录阻塞在这个condition上面的线程,这个队列其实也是aqs实现的,aqs中除了实现一个以node为节点的队列,还实现了一个以conditionobject为节点的队列:

public abstract class abstractqueuedsynchronizer
    extends abstractownablesynchronizer
    implements java.io.serializable {
        public class conditionobject implements condition, java.io.serializable {
        private static final long serialversionuid = 1173984872572414699l;
        private transient node firstwaiter;
        private transient node lastwaiter;
        ......
        }
    }

condition是个接口,定义了一系列条件操作:

public interface condition {
    void await() throws interruptedexception;
    void awaituninterruptibly();
    long awaitnanos(long var1) throws interruptedexception;
    boolean await(long var1, timeunit var3) throws interruptedexception;
    boolean awaituntil(date var1) throws interruptedexception;
    void signal();
    void signalall();
}

总结 

到此这篇关于java多线程之jdk中各种锁的文章就介绍到这了,更多相关java多线程jdk各种锁内容请搜索3w代码以前的文章或继续浏览下面的相关文章希望大家以后多多支持3w代码!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com