1. cyclicbarrier简介
现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。
在juc包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是cyclicbarrier类。利用cyclicbarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点(栅栏)后再进行后续的操作。下图演示了这一过程:
cyclicbarrier可以使一定数量的线程反复地在栅栏(不同轮次或不同代)位置处汇集。
- cyclicbarrier字面意思是“可重复使用的栅栏”,cyclicbarrier 和 countdownlatch 很像,只是 cyclicbarrier 可以有不止一个栅栏,因为它的栅栏(barrier)可以重复使用(cyclic)。
- 当线程到达栅栏位置时将调用await()方法,这个方法将阻塞(当前线程)直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
2.cyclicbarrier的使用
2.1 常用方法
//参数parties:表示要到达屏障 (栅栏)的线程数量 //参数runnable: 最后一个线程到达屏障之后要做的任务 //构造方法1 public cyclicbarrier(int parties) //构造方法2 public cyclicbarrier(int parties, runnable barrieraction) //线程调用await()方法表示当前线程已经到达栅栏,然后会被阻塞 public int await() throws interruptedexception, brokenbarrierexception { try { return dowait(false, 0l); } catch (timeoutexception toe) { throw new error(toe); // cannot happen } } //带时限的阻塞等待 public int await(long timeout, timeunit unit) throws interruptedexception,brokenbarrierexception,timeoutexception { return dowait(true, unit.tonanos(timeout)); }
2.2 使用举例
适用场景:可用于需要多个线程均到达某一步之后才能继续往下执行的场景
//循环栅栏-可多次循环使用 cyclicbarrier cyclicbarrier = new cyclicbarrier(5,()->{ system.out.println(thread.currentthread().getname()+" 完成最后任务!"); }); intstream.range(1,6).foreach(i->{ new thread(()->{ try { thread.sleep(new double(math.random()*3000).longvalue()); system.out.println(thread.currentthread().getname()+" 到达栅栏a"); cyclicbarrier.await();//屏障点a,当前线程会阻塞至此,等待计数器=0 system.out.println(thread.currentthread().getname()+" 冲破栅栏a"); }catch (exception e){ e.printstacktrace(); } }).start(); });
3.cyclicbarrier原理
cyclicbarrier是一道屏障,调用await()方法后,当前线程进入阻塞,当parties数量的线程调用await方法后,所有的await方法会返回并继续往下执行。
3.1 成员变量
/** cyclicbarrier使用的排他锁*/ private final reentrantlock lock = new reentrantlock(); /** barrier被冲破前,线程等待的condition*/ private final condition trip = lock.newcondition(); /** barrier被冲破时,需要满足的参与线程数。*/ private final int parties; /* barrier被冲破后执行的方法。*/ private final runnable barriercommand; /** 当其轮次 */ private generation generation = new generation(); /** *目前等待剩余的参与者数量。从 parties倒数到0。每个轮次该值会被重置回parties */ private int count;
(1)cyclicbarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count。
- parties表示每次拦截的线程数,该值在构造时进行赋值。
- count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。
(2)cyclicbarrier有一个静态内部类generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待
(3)barriercommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barriercommand来执行自己的任务。
3.2 构造器
//构造器1:指定本局要拦截的线程数parties 及 本局结束时要执行的任务 public cyclicbarrier(int parties, runnable barrieraction) { if (parties <= 0) throw new illegalargumentexception(); this.parties = parties; this.count = parties; this.barriercommand = barrieraction; } //构造器2 public cyclicbarrier(int parties) { this(parties, null); }
3.3 等待的方法
cyclicbarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。源代码中await()方法最终调用的是dowait()方法:
private int dowait(boolean timed, long nanos) throws interruptedexception, brokenbarrierexception,timeoutexception { // 获取独占锁 final reentrantlock lock = this.lock; lock.lock();//对共享资源count,generation操作前,需先上锁保证线程安全 try { // 当前代--当前轮次对象的引用 final generation g = generation; // 如果这轮次broken了,抛出异常 if (g.broken) throw new brokenbarrierexception(); // 如果线程中断了,抛出异常 if (thread.interrupted()) { breakbarrier();//如果被打断,通过此方法设置当前轮次为broken状态,通知当前轮次所有等待的线程 throw new interruptedexception();//抛出异常 } //自旋前 //1、count值-1 int index = --count; // 2、判断是否到0,若是,则冲破屏障点(说明最后一个线程已经到达) if (index == 0) { boolean ranaction = false; try { final runnable command = barriercommand; // 3、执行栅栏任务(若cyclicbarrier构造时传入了runnable,则调用) if (command != null) command.run(); ranaction = true; // 4、更新一轮次,将count重置,将generation重置,唤醒之前等待的线程 nextgeneration(); return 0; } finally { // 如果执行栅栏任务(command)的时候出现了异常,那么就认为本轮次破环了 if (!ranaction) breakbarrier(); } } //计数器没有到0 =》开始自旋,直到屏障被冲破,或者interrupted或者超时 for (;;) { try { // 开始等待;如果没有时间限制,则直接等待,直到被唤醒(让其他线程进入到lock的代码块执行以上逻辑) if (!timed) trip.await();//阻塞,此时会释放锁,以让其他线程进入await方法中,等待屏障被冲破后,向后执行 // 如果有时间限制,则等待指定时间 else if (nanos > 0l) nanos = trip.awaitnanos(nanos); } catch (interruptedexception ie) { // 如果当前线程阻塞被interrupt了,并且本轮次还没有被break,那么修改本轮次状态为broken if (g == generation && ! g.broken) { // 让栅栏失效 breakbarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这轮次的,就不会影响当前这代栅栏的执行,所以,就打个中断标记 thread.currentthread().interrupt(); } } // 当有任何一个线程中断了,就会调用breakbarrier方法,就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new brokenbarrierexception(); // g != generation表示正常换轮次了,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要generation来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常 if (timed && nanos <= 0l) { breakbarrier(); throw new timeoutexception(); } }//自旋 } finally { // 释放独占锁 lock.unlock(); } }
更新本轮次的方法:nextgeneration()
private void nextgeneration() { trip.signalall();//唤醒本轮次等待的线程 count = parties;//重置count值为初始值,为下一轮次(代)使用 generation = new generation();//更新本轮次对象,这样自旋中的线程才会跳出自旋。 } private static class generation { boolean broken = false; } private void breakbarrier() { generation.broken = true;//设置标识 count = parties;//重置count值为初始值 trip.signalall();//唤醒所有等待线程 }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论