Synchronizes:Semaphore、CountDownLatch、CyclicBarrier、Exchanger、Phaser

1. Semaphore Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,通過協調各個線程,保證合理的使用公共資源。Semaphore維護了一個許可集,其實就是一定數量的“許可證”。當有

1. Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,通過協調各個線程,保證合理的使用公共資源。 Semaphore維護了一個許可集,其實就是一定數量的“許可證”。當有線程想要訪問共享資源時,需要先獲取(acquire)的許可;如果許可不夠了,線程需要一直等待,直到許可可用。當線程使用完共享資源后,可以歸還(release)許可,以供其它需要的線程使用。

和ReentrantLock類似,Semaphore支持公平/非公平策略。

Semaphore的主要方法摘要:   void acquire():從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。   void release():釋放一個許可,將其返回給信號量。   int availablePermits():返回此信號量中當前可用的許可數。   boolean hasQueuedThreads():查詢是否有線程正在等待獲取。

使用

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

//創建一個會實現print queue的類名為 PrintQueue。
class PrintQueue {

    // 聲明一個對象為Semaphore,稱它為semaphore。
    private final Semaphore semaphore;
    // 實現類的構造函數并初始能保護print quere的訪問的semaphore對象的值。
    public PrintQueue() {
        semaphore = new Semaphore(1);
    }

    //實現Implement the printJob()方法,此方法可以模擬打印文檔,并接收document對象作為參數。
    public void printJob(Object document) {
//在這方法內,首先,你必須調用acquire()方法獲得demaphore。這個方法會拋出 InterruptedException異常,使用必須包含處理這個異常的代碼。
        try {
            semaphore.acquire();

//然后,實現能隨機等待一段時間的模擬打印文檔的行。
            long duration = (long) (Math.random() * 10);

            System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), duration);

            Thread.sleep(duration);

//最后,釋放semaphore通過調用semaphore的relaser()方法。
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

}

//創建一個名為Job的類并一定實現Runnable 接口。這個類實現把文檔傳送到打印機的任務。
class Job implements Runnable {
    //聲明一個對象為PrintQueue,名為printQueue。
    private PrintQueue printQueue;
    //實現類的構造函數,初始化這個類里的PrintQueue對象。
    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    //實現方法run()。
    @Override
    public void run() {
                //首先, 此方法寫信息到操控臺表明任務已經開始執行了。
        System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
                // 然后,調用PrintQueue 對象的printJob()方法。
        printQueue.printJob(new Object());
                //最后, 此方法寫信息到操控臺表明它已經結束運行了。
        System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());

    }
}

public class SemaphoreTest {

    public static void main(String args[]) {

                // 創建PrintQueue對象名為printQueue。
        PrintQueue printQueue = new PrintQueue();
            //創建10個threads。每個線程會執行一個發送文檔到print queue的Job對象。
        Thread thread[] = new Thread[10];

        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job(printQueue), "Thread" + i);
        }

        for (int i = 0; i < 10; i++) {
            thread[i].start();
        }

    }
}

2. CountDownLatch

在多線程協作完成業務功能時,有時候需要等待其他多個線程完成任務之后,主線程才能繼續往下執行業務功能,在這種的業務場景下,通??梢允褂肨hread類的join方法,讓主線程等待被join的線程執行完之后,主線程才能繼續往下執行。當然,使用線程間消息通信機制也可以完成。其實,java并發工具類中為我們提供了類似“倒計時”這樣的工具類,可以十分方便的完成所說的這種業務場景。

CountDownLatch允許一個或多個線程等待其他線程完成工作。

CountDownLatch相關方法:

  • public CountDownLatch(int count) 構造方法會傳入一個整型數N,之后調用CountDownLatch的countDown方法會對N減一,知道N減到0的時候,當前調用await方法的線程繼續執行。
  • await() throws InterruptedException:調用該方法的線程等到構造方法傳入的N減到0的時候,才能繼續往下執行;
  • await(long timeout, TimeUnit unit):與上面的await方法功能一致,只不過這里有了時間限制,調用該方法的線程等到指定的timeout時間后,不管N是否減至為0,都會繼續往下執行;
  • countDown():使CountDownLatch初始值N減1;
  • long getCount():獲取當前CountDownLatch維護的值

