1. Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,通過協調各個線程,保證合理的使用公共資源。 Semaphore維護了一個許可集,其實就是一定數量的“許可證”。當有線程想要訪問共享資源時,需要先獲取(acquire)的許可;如果許可不夠了,線程需要一直等待,直到許可可用。當線程使用完共享資源后,可以歸還(release)許可,以供其它需要的線程使用。
和ReentrantLock類似,Semaphore支持公平/非公平策略。
Semaphore的主要方法摘要: void acquire():從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。 void release():釋放一個許可,將其返回給信號量。 int availablePermits():返回此信號量中當前可用的許可數。 boolean hasQueuedThreads():查詢是否有線程正在等待獲取。
使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
//創建一個會實現print queue的類名為 PrintQueue。
class PrintQueue {
// 聲明一個對象為Semaphore,稱它為semaphore。
private final Semaphore semaphore;
// 實現類的構造函數并初始能保護print quere的訪問的semaphore對象的值。
public PrintQueue() {
semaphore = new Semaphore(1);
}
//實現Implement the printJob()方法,此方法可以模擬打印文檔,并接收document對象作為參數。
public void printJob(Object document) {
//在這方法內,首先,你必須調用acquire()方法獲得demaphore。這個方法會拋出 InterruptedException異常,使用必須包含處理這個異常的代碼。
try {
semaphore.acquire();
//然后,實現能隨機等待一段時間的模擬打印文檔的行。
long duration = (long) (Math.random() * 10);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), duration);
Thread.sleep(duration);
//最后,釋放semaphore通過調用semaphore的relaser()方法。
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
//創建一個名為Job的類并一定實現Runnable 接口。這個類實現把文檔傳送到打印機的任務。
class Job implements Runnable {
//聲明一個對象為PrintQueue,名為printQueue。
private PrintQueue printQueue;
//實現類的構造函數,初始化這個類里的PrintQueue對象。
public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}
//實現方法run()。
@Override
public void run() {
//首先, 此方法寫信息到操控臺表明任務已經開始執行了。
System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
// 然后,調用PrintQueue 對象的printJob()方法。
printQueue.printJob(new Object());
//最后, 此方法寫信息到操控臺表明它已經結束運行了。
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}
public class SemaphoreTest {
public static void main(String args[]) {
// 創建PrintQueue對象名為printQueue。
PrintQueue printQueue = new PrintQueue();
//創建10個threads。每個線程會執行一個發送文檔到print queue的Job對象。
Thread thread[] = new Thread[10];
for (int i = 0; i < 10; i++) {
thread[i] = new Thread(new Job(printQueue), "Thread" + i);
}
for (int i = 0; i < 10; i++) {
thread[i].start();
}
}
}
2. CountDownLatch
在多線程協作完成業務功能時,有時候需要等待其他多個線程完成任務之后,主線程才能繼續往下執行業務功能,在這種的業務場景下,通??梢允褂肨hread類的join方法,讓主線程等待被join的線程執行完之后,主線程才能繼續往下執行。當然,使用線程間消息通信機制也可以完成。其實,java并發工具類中為我們提供了類似“倒計時”這樣的工具類,可以十分方便的完成所說的這種業務場景。
CountDownLatch允許一個或多個線程等待其他線程完成工作。
CountDownLatch相關方法:
- public CountDownLatch(int count) 構造方法會傳入一個整型數N,之后調用CountDownLatch的countDown方法會對N減一,知道N減到0的時候,當前調用await方法的線程繼續執行。
- await() throws InterruptedException:調用該方法的線程等到構造方法傳入的N減到0的時候,才能繼續往下執行;
- await(long timeout, TimeUnit unit):與上面的await方法功能一致,只不過這里有了時間限制,調用該方法的線程等到指定的timeout時間后,不管N是否減至為0,都會繼續往下執行;
- countDown():使CountDownLatch初始值N減1;
- long getCount():獲取當前CountDownLatch維護的值
CountDownLatch的用法 CountDownLatch典型用法:1、某一線程在開始運行前等待n個線程執行完畢。將CountDownLatch的計數器初始化為new CountDownLatch(n),每當一個任務線程執行完畢,就將計數器減1 countdownLatch.countDown(),當計數器的值變為0時,在CountDownLatch上await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個組件加載完畢,之后再繼續執行。 CountDownLatch典型用法:2、實現多個線程開始執行任務的最大并行性。注意是并行性,不是并發,強調的是多個線程在某一時刻同時開始執行。類似于賽跑,將多個線程放到起點,等待發令槍響,然后同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計算器初始化為1,多個線程在開始執行任務前首先countdownlatch.await(),當主線程調用countDown()時,計數器變為0,多個線程同時被喚醒。
CountDownLatch的不足 CountDownLatch是一次性的,計算器的值只能在構造方法中初始化一次,之后沒有任何機制再次對其設置值,當CountDownLatch使用完畢后,它不能再次被使用。
栗子:運動員進行跑步比賽時,假設有6個運動員參與比賽,裁判員在終點會為這6個運動員分別計時,可以想象沒當一個運動員到達終點的時候,對于裁判員來說就少了一個計時任務。直到所有運動員都到達終點了,裁判員的任務也才完成。這6個運動員可以類比成6個線程,當線程調用CountDownLatch.countDown方法時就會對計數器的值減一,直到計數器的值為0的時候,裁判員(調用await方法的線程)才能繼續往下執行。
public class CountDownLatchTest {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用來表示裁判員需要維護的是6個運動員
private static CountDownLatch endSignal = new CountDownLatch(6);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 運動員等待裁判員響哨?。?!");
startSignal.await();
System.out.println(Thread.currentThread().getName() + "正在全力沖刺");
endSignal.countDown();
System.out.println(Thread.currentThread().getName() + " 到達終點");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("裁判員響哨開始啦!!!");
startSignal.countDown();
endSignal.await();
System.out.println("所有運動員到達終點,比賽結束!");
executorService.shutdown();
}
}
該示例代碼中設置了兩個CountDownLatch,第一個endSignal用于控制讓main線程(裁判員)必須等到其他線程(運動員)讓CountDownLatch維護的數值N減到0為止,相當于一個完成信號;另一個startSignal用于讓main線程對其他線程進行“發號施令”,相當于一個入口或者開關。
startSignal引用的CountDownLatch初始值為1,而其他線程執行的run方法中都會先通過 startSignal.await()讓這些線程都被阻塞,直到main線程通過調用startSignal.countDown();,將值N減1,CountDownLatch維護的數值N為0后,其他線程才能往下執行,并且,每個線程執行的run方法中都會通過endSignal.countDown();對endSignal維護的數值進行減一,由于往線程池提交了6個任務,會被減6次,所以endSignal維護的值最終會變為0,因此main線程在latch.await();阻塞結束,才能繼續往下執行。
注意:當調用CountDownLatch的countDown方法時,當前線程是不會被阻塞,會繼續往下執行。
3. CyclicBarrier
CountDownLatch是一個倒數計數器,在計數器不為0時,所有調用await的線程都會等待,當計數器降為0,線程才會繼續執行,且計數器一旦變為0,就不能再重置了。
CyclicBarrier可以認為是一個柵欄,柵欄的作用是什么?就是阻擋前行。
CyclicBarrier是一個可以循環使用的柵欄,它做的事情就是:讓線程到達柵欄時被阻塞(調用await方法),直到到達柵欄的線程數滿足指定數量要求時,柵欄才會打開放行,被柵欄攔截的線程才可以執行。
當多個線程都達到了指定點后,才能繼續往下繼續執行。這就有點像報數的感覺,假設6個線程就相當于6個運動員,到賽道起點時會報數進行統計,如果剛好是6的話,這一波就湊齊了,才能往下執行。這里的6個線程,也就是計數器的初始值6,是通過CyclicBarrier的構造方法傳入的。
CyclicBarrier的主要方法:
- await() throws InterruptedException, BrokenBarrierException 等到所有的線程都到達指定的臨界點;
- await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 與上面的await方法功能基本一致,只不過這里有超時限制,阻塞等待直至到達超時時間為止;
- int getNumberWaiting()獲取當前有多少個線程阻塞等待在臨界點上;
- boolean isBroken()用于查詢阻塞等待的線程是否被中斷
- void reset()將屏障重置為初始狀態。如果當前有線程正在臨界點等待的話,將拋出BrokenBarrierException。
另外需要注意的是,CyclicBarrier提供了這樣的構造方法:
public CyclicBarrier(int parties, Runnable barrierAction)
可以用來,當指定的線程都到達了指定的臨界點的時,接下來執行的操作可以由barrierAction傳入即可。
栗子:6個運動員準備跑步比賽,運動員在賽跑需要在起點做好準備,當裁判發現所有運動員準備完畢后,就舉起發令槍,比賽開始。這里的起跑線就是屏障,是臨界點,而這6個運動員就類比成線程的話,就是這6個線程都必須到達指定點了,意味著湊齊了一波,然后才能繼續執行,否則每個線程都得阻塞等待,直至湊齊一波即可。
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 6; // 運動員數
CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("所有運動員已準備完畢,發令槍:跑!");
}
});
for (int i = 0; i < N; i++) {
Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]");
t.start();
}
}
private static class PrepareWork implements Runnable {
private CyclicBarrier cb;
PrepareWork(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + ": 準備完成");
cb.await(); // 在柵欄等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
從輸出結果可以看出,當6個運動員(線程)都到達了指定的臨界點(barrier)時候,才能繼續往下執行,否則,則會阻塞等待在調用await()處。 (在CyclicBarrier構造函數中傳入N參數,代表當由N個線程調用cb.await(); 方法,CyclicBarrier才會往下執行barrierAction)
CyclicBarrier對異常的處理 線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數達到指定數目,萬一線程被中斷導致最終的等待線程數達不到柵欄的要求怎么辦?
public int await() throws InterruptedException, BrokenBarrierException {
//...
}
可以看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException。 BrokenBarrierException表示當前的CyclicBarrier已經損壞了,等不到所有線程都到達柵欄了,所以已經在等待的線程也沒必要再等了,可以散伙了。
出現以下幾種情況之一時,當前等待線程會拋出BrokenBarrierException異常:
- 其它某個正在await等待的線程被中斷了;
- 其它某個正在await等待的線程超時了;
- 某個線程重置了CyclicBarrier;
另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會認為肯定是湊不齊所有線程了,就會將柵欄置為損壞(Broken)狀態,并傳播BrokenBarrierException給其它所有正在等待(await)的線程。
異常情況模擬:
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
int N = 6; // 運動員數
CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("所有運動員已準備完畢,發令槍:跑!");
}
});
List<Thread> list = new ArrayList<>();
for (int i = 0; i < N; i++) {
Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]");
list.add(t);
t.start();
if (i == 3) {
t.interrupt(); // 運動員[3]置中斷標志位
}
}
Thread.sleep(2000);
System.out.println("Barrier是否損壞:" + cb.isBroken());
}
CountDownLatch與CyclicBarrier的比較 CountDownLatch與CyclicBarrier都是用于控制并發的工具類,都可以理解成維護的就是一個計數器,但是這兩者還是各有不同側重點的:
- CountDownLatch一般用于某個線程A等待若干個其他線程執行完任務之后,它才執行;而CyclicBarrier一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行;CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成,再攜手共進。
- CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當前多個線程的狀態,并且CyclicBarrier的構造方法可以傳入barrierAction,指定當所有線程都到達時執行的業務功能;
- CountDownLatch是不能復用的,而CyclicLatch是可以復用的。
4. Exchanger
Exchanger可以用來在兩個線程之間交換持有的對象。當Exchanger在一個線程中調用exchange方法之后,會等待另外的線程調用同樣的exchange方法,兩個線程都調用exchange方法之后,傳入的參數就會交換。
兩個主要方法
public V exchange(V x) throws InterruptedException
當這個方法被調用的時候,當前線程將會等待直到其他的線程調用同樣的方法。當其他的線程調用exchange之后,當前線程將會繼續執行。 在等待過程中,如果有其他的線程interrupt當前線程,則會拋出InterruptedException。
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
多了一個timeout時間。如果在timeout時間之內沒有其他線程調用exchange方法,拋出TimeoutException。
栗子: 我們先定義一個帶交換的類: 然后定義兩個Runnable,在run方法中調用exchange方法:
public class ExchangerTest {
public static void main(String[] args) {
Exchanger<CustBook> exchanger = new Exchanger<>();
// Starting two threads
new Thread(new ExchangerOne(exchanger)).start();
new Thread(new ExchangerTwo(exchanger)).start();
}
}
public class CustBook {
private String name;
}
public class ExchangerOne implements Runnable{
Exchanger<CustBook> ex;
ExchangerOne(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book one");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExchangerTwo implements Runnable{
Exchanger<CustBook> ex;
ExchangerTwo(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book two");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5. Phaser
Phaser是一個同步工具類,適用于一些需要分階段的任務的處理。它的功能與 CyclicBarrier和CountDownLatch類似,類似于一個多階段的柵欄,并且功能更強大,我們來比較下這三者的功能:
CountDownLatch
倒數計數器,初始時設定計數器值,線程可以在計數器上等待,當計數器值歸0后,所有等待的線程繼續執行
CyclicBarrier
循環柵欄,初始時設定參與線程數,當線程到達柵欄后,會等待其它線程的到達,當到達柵欄的總數滿足指定數后,所有等待的線程繼續執行
Phaser
多階段柵欄,可以在初始時設定參與線程數,也可以中途注冊/注銷參與者,當到達的參與者數量滿足柵欄設定的數量后,會進行階段升級(advance)
相關概念:phase(階段) Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點,Phaser只處于某一個phase(階段),初始階段為0,最大達到Integerr.MAX_VALUE,然后再次歸零。當所有parties參與者都到達后,phase值會遞增。
parties(參與者) Phaser既可以在初始構造時指定參與者的數量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。
arrive(到達) / advance(進階) Phaser注冊完parties(參與者)之后,參與者的初始狀態是unarrived的,當參與者到達(arrive)當前階段(phase)后,狀態就會變成arrived。當階段的到達參與者數滿足條件后(注冊的數量等于到達的數量),階段就會發生進階(advance)——也就是phase值+1。
Termination(終止) 代表當前Phaser對象達到終止狀態。
Tiering(分層) Phaser支持分層(Tiering) —— 一種樹形結構,通過構造函數可以指定當前待構造的Phaser對象的父結點。之所以引入Tiering,是因為當一個Phaser有大量參與者(parties)的時候,內部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導致的額外開銷。 在一個分層Phasers的樹結構中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser參與者(parties)數量變成0時,如果有該Phaser有父結點,就會將它從父結點中溢移除。
核心方法:
- arriveAndDeregister() 該方法立即返回下一階段的序號,并且其它線程需要等待的個數減一, 取消自己的注冊、把當前線程從之后需要等待的成員中移除。 如果該Phaser是另外一個Phaser的子Phaser(層次化Phaser), 并且該操作導致當前Phaser的成員數為0,則該操作也會將當前Phaser從其父Phaser中移除。
- arrive() 某個參與者完成任務后調用,該方法不作任何等待,直接返回下一階段的序號。 awaitAdvance(int phase) 該方法等待某一階段執行完畢。 如果當前階段不等于指定的階段或者該Phaser已經被終止,則立即返回。 該階段數一般由arrive()方法或者arriveAndDeregister()方法返回。 返回下一階段的序號,或者返回參數指定的值(如果該參數為負數),或者直接返回當前階段序號(如果當前Phaser已經被終止)。
- awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當, 唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出InterruptedException。
- awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當, 區別在于如果超時則拋出TimeoutException。
- bulkRegister(int parties) 動態調整注冊任務parties的數量。如果當前phaser已經被終止,則該方法無效,并返回負數。 如果調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。 如果該Phaser有父Phaser則指定的party數大于0,且之前該Phaser的party數為0,那么該Phaser會被注冊到其父Phaser中。
- forceTermination() 強制讓該Phaser進入終止狀態。 已經注冊的party數不受影響。如果該Phaser有子Phaser,則其所有的子Phaser均進入終止狀態。 如果該Phaser已經處于終止狀態,該方法調用不造成任何影響。
栗子:3個線程,4個階段,每個階段都并發處理
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) {
int parties = 3;
int phases = 4;
final Phaser phaser = new Phaser(parties) {
@Override
//每個階段結束時
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase : " + phase + " end ======");
return registeredParties == 0;
}
};
for (int i = 0; i < parties; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
for (int phase = 0; phase < phases; phase++) {
if (phase == 0) {
System.out.println(String.format("第一階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 1) {
System.out.println(String.format("第二階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 2) {
System.out.println(String.format("第三階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 3) {
System.out.println(String.format("第四階段操作 Thread %s, phase %s", threadId, phase));
}
/**
* arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。
* 如果當前線程是該階段最后一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),
* 同時其它線程的該方法也返回下一個階段的序號。
**/
int nextPhaser = phaser.arriveAndAwaitAdvance();
}
});
thread.start();
}
}
}