当前位置: 代码网 > it编程>游戏开发>ar > CyclicBarrier之多线程中的循环栅栏详解

CyclicBarrier之多线程中的循环栅栏详解

2025年05月16日 ar 我要评论
1. cyclicbarrier简介现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动

1. cyclicbarrier简介

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

在juc包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是cyclicbarrier类。利用cyclicbarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点(栅栏)后再进行后续的操作。下图演示了这一过程:

cyclicbarrier可以使一定数量的线程反复地在栅栏(不同轮次或不同代)位置处汇集。

  • cyclicbarrier字面意思是“可重复使用的栅栏”,cyclicbarrier 和 countdownlatch 很像,只是 cyclicbarrier 可以有不止一个栅栏,因为它的栅栏(barrier)可以重复使用(cyclic)。
  • 当线程到达栅栏位置时将调用await()方法,这个方法将阻塞(当前线程)直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

2.cyclicbarrier的使用

2.1 常用方法

//参数parties:表示要到达屏障 (栅栏)的线程数量
//参数runnable: 最后一个线程到达屏障之后要做的任务
​
//构造方法1
public cyclicbarrier(int parties) 
//构造方法2
public cyclicbarrier(int parties, runnable barrieraction)
​
//线程调用await()方法表示当前线程已经到达栅栏,然后会被阻塞
public int await() throws interruptedexception, brokenbarrierexception {
    try {
        return dowait(false, 0l);
    } catch (timeoutexception toe) {
        throw new error(toe); // cannot happen
    }
}
​
//带时限的阻塞等待
public int await(long timeout, timeunit unit) throws interruptedexception,brokenbarrierexception,timeoutexception {
    return dowait(true, unit.tonanos(timeout));
}

2.2 使用举例

适用场景:可用于需要多个线程均到达某一步之后才能继续往下执行的场景

//循环栅栏-可多次循环使用
   cyclicbarrier cyclicbarrier = new cyclicbarrier(5,()->{
        system.out.println(thread.currentthread().getname()+" 完成最后任务!");
   });
​
   intstream.range(1,6).foreach(i->{
        new thread(()->{
            try {
                thread.sleep(new double(math.random()*3000).longvalue());
                system.out.println(thread.currentthread().getname()+" 到达栅栏a");
                
                cyclicbarrier.await();//屏障点a,当前线程会阻塞至此,等待计数器=0
                
                system.out.println(thread.currentthread().getname()+" 冲破栅栏a");
            }catch (exception e){
                e.printstacktrace();
            }
        }).start();
    });

3.cyclicbarrier原理

cyclicbarrier是一道屏障,调用await()方法后,当前线程进入阻塞,当parties数量的线程调用await方法后,所有的await方法会返回并继续往下执行。

3.1 成员变量

 /** cyclicbarrier使用的排他锁*/
    private final reentrantlock lock = new reentrantlock();
    /** barrier被冲破前,线程等待的condition*/
    private final condition trip = lock.newcondition();
    /** barrier被冲破时,需要满足的参与线程数。*/
    private final int parties;
    /* barrier被冲破后执行的方法。*/
    private final runnable barriercommand;
    /** 当其轮次 */
    private generation generation = new generation();
​
    /**
     *目前等待剩余的参与者数量。从 parties倒数到0。每个轮次该值会被重置回parties
     */
    private int count;

(1)cyclicbarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count。

  • parties表示每次拦截的线程数,该值在构造时进行赋值。
  • count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。

(2)cyclicbarrier有一个静态内部类generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待

(3)barriercommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barriercommand来执行自己的任务。

3.2 构造器

//构造器1:指定本局要拦截的线程数parties 及 本局结束时要执行的任务
public cyclicbarrier(int parties, runnable barrieraction) {
    if (parties <= 0) throw new illegalargumentexception();
    this.parties = parties;
    this.count = parties;
    this.barriercommand = barrieraction;
}
​
//构造器2
public cyclicbarrier(int parties) {
    this(parties, null);
}

3.3 等待的方法

cyclicbarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待非定时等待。源代码中await()方法最终调用的是dowait()方法:

private int dowait(boolean timed, long nanos) throws interruptedexception, brokenbarrierexception,timeoutexception {
    // 获取独占锁
    final reentrantlock lock = this.lock;
    lock.lock();//对共享资源count,generation操作前,需先上锁保证线程安全
    try {
        // 当前代--当前轮次对象的引用
        final generation g = generation;
        // 如果这轮次broken了,抛出异常
        if (g.broken)
            throw new brokenbarrierexception();
        // 如果线程中断了,抛出异常
        if (thread.interrupted()) {
            breakbarrier();//如果被打断,通过此方法设置当前轮次为broken状态,通知当前轮次所有等待的线程
            throw new interruptedexception();//抛出异常
        }
 
        //自旋前
        
        //1、count值-1
        int index = --count;
        // 2、判断是否到0,若是,则冲破屏障点(说明最后一个线程已经到达)
        if (index == 0) {
            boolean ranaction = false;
            try {
                final runnable command = barriercommand;
                // 3、执行栅栏任务(若cyclicbarrier构造时传入了runnable,则调用)
                if (command != null)
                    command.run();
                ranaction = true;
                // 4、更新一轮次,将count重置,将generation重置,唤醒之前等待的线程
                nextgeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务(command)的时候出现了异常,那么就认为本轮次破环了
                if (!ranaction)
                    breakbarrier();
            }
        }
 
        //计数器没有到0 =》开始自旋,直到屏障被冲破,或者interrupted或者超时
        for (;;) {
            try {
                 // 开始等待;如果没有时间限制,则直接等待,直到被唤醒(让其他线程进入到lock的代码块执行以上逻辑)
                if (!timed)
                    trip.await();//阻塞,此时会释放锁,以让其他线程进入await方法中,等待屏障被冲破后,向后执行
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0l)
                    nanos = trip.awaitnanos(nanos);
            } catch (interruptedexception ie) {
                // 如果当前线程阻塞被interrupt了,并且本轮次还没有被break,那么修改本轮次状态为broken
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakbarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这轮次的,就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    thread.currentthread().interrupt();
                }
            }
 
            // 当有任何一个线程中断了,就会调用breakbarrier方法,就会唤醒其他的线程,其他线程醒来后,也要抛出异常 
            if (g.broken)
                throw new brokenbarrierexception();
 
            // g != generation表示正常换轮次了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0l) {
                breakbarrier();
                throw new timeoutexception();
            }
        }//自旋
        
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

更新本轮次的方法:nextgeneration()

private void nextgeneration() {
    trip.signalall();//唤醒本轮次等待的线程
    count = parties;//重置count值为初始值,为下一轮次(代)使用
    generation = new generation();//更新本轮次对象,这样自旋中的线程才会跳出自旋。
}
​
private static class generation {
    boolean broken = false;
}
​
private void breakbarrier() {
    generation.broken = true;//设置标识
    count = parties;//重置count值为初始值
    trip.signalall();//唤醒所有等待线程
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com