CountDownLatch的用法 CountDownLatch典型用法:1、某一線程在開始運行前等待n個線程執行完畢。將CountDownLatch的計數器初始化為new CountDownLatch(n),每當一個任務線程執行完畢,就將計數器減1 countdownLatch.countDown(),當計數器的值變為0時,在CountDownLatch上await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個組件加載完畢,之后再繼續執行。 CountDownLatch典型用法:2、實現多個線程開始執行任務的最大并行性。注意是并行性,不是并發,強調的是多個線程在某一時刻同時開始執行。類似于賽跑,將多個線程放到起點,等待發令槍響,然后同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計算器初始化為1,多個線程在開始執行任務前首先countdownlatch.await(),當主線程調用countDown()時,計數器變為0,多個線程同時被喚醒。

CountDownLatch的不足 CountDownLatch是一次性的,計算器的值只能在構造方法中初始化一次,之后沒有任何機制再次對其設置值,當CountDownLatch使用完畢后,它不能再次被使用。

栗子:運動員進行跑步比賽時,假設有6個運動員參與比賽,裁判員在終點會為這6個運動員分別計時,可以想象沒當一個運動員到達終點的時候,對于裁判員來說就少了一個計時任務。直到所有運動員都到達終點了,裁判員的任務也才完成。這6個運動員可以類比成6個線程,當線程調用CountDownLatch.countDown方法時就會對計數器的值減一,直到計數器的值為0的時候,裁判員(調用await方法的線程)才能繼續往下執行。

public class CountDownLatchTest {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用來表示裁判員需要維護的是6個運動員
private static CountDownLatch endSignal = new CountDownLatch(6);

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(6);
    for (int i = 0; i < 6; i++) {
        executorService.execute(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 運動員等待裁判員響哨?。?!");
                startSignal.await();
                System.out.println(Thread.currentThread().getName() + "正在全力沖刺");
                endSignal.countDown();
                System.out.println(Thread.currentThread().getName() + "  到達終點");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    System.out.println("裁判員響哨開始啦!!!");
    startSignal.countDown();
    endSignal.await();
    System.out.println("所有運動員到達終點,比賽結束!");
    executorService.shutdown();
}
}

該示例代碼中設置了兩個CountDownLatch,第一個endSignal用于控制讓main線程(裁判員)必須等到其他線程(運動員)讓CountDownLatch維護的數值N減到0為止,相當于一個完成信號;另一個startSignal用于讓main線程對其他線程進行“發號施令”,相當于一個入口或者開關。

startSignal引用的CountDownLatch初始值為1,而其他線程執行的run方法中都會先通過 startSignal.await()讓這些線程都被阻塞,直到main線程通過調用startSignal.countDown();,將值N減1,CountDownLatch維護的數值N為0后,其他線程才能往下執行,并且,每個線程執行的run方法中都會通過endSignal.countDown();對endSignal維護的數值進行減一,由于往線程池提交了6個任務,會被減6次,所以endSignal維護的值最終會變為0,因此main線程在latch.await();阻塞結束,才能繼續往下執行。

注意:當調用CountDownLatch的countDown方法時,當前線程是不會被阻塞,會繼續往下執行。

3. CyclicBarrier

CountDownLatch是一個倒數計數器,在計數器不為0時,所有調用await的線程都會等待,當計數器降為0,線程才會繼續執行,且計數器一旦變為0,就不能再重置了。

CyclicBarrier可以認為是一個柵欄,柵欄的作用是什么?就是阻擋前行。

CyclicBarrier是一個可以循環使用的柵欄,它做的事情就是:讓線程到達柵欄時被阻塞(調用await方法),直到到達柵欄的線程數滿足指定數量要求時,柵欄才會打開放行,被柵欄攔截的線程才可以執行。

當多個線程都達到了指定點后,才能繼續往下繼續執行。這就有點像報數的感覺,假設6個線程就相當于6個運動員,到賽道起點時會報數進行統計,如果剛好是6的話,這一波就湊齊了,才能往下執行。這里的6個線程,也就是計數器的初始值6,是通過CyclicBarrier的構造方法傳入的。

CyclicBarrier的主要方法:

