当前位置: 代码网 > it编程>编程语言>Java > AQS实现的ReentrantLock全过程

AQS实现的ReentrantLock全过程

2026年03月24日 Java 我要评论
这里的源码用的java8版本lock方法#当reentrantlock类的实例对象尝试获取锁的时候,调用lock方法会进入sync的lock方法,其中sync是reentrantlock的一个内部类,

这里的源码用的java8版本

lock方法#

当reentrantlock类的实例对象尝试获取锁的时候,调用lock方法

会进入sync的lock方法,其中sync是reentrantlock的一个内部类,reentrantlock构造方法会默认使用非公平锁nonfairsync,这个类是继承于sync的

        final void lock() {
            if (!initialtrylock())
                acquire(1);
        }
// 其中sync的initialtrylock是抽象方法,需要看非公平锁实现方法

[!tip]
在这里是第一次尝试获取锁

由于reentrantlock是个可重入锁,判断里有重入的判断

final boolean initialtrylock() {
            thread current = thread.currentthread();
			// 获取当前线程的对象
            if (compareandsetstate(0, 1)) { // first attempt is unguarded
			// 用cas比较state状态是否为0(无人持有锁),如果是,就转为1(获取到锁)
                setexclusiveownerthread(current);
			// 将当前进程设置为拥有锁的线程
                return true;
            } else if (getexclusiveownerthread() == current) {
			// 当前线程为拥有锁的线程(重复获取),重入
                int c = getstate() + 1;
                if (c < 0) // overflow
			// 负数,state是个int类型数据,超出可能导致溢出变为负数
                    throw new error("maximum lock count exceeded");
                setstate(c);
			// 设置新的state
                return true;
            } else
			// 已有线程占锁,返回为false
                return false;
        }

然后开始调用acquire方法,传入1

    public final void acquire(int arg) {
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }

调用tryacquire()方法,其中tryacquire()方法是一个只有抛出异常的方法,需要重写,我们看非公平锁的写法

[!tip]
这是第二次获取锁

        protected final boolean tryacquire(int acquires) {
            if (getstate() == 0 && !hasqueuedpredecessors() &&
                compareandsetstate(0, acquires)) {
                setexclusiveownerthread(thread.currentthread());
                return true;
            }
            return false;
        }

这里,如果state是0,即没有线程占用锁的情况下getstate() == 0​这个为真!hasqueuedpredecessors()执行这个方法,这个方法会检查是否已经出现了等待队列

    public final boolean hasqueuedpredecessors() {
        thread first = null; node h, s;
        if ((h = head) != null && ((s = h.next) == null ||
                                   (first = s.waiter) == null ||
                                   s.prev == null))
            first = getfirstqueuedthread(); // retry via getfirstqueuedthread
        return first != null && first != thread.currentthread();
    }

当未出现 同步队列/阻塞队列 ,或者当前线程是队列的第一个时,执行compareandsetstate(0, acquires),第二次尝试获取锁,如果成功,返回真

