一、簡(jiǎn)介
CyclicBarrier 字面意思回環(huán)柵欄(循環(huán)屏障),它可以實(shí)現(xiàn)讓一組線程等待至某個(gè)狀態(tài)(屏障點(diǎn))之后再全部同時(shí)執(zhí)行。叫做回環(huán)是因?yàn)楫?dāng)所有等待線程都被釋放以后,CyclicBarrier可以被重用。
CyclicBarrier 作用是讓一組線程相互等待,當(dāng)達(dá)到一個(gè)共同點(diǎn)時(shí),所有之前等待的線程再繼續(xù)執(zhí)行,且 CyclicBarrier 功能可重復(fù)使用。
二、CyclicBarrier的使用
構(gòu)造方法:
?// parties表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public CyclicBarrier(int parties)
// 用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行 barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景(該線程的執(zhí)行時(shí)機(jī)是在到達(dá)屏障之后再執(zhí)行)
重要方法:
//屏障 指定數(shù)量的線程全部調(diào)用await()方法時(shí),這些線程不再阻塞
// BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個(gè)線程 await() 時(shí)被中斷或者超時(shí)
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循環(huán) 通過reset()方法可以進(jìn)行重置
CyclicBarrier 應(yīng)用場(chǎng)景
- 利用 CyclicBarrier 可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景。
- 利用 CyclicBarrier的計(jì)數(shù)器能夠重置,屏障可以重復(fù)使用的特性,可以支持類似“人滿發(fā)車”的場(chǎng)景
模擬合并計(jì)算場(chǎng)景
利用 CyclicBarrier 可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景。
public class CyclicBarrierTest2 {
//保存每個(gè)學(xué)生的平均成績(jī)
private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
private ExecutorService threadPool= Executors.newFixedThreadPool(3);
private CyclicBarrier cb=new CyclicBarrier(3,()->{
int result=0;
Set<String> set = map.keySet();
for(String s:set){
result+=map.get(s);
}
System.out.println("三人平均成績(jī)?yōu)?"+(result/3)+"分");
});
public void count(){
for(int i=0;i<3;i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
//獲取學(xué)生平均成績(jī)
int score=(int)(Math.random()*40+60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+"同學(xué)的平均成績(jī)?yōu)椋?+score);
try {
//執(zhí)行完運(yùn)行await(),等待所有學(xué)生平均成績(jī)都計(jì)算完畢
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb=new CyclicBarrierTest2();
cb.count();
}
}
模擬“人滿發(fā)車”的場(chǎng)景
利用CyclicBarrier的計(jì)數(shù)器能夠重置,屏障可以重復(fù)使用的特性,可以支持類似“人滿發(fā)車”的場(chǎng)景
public class CyclicBarrierTest3 {
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 5, 1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(r) -> new Thread(r, counter.addAndGet(1) + " 號(hào) "),
new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
() -> System.out.println("裁判:比賽開始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(new Runner(cyclicBarrier));
}
}
static class Runner extends Thread{
private CyclicBarrier cyclicBarrier;
public Runner (CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 選手已就位, 準(zhǔn)備共用時(shí): " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}
}
輸出結(jié)果:
3 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 78ms0
1 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 395ms1
5 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 733ms2
2 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 776ms3
4 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 807ms4
裁判:比賽開始~~
4 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 131ms0
3 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 256ms1
2 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 291ms2
1 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 588ms3
5 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 763ms4
裁判:比賽開始~~
三、CyclicBarrier 源碼分析
CyclicBarrier 流程
主要是的流程:
- 獲取鎖 如果 count != 0 就進(jìn)入阻塞;
- 進(jìn)入阻塞之前,首先需要進(jìn)入條件隊(duì)列,然后釋放鎖,最后阻塞;
- 如果 count != 0 會(huì)進(jìn)行一個(gè)喚醒,將所有的條件隊(duì)列中的節(jié)點(diǎn)轉(zhuǎn)換為阻塞隊(duì)列;
- 被喚醒過后會(huì)進(jìn)行鎖的獲取,如果鎖獲取失敗,會(huì)進(jìn)入 lock 的阻塞隊(duì)列;
- 如果鎖獲取成功,進(jìn)行鎖的釋放,以及喚醒,同步隊(duì)列中的線程。
下面是一個(gè)簡(jiǎn)單的流程圖:
下面是具體的一些代碼調(diào)用的流程:
幾個(gè)常見的問題?
- 1.一組線程在觸發(fā)屏障之前互相等待,最后一個(gè)線程到達(dá)屏障后喚醒邏輯是如何實(shí)現(xiàn)的. 喚醒的過程是通過調(diào)用
java.util.concurrent.locks.Condition#signalAll
喚醒條件隊(duì)列上的所有節(jié)點(diǎn)。 - 2.刪欄循環(huán)使用是如何實(shí)現(xiàn)的? 實(shí)際上一個(gè)互斥鎖 ReentrantLock 的條件隊(duì)列和阻塞隊(duì)列的轉(zhuǎn)換。
- 3.條件隊(duì)列到同步隊(duì)列的轉(zhuǎn)換實(shí)現(xiàn)邏輯 ? 轉(zhuǎn)換過程中,首先會(huì)先將條件隊(duì)列中所有的阻塞線程喚醒,然后會(huì)去獲取 lock 如果獲取失敗,就進(jìn)入同步隊(duì)列。
CyclicBarrier 與 CountDownLatch的區(qū)別
- CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景,比如如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù)器,并讓線程們重新執(zhí)行一次
- CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數(shù)量)、isBroken(用來(lái)知道阻塞的線程是否被中斷)等方法。
- CountDownLatch會(huì)阻塞主線程,CyclicBarrier不會(huì)阻塞主線程,只會(huì)阻塞子線程。
- CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同。CountDownLatch一般用于一個(gè)或多個(gè)線程,等待其他線程執(zhí)行完任務(wù)后,再執(zhí)行。CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài),然后這一組線程再同時(shí)執(zhí)行。
- CyclicBarrier 還可以提供一個(gè) barrierAction,合并多線程計(jì)算結(jié)果。
- CyclicBarrier是通過ReentrantLock的"獨(dú)占鎖"和Conditon來(lái)實(shí)現(xiàn)一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實(shí)現(xiàn)
到此這篇關(guān)于Java中CyclicBarrier 循環(huán)屏障的文章就介紹到這了,更多相關(guān)Java CyclicBarrier 內(nèi)容請(qǐng)搜索html5模板網(wǎng)以前的文章希望大家以后多多支持html5模板網(wǎng)!