  • await() throws InterruptedException, BrokenBarrierException 等到所有的線程都到達指定的臨界點;
  • await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 與上面的await方法功能基本一致,只不過這里有超時限制,阻塞等待直至到達超時時間為止;
  • int getNumberWaiting()獲取當前有多少個線程阻塞等待在臨界點上;
  • boolean isBroken()用于查詢阻塞等待的線程是否被中斷
  • void reset()將屏障重置為初始狀態。如果當前有線程正在臨界點等待的話,將拋出BrokenBarrierException。

另外需要注意的是,CyclicBarrier提供了這樣的構造方法:

public CyclicBarrier(int parties, Runnable barrierAction)

可以用來,當指定的線程都到達了指定的臨界點的時,接下來執行的操作可以由barrierAction傳入即可。

栗子:6個運動員準備跑步比賽,運動員在賽跑需要在起點做好準備,當裁判發現所有運動員準備完畢后,就舉起發令槍,比賽開始。這里的起跑線就是屏障,是臨界點,而這6個運動員就類比成線程的話,就是這6個線程都必須到達指定點了,意味著湊齊了一波,然后才能繼續執行,否則每個線程都得阻塞等待,直至湊齊一波即可。

public class CyclicBarrierTest {
    public static void main(String[] args) {

        int N = 6;  // 運動員數
        CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有運動員已準備完畢,發令槍:跑!");
            }
        });

        for (int i = 0; i < N; i++) {
            Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]");
            t.start();
        }
    }


private static class PrepareWork implements Runnable {
        private CyclicBarrier cb;

        PrepareWork(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {

            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName() + ": 準備完成");
                cb.await();          // 在柵欄等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

從輸出結果可以看出,當6個運動員(線程)都到達了指定的臨界點(barrier)時候,才能繼續往下執行,否則,則會阻塞等待在調用await()處。 (在CyclicBarrier構造函數中傳入N參數,代表當由N個線程調用cb.await(); 方法,CyclicBarrier才會往下執行barrierAction)

CyclicBarrier對異常的處理 線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數達到指定數目,萬一線程被中斷導致最終的等待線程數達不到柵欄的要求怎么辦?

public int await() throws InterruptedException, BrokenBarrierException {
    //...
}

可以看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException。 BrokenBarrierException表示當前的CyclicBarrier已經損壞了,等不到所有線程都到達柵欄了,所以已經在等待的線程也沒必要再等了,可以散伙了。

出現以下幾種情況之一時,當前等待線程會拋出BrokenBarrierException異常:

  • 其它某個正在await等待的線程被中斷了;
  • 其它某個正在await等待的線程超時了;
  • 某個線程重置了CyclicBarrier;

另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會認為肯定是湊不齊所有線程了,就會將柵欄置為損壞(Broken)狀態,并傳播BrokenBarrierException給其它所有正在等待(await)的線程。

異常情況模擬:

public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {

        int N = 6;  // 運動員數
        CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有運動員已準備完畢,發令槍:跑!");
            }
        });

        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]");
            list.add(t);
            t.start();
            if (i == 3) {
                t.interrupt();  // 運動員[3]置中斷標志位
            }
        }

        Thread.sleep(2000);
        System.out.println("Barrier是否損壞:" + cb.isBroken());
    }

CountDownLatch與CyclicBarrier的比較 CountDownLatch與CyclicBarrier都是用于控制并發的工具類,都可以理解成維護的就是一個計數器,但是這兩者還是各有不同側重點的:

  • CountDownLatch一般用于某個線程A等待若干個其他線程執行完任務之后,它才執行;而CyclicBarrier一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行;CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成,再攜手共進。
  • CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當前多個線程的狀態,并且CyclicBarrier的構造方法可以傳入barrierAction,指定當所有線程都到達時執行的業務功能;
  • CountDownLatch是不能復用的,而CyclicLatch是可以復用的。

4. Exchanger

