国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

【Java并發編程實戰】-----“J.U.C”:Semaphore

2019-11-14 15:35:25
字體:
來源:轉載
供稿:網友

信號量Semaphore是一個控制訪問多個共享資源的計數器,它本質上是一個“共享鎖”。

java并發提供了兩種加鎖模式:共享鎖和獨占鎖。前面LZ介紹的ReentrantLock就是獨占鎖。對于獨占鎖而言,它每次只能有一個線程持有,而共享鎖則不同,它允許多個線程并行持有鎖,并發訪問共享資源。

獨占鎖它所采用的是一種悲觀的加鎖策略,  對于寫而言為了避免沖突獨占是必須的,但是對于讀就沒有必要了,因為它不會影響數據的一致性。如果某個只讀線程獲取獨占鎖,則其他讀線程都只能等待了,這種情況下就限制了不必要的并發性,降低了吞吐量。而共享鎖則不同,它放寬了加鎖的條件,采用了樂觀鎖機制,它是允許多個讀線程同時訪問同一個共享資源的。

Semaphore簡介

Semaphore,在API中是這樣介紹的,一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,并采取相應的行動。

Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數目。下面LZ以理發為例來簡述Semaphore。

為了簡單起見,我們假設只有三個理發師、一個接待人。一開始來了五個客人,接待人則安排三個客人進行理發,其余兩個人必須在那里等著,此后每個來理發店的人都必須等待。一段時間后,一個理發師完成理發后,接待人則安排另一個人(公平還是非公平機制呢??)來理發。在這里理發師則相當于公共資源,接待人則相當于信號量(Semaphore),客戶相當于線程。

進一步講,我們確定信號量Semaphore是一個非負整數(>=1)。當一個線程想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源并使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他線程全部占用,線程必須要等待其他線程釋放資源。當線程釋放資源時,Semaphore則+1;

當信號量Semaphore = 1 時,它可以當作互斥鎖使用。其中0、1就相當于它的狀態,當=1時表示其他線程可以獲取,當=0時,排他,即其他線程必須要等待。

Semaphore源碼分析

Semaphore的結構如下:

2015081800001

從上面可以看出,Semaphore和ReentrantLock一樣,都是包含公平鎖(FairySync)和非公平鎖(NonfairSync),兩個鎖都是繼承Sync,而Sync也是繼承自AQS。其構造函數如下:

/**     * 創建具有給定的許可數和非公平的公平設置的 Semaphore。     */    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }        /**     * 創建具有給定的許可數和給定的公平設置的 Semaphore。     */    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }

 

信號量的獲取:acquire()

在ReentrantLock中已經闡述過,公平鎖和非公平鎖獲取鎖機制的差別:對于公平鎖而言,如果當前線程不在CLH隊列的頭部,則需要排隊等候,而非公平鎖則不同,它無論當前線程處于CLH隊列的何處都會直接獲取鎖。所以公平信號量和非公平信號量的區別也一樣。

public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }        public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);    }

對于公平信號量和非公平信號量,他們機制的差異就體現在traAcquireShared()方法中:

公平鎖