否则返回假,执行acquirequeued(addwaiter(node.exclusive), arg))

    private node addwaiter(node mode) {
        node node = new node(thread.currentthread(), mode);
        // try the fast path of enq; backup to full enq on failure
        node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareandsettail(pred, node)) {
			// 尝试加入队尾
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

node是双向队列:阻塞队列一个节点,是为了保证原子化所以包装起来的

如果tail尾指针指向的节点不为空,则设置新生成的为尾指针指向的

否则(阻塞队列为空),调用enq函数

    private node enq(final node node) {
        for (;;) {
            node t = tail;
            if (t == null) { // must initialize
                if (compareandsethead(new node()))
			// 使用cas,防止多线程同时创建头节点,所以本质上还是需要抢入队顺序
                    tail = head;
			// 初始化头节点,并将尾指针指向头节点
            } else {
                node.prev = t;
                if (compareandsettail(t, node)) {
			// 判断t是否为尾节点,如果有线程更快的改掉尾节点,那么修改失败,
			// 重新进入for循环
                    t.next = node;
                    return t;
			// 修改成功
                }
            }
        }
    }

[!tip]
这是第三次尝试获取锁

    final boolean acquirequeued(final node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final node p = node.predecessor();
			// 获取node的前一个节点,如果前一个节点是头节点(当前节点是第一个)
			// 执行tryacquire(arg),执行第三次尝试获取锁
                if (p == head && tryacquire(arg)) {
			// 获取锁成功,出队
                    sethead(node);// 将node设为头节点
                    p.next = null; // help gc
                    failed = false;
                    return interrupted;
                }
                if (shouldparkafterfailedacquire(p, node) &&
                    parkandcheckinterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelacquire(node);
        }
    }

如果第三次尝试获取锁失败了,会调用shouldparkafterfailedacquire()方法,将node的前一个节点传入(node一直都是加入的节点)

    private static boolean shouldparkafterfailedacquire(node pred, node node) {
        int ws = pred.waitstatus;
        if (ws == node.signal)
		// 确认前面的节点处于signal状态,即确认前面的节点会叫醒自己
            /*
             * this node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * predecessor was cancelled. skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitstatus > 0);
			// node里面仅有一个大于零的状态,即1取消状态,也就是说当前任务被取消了
			// 持续循环值找到不再取消的节点
            pred.next = node;
        } else {
		// 将前一个节点用cas转为node.signal状态-1,返回为false
            /*
             * waitstatus must be 0 or propagate.  indicate that we
             * need a signal, but don't park yet.  caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareandsetwaitstatus(pred, ws, node.signal);
        }
        return false;
    }

这里插一嘴,node节点有一些状态,来体现其的任务状态,如前面传入的就是独占队列,addwaiter(node.exclusive)

    static final class node {
        /** marker to indicate a node is waiting in shared mode */
        static final node shared = new node();
		// 共享队列
        /** marker to indicate a node is waiting in exclusive mode */
        static final node exclusive = null;
		// 独占队列
        /** waitstatus value to indicate thread has cancelled */// 取消
        static final int cancelled =  1;
		// 已被取消
        /** waitstatus value to indicate successor's thread needs unparking */
        static final int signal    = -1;
		// 表示next节点已经park,需要被唤醒
        /** waitstatus value to indicate thread is waiting on condition */
        static final int condition = -2;
        /**
         * waitstatus value to indicate the next acquireshared should
         * unconditionally propagate
         */
		// 共享状态
        static final int propagate = -3;
if (shouldparkafterfailedacquire(p, node) &&
                    parkandcheckinterrupt())
                    interrupted = true;

如果前一个节点的waitstate是0,会被cas转为-1,然后返回false,进而不会执行parkandcheckinterrupt(),继续for的无限循环,这里有可能出现第四次尝试

如果前一个节点的waitstate是-1,该函数返回一个true,也就可以继续执行parkandcheckinterrupt()

    private final boolean parkandcheckinterrupt() {
        locksupport.park(this);
        return thread.interrupted();
    }

当前线程进入park状态

至此我们完成了这个的lock过程

unlock方法#

unlock()也是公平锁以及非公平锁都有的方法,同样继承了sync

    public void unlock() {
        sync.release(1);
    }

sync的release方法

    public final boolean release(int arg) {
        if (tryrelease(arg)) {
            node h = head;
            if (h != null && h.waitstatus != 0)
                unparksuccessor(h);
            return true;
        }
        return false;
    }

首先尝试tryrelease方法

        protected final boolean tryrelease(int releases) {
            int c = getstate() - releases;
            if (thread.currentthread() != getexclusiveownerthread())
                throw new illegalmonitorstateexception();
            boolean free = false;
            if (c == 0) {
                free = true;
                setexclusiveownerthread(null);
            }
            setstate(c);
            return free;
        }

如果成功醒过来,该线程依然处于一种park的位置上,即parkandcheckinterrupt这个方法上,这个方法返回是否被中断reentrantlock这个锁仅获取中断信息,而不会做出任何操作

final boolean acquirequeued(final node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final node p = node.predecessor();
                if (p == head && tryacquire(arg)) {
                    sethead(node);
                    p.next = null; // help gc
                    failed = false;
                    return interrupted;
                }
                if (shouldparkafterfailedacquire(p, node) &&
                    parkandcheckinterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelacquire(node);
        }
    }

苏醒过来之后,继续for循环,尝试获取锁,失败之后会接着park,成功就会获取锁,并返回中断状态,在acquire中决定自我中断

        final boolean nonfairtryacquire(int acquires) {
            final thread current = thread.currentthread();
            int c = getstate();
            if (c == 0) {
                if (compareandsetstate(0, acquires)) {
                    setexclusiveownerthread(current);
                    return true;
                }
            }
            else if (current == getexclusiveownerthread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new error("maximum lock count exceeded");
                setstate(nextc);
                return true;
            }
            return false;
        }

并将setexclusiveownerthread传入当前线程,返回为真,因此在tryrelease方法里的thread.currentthread() != getexclusiveownerthread()一定为假,不会抛出异常,并设置free为false,当c也就是资源的state如果是0

			if (c == 0) {
                free = true;
                setexclusiveownerthread(null);
            }
            setstate(c);
            return free;

c如果是0,即没有线程占用资源,setexclusiveownerthread将锁的线程设置为空,如果不为0,也就是重入锁仅仅解锁一次,c依然存在多个,设置c为新的state值,然会free值(资源锁的使用情况)

    public final boolean release(int arg) {
        if (tryrelease(arg)) {
            node h = head;
            if (h != null && h.waitstatus != 0)
                unparksuccessor(h);
            return true;
        }
        return false;
    }
    private void unparksuccessor(node node) {
        /*
         * if status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  it is ok if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitstatus;
        if (ws < 0)
            compareandsetwaitstatus(node, ws, 0);

        /*
         * thread to unpark is held in successor, which is normally
         * just the next node.  but if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        node s = node.next;、
		// 如果下一个节点的状态为取消或者为空,从后向前找最后一个满足条件的,赋值为s
        if (s == null || s.waitstatus > 0) {
            s = null;
            for (node t = tail; t != null && t != node; t = t.prev)
                if (t.waitstatus <= 0)
                    s = t;
        }
		// s不为空的话作为下一个被唤醒的节点,尝试唤醒
        if (s != null)
            locksupport.unpark(s.thread);
    }

此时,当前节点为头节点,调用unparksuccessor()方法,获取头节点的下一个节点

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com