Exchanger可以用來在兩個線程之間交換持有的對象。當Exchanger在一個線程中調用exchange方法之后,會等待另外的線程調用同樣的exchange方法,兩個線程都調用exchange方法之后,傳入的參數就會交換。

兩個主要方法 public V exchange(V x) throws InterruptedException

當這個方法被調用的時候,當前線程將會等待直到其他的線程調用同樣的方法。當其他的線程調用exchange之后,當前線程將會繼續執行。 在等待過程中,如果有其他的線程interrupt當前線程,則會拋出InterruptedException。

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

多了一個timeout時間。如果在timeout時間之內沒有其他線程調用exchange方法,拋出TimeoutException。

栗子: 我們先定義一個帶交換的類: 然后定義兩個Runnable,在run方法中調用exchange方法:

public class ExchangerTest {

    public static void main(String[] args) {
        Exchanger<CustBook> exchanger = new Exchanger<>();
        // Starting two threads
        new Thread(new ExchangerOne(exchanger)).start();
        new Thread(new ExchangerTwo(exchanger)).start();
    }
}
public class CustBook {

    private String name;
}
public class ExchangerOne implements Runnable{

    Exchanger<CustBook> ex;

    ExchangerOne(Exchanger<CustBook> ex){
      this.ex=ex;
    }