PRotected int tryAcquireShared(int acquires) {        for (;;) {            //判斷該線程是否位于CLH隊列的列頭,如果是的話返回 -1,調用doAcquireSharedInterruptibly()            if (hasQueuedPredecessors())                return -1;            //獲取當前的信號量許可            int available = getState();            //設置“獲得acquires個信號量許可之后,剩余的信號量許可數”            int remaining = available - acquires;                        //如果剩余信號量 > 0 ,則設置“可獲取的信號量”為remaining            if (remaining < 0 || compareAndSetState(available, remaining))                return remaining;        }    }

tryAcquireShared是嘗試獲取 信號量,remaining表示下次可獲取的信號量。

對于hasQueuedPredecessors、compareAndSetState在ReentrantLock中已經闡述了,hasQueuedPredecessors用于判斷該線程是否位于CLH隊列列頭,compareAndSetState用于設置state的,它是進行原子操作的。代碼如下:

public final boolean hasQueuedPredecessors() {        Node t = tail; // Read fields in reverse initialization order        Node h = head;        Node s;        return h != t &&            ((s = h.next) == null || s.thread != Thread.currentThread());    }    protected final boolean compareAndSetState(int expect, int update) {        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }

doAcquireSharedInterruptibly源代碼如下:

private void doAcquireSharedInterruptibly(int arg)            throws InterruptedException {            /*             * 創建CLH隊列的node節點,Node.SHARED表示該節點為共享鎖             */            final Node node = addWaiter(Node.SHARED);            boolean failed = true;            try {                for (;;) {                    //獲取該節點的前繼節點                    final Node p = node.predecessor();                    //當p為頭節點時,基于公平鎖機制,線程嘗試獲取鎖                    if (p == head) {                        //嘗試獲取鎖                        int r = tryAcquireShared(arg);                            if (r >= 0) {                            setHeadAndPropagate(node, r);                              p.next = null; // help GC                            failed = false;                            return;                        }                    }                    //判斷當前線程是否需要阻塞,如果阻塞的話,則一直處于阻塞狀態知道獲取共享鎖為止                    if (shouldParkAfterFailedAcquire(p, node) &&                        parkAndCheckInterrupt())                        throw new InterruptedException();                }            } finally {                if (failed)                    cancelAcquire(node);            }        }

doAcquireSharedInterruptibly主要是做兩個工作;1、嘗試獲取共享鎖,2、阻塞線程直到線程獲取共享鎖。

addWaiter(Node.SHARED):創建”當前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并將該節點添加到CLH隊列末尾。

shouldParkAfterFailedAcquire:如果在嘗試獲取鎖失敗之后,線程應該等待,返回true;否則返回false。

parkAndCheckInterrupt:當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。

對于addWaiter、shouldParkAfterFailedAcquire、parkAndCheckInterruptLZ在“【Java并發編程實戰】-----“J.U.C”:ReentrantLock之二lock方法分析”中詳細介紹了。

非公平鎖

對于非公平鎖就簡單多了,她沒有那些所謂的要判斷是不是CLH隊列的列頭,如下:

final int nonfairTryAcquireShared(int acquires) {            for (;;) {                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }

在非公平鎖中,tryAcquireShared直接調用AQS的nonfairTryAcquireShared()。通過上面的代碼我可看到非公平鎖并沒有通過if (hasQueuedPredecessors())這樣的條件來判斷該節點是否為CLH隊列的頭節點,而是直接判斷信號量。

信號量的釋放:release()

信號量Semaphore的釋放和獲取不同,它沒有分公平鎖和非公平鎖。如下:

public void release() {        sync.releaseShared(1);    }    public final boolean releaseShared(int arg) {        //嘗試釋放共享鎖        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }

release()釋放線索所占有的共享鎖,它首先通過tryReleaseShared嘗試釋放共享鎖,如果成功直接返回,如果失敗則調用doReleaseShared來釋放共享鎖。

tryReleaseShared:

protected final boolean tryReleaseShared(int releases) {        for (;;) {            int current = getState();            //信號量的許可數 = 當前信號許可數 + 待釋放的信號許可數            int next = current + releases;            if (next < current) // overflow                throw new Error("Maximum permit count exceeded");            //設置可獲取的信號許可數為next            if (compareAndSetState(current, next))                return true;        }    }

doReleaseShared:

private void doReleaseShared() {            for (;;) {                //node 頭節點                Node h = head;                //h != null,且h != 尾節點                if (h != null && h != tail) {                    //獲取h節點對應線程的狀態                    int ws = h.waitStatus;                    //若h節點狀態為SIGNAL,表示h節點的下一個節點需要被喚醒                    if (ws == Node.SIGNAL) {                        //設置h節點狀態                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                            continue;                        //喚醒h節點對應的下一個節點                        unparkSuccessor(h);                    }                    //若h節點對應的狀態== 0 ,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態                    else if (ws == 0 &&                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                        continue;                                }                //h == head時,則退出循環,若h節點發生改變時則循環繼續                if (h == head)                                      break;            }        }

在這里有關的方法,請參考:【Java并發編程實戰】-----“J.U.C”:ReentrantLock之三unlock方法分析。

實例

該實例來源于《java7并發編程實戰手冊》

打印任務:

public class PrintQueue {    private final Semaphore semaphore;   //聲明信號量        public PrintQueue(){        semaphore = new Semaphore(1);    }        public void printJob(Object document){        try {            semaphore.acquire();//調用acquire獲取信號量            long duration = (long) (Math.random() * 10);            System.out.println( Thread.currentThread().getName() +                     "PrintQueue : Printing a job during " + duration);            Thread.sleep(duration);        } catch (InterruptedException e) {            e.printStackTrace();        }  finally{            semaphore.release();  //釋放信號量        }    }}

Job:

public class Job implements Runnable{    private PrintQueue printQueue;        public Job(PrintQueue printQueue){        this.printQueue = printQueue;    }        @Override    public void run() {        System.out.println(Thread.currentThread().getName() + " Going to print a job");        printQueue.printJob(new Object());        System.out.println(Thread.currentThread().getName() + " the document has bean printed");    }}

Test:

public class Test {    public static void main(String[] args) {        Thread[] threads = new Thread[10];                PrintQueue printQueue = new PrintQueue();                for(int i = 0 ; i < 10 ; i++){            threads[i] = new Thread(new Job(printQueue),"Thread_" + i);        }                for(int i = 0 ; i < 10 ; i++){            threads[i].start();        }    }}

運行結果:

Thread_0 Going to print a jobThread_0PrintQueue : Printing a job during 1Thread_4 Going to print a jobThread_1 Going to print a jobThread_2 Going to print a jobThread_3 Going to print a jobThread_0 the document has bean printedThread_4PrintQueue : Printing a job during 7Thread_4 the document has bean printedThread_1PrintQueue : Printing a job during 1Thread_2PrintQueue : Printing a job during 3Thread_1 the document has bean printedThread_2 the document has bean printedThread_3PrintQueue : Printing a job during 1Thread_3 the document has bean printed

參考資料

1、Java多線程系列--“JUC鎖”11之 Semaphore信號量的原理和示例

2、java信號量控制線程打印順序的示例分享

3、JAVA多線程--信號量(Semaphore)


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 梅州市| 华亭县| 定襄县| 遵化市| 保德县| 南开区| 平顺县| 靖州| 镇赉县| 铜梁县| 玉溪市| 桓台县| 苗栗市| 富源县| 嘉义市| 图木舒克市| 黄石市| 兴文县| 云阳县| 平山县| 平利县| 思南县| 和平县| 易门县| 定南县| 隆尧县| 宁德市| 通城县| 武山县| 长乐市| 平和县| 芦山县| 漯河市| 临澧县| 万州区| 新余市| 洞口县| 南陵县| 名山县| 惠安县| 东山县|