    @Override
    public void run() {
    CustBook custBook= new CustBook();
        custBook.setName("book one");

        try {
            CustBook exhangeCustBook=ex.exchange(custBook);
            log.info(exhangeCustBook.getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ExchangerTwo implements Runnable{

    Exchanger<CustBook> ex;

    ExchangerTwo(Exchanger<CustBook> ex){
      this.ex=ex;
    }

    @Override
    public void run() {
    CustBook custBook= new CustBook();
        custBook.setName("book two");

        try {
            CustBook exhangeCustBook=ex.exchange(custBook);
            log.info(exhangeCustBook.getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5. Phaser

Phaser是一個同步工具類,適用于一些需要分階段的任務的處理。它的功能與 CyclicBarrier和CountDownLatch類似,類似于一個多階段的柵欄,并且功能更強大,我們來比較下這三者的功能:

CountDownLatch

倒數計數器,初始時設定計數器值,線程可以在計數器上等待,當計數器值歸0后,所有等待的線程繼續執行

CyclicBarrier

循環柵欄,初始時設定參與線程數,當線程到達柵欄后,會等待其它線程的到達,當到達柵欄的總數滿足指定數后,所有等待的線程繼續執行

Phaser

多階段柵欄,可以在初始時設定參與線程數,也可以中途注冊/注銷參與者,當到達的參與者數量滿足柵欄設定的數量后,會進行階段升級(advance)

相關概念:phase(階段) Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點,Phaser只處于某一個phase(階段),初始階段為0,最大達到Integerr.MAX_VALUE,然后再次歸零。當所有parties參與者都到達后,phase值會遞增。

parties(參與者) Phaser既可以在初始構造時指定參與者的數量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。

arrive(到達) / advance(進階) Phaser注冊完parties(參與者)之后,參與者的初始狀態是unarrived的,當參與者到達(arrive)當前階段(phase)后,狀態就會變成arrived。當階段的到達參與者數滿足條件后(注冊的數量等于到達的數量),階段就會發生進階(advance)——也就是phase值+1。

Termination(終止) 代表當前Phaser對象達到終止狀態。

Tiering(分層) Phaser支持分層(Tiering) —— 一種樹形結構,通過構造函數可以指定當前待構造的Phaser對象的父結點。之所以引入Tiering,是因為當一個Phaser有大量參與者(parties)的時候,內部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導致的額外開銷。 在一個分層Phasers的樹結構中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser參與者(parties)數量變成0時,如果有該Phaser有父結點,就會將它從父結點中溢移除。

核心方法:

  • arriveAndDeregister() 該方法立即返回下一階段的序號,并且其它線程需要等待的個數減一, 取消自己的注冊、把當前線程從之后需要等待的成員中移除。 如果該Phaser是另外一個Phaser的子Phaser(層次化Phaser), 并且該操作導致當前Phaser的成員數為0,則該操作也會將當前Phaser從其父Phaser中移除。
  • arrive() 某個參與者完成任務后調用,該方法不作任何等待,直接返回下一階段的序號。 awaitAdvance(int phase) 該方法等待某一階段執行完畢。 如果當前階段不等于指定的階段或者該Phaser已經被終止,則立即返回。 該階段數一般由arrive()方法或者arriveAndDeregister()方法返回。 返回下一階段的序號,或者返回參數指定的值(如果該參數為負數),或者直接返回當前階段序號(如果當前Phaser已經被終止)。
  • awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當, 唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出InterruptedException。
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當, 區別在于如果超時則拋出TimeoutException。
  • bulkRegister(int parties) 動態調整注冊任務parties的數量。如果當前phaser已經被終止,則該方法無效,并返回負數。 如果調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。 如果該Phaser有父Phaser則指定的party數大于0,且之前該Phaser的party數為0,那么該Phaser會被注冊到其父Phaser中。
  • forceTermination() 強制讓該Phaser進入終止狀態。 已經注冊的party數不受影響。如果該Phaser有子Phaser,則其所有的子Phaser均進入終止狀態。 如果該Phaser已經處于終止狀態,該方法調用不造成任何影響。

栗子:3個線程,4個階段,每個階段都并發處理

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) {
        int parties = 3;
        int phases = 4;
        final Phaser phaser = new Phaser(parties) {
            @Override
            //每個階段結束時
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("====== Phase : " + phase + "  end ======");
                return registeredParties == 0;
            }
        };
        for (int i = 0; i < parties; i++) {
            int threadId = i;
            Thread thread = new Thread(() -> {
                for (int phase = 0; phase < phases; phase++) {
                    if (phase == 0) {
                        System.out.println(String.format("第一階段操作  Thread %s, phase %s", threadId, phase));
                    }
                    if (phase == 1) {
                        System.out.println(String.format("第二階段操作  Thread %s, phase %s", threadId, phase));
                    }
                    if (phase == 2) {
                        System.out.println(String.format("第三階段操作  Thread %s, phase %s", threadId, phase));
                    }
                    if (phase == 3) {
                        System.out.println(String.format("第四階段操作  Thread %s, phase %s", threadId, phase));
                    }
         /**
          * arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。
          * 如果當前線程是該階段最后一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),
          * 同時其它線程的該方法也返回下一個階段的序號。
          **/
                    int nextPhaser = phaser.arriveAndAwaitAdvance();

                }
            });
            thread.start();
        }
    }
}
聲明:所有內容來自互聯網搜索結果,不保證100%準確性,僅供參考。如若本站內容侵犯了原著者的合法權益,可聯系我們進行處理。
發表評論
更多 網友評論0 條評論)
暫無評論

返回頂部

主站蜘蛛池模板: 麻豆精品在线观看| 久久一区二区精品| 1000部啪啪未满十八勿入| 玖玖资源站无码专区| 娇小xxxxx性开放| 午夜福利麻豆国产精品| 中文字幕一区二区三区在线播放 | 日本动态120秒免费| 国产在线精品一区二区| 久久国产视频网站| 里番acg全彩本子在线观看| 日本哺乳期xxxx丨| 国产va免费精品高清在线| 中文字幕无码日韩欧毛| 美女久久久久久久久久久| 性做久久久久久| 免费a级试看片| 99riav国产在线观看| 欧美日韩国产三级| 国产精品久久久久一区二区三区| 亚洲三级视频在线| 99re最新这里只有精品| 日本欧美视频在线观看| 囯产精品一品二区三区| 一嫁三夫电影免费观看| 男爵夫人的调教| 国内精品18videosex性欧美| 亚洲欧美国产va在线播放| 福利网址在线观看| 日本黄色免费观看| 四虎.com官网| a级毛片在线播放| 欧美性天天影院欧美狂野| 国产成人精品午夜在线播放| 久久久亚洲欧洲日产国码二区 | 久久中文字幕无码专区| 精品露脸国产偷人在视频7| 太粗太深了用力点视频| 亚洲欧洲综合网| 香蕉久久夜色精品国产| 成人免费v片在线观看|