信號量Semaphore是一個控制訪問多個共享資源的計數器,它本質上是一個“共享鎖”。
java并發提供了兩種加鎖模式:共享鎖和獨占鎖。前面LZ介紹的ReentrantLock就是獨占鎖。對于獨占鎖而言,它每次只能有一個線程持有,而共享鎖則不同,它允許多個線程并行持有鎖,并發訪問共享資源。
獨占鎖它所采用的是一種悲觀的加鎖策略, 對于寫而言為了避免沖突獨占是必須的,但是對于讀就沒有必要了,因為它不會影響數據的一致性。如果某個只讀線程獲取獨占鎖,則其他讀線程都只能等待了,這種情況下就限制了不必要的并發性,降低了吞吐量。而共享鎖則不同,它放寬了加鎖的條件,采用了樂觀鎖機制,它是允許多個讀線程同時訪問同一個共享資源的。
Semaphore,在API中是這樣介紹的,一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,并采取相應的行動。
Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數目。下面LZ以理發為例來簡述Semaphore。
為了簡單起見,我們假設只有三個理發師、一個接待人。一開始來了五個客人,接待人則安排三個客人進行理發,其余兩個人必須在那里等著,此后每個來理發店的人都必須等待。一段時間后,一個理發師完成理發后,接待人則安排另一個人(公平還是非公平機制呢??)來理發。在這里理發師則相當于公共資源,接待人則相當于信號量(Semaphore),客戶相當于線程。
進一步講,我們確定信號量Semaphore是一個非負整數(>=1)。當一個線程想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源并使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他線程全部占用,線程必須要等待其他線程釋放資源。當線程釋放資源時,Semaphore則+1;
當信號量Semaphore = 1 時,它可以當作互斥鎖使用。其中0、1就相當于它的狀態,當=1時表示其他線程可以獲取,當=0時,排他,即其他線程必須要等待。
Semaphore的結構如下:
從上面可以看出,Semaphore和ReentrantLock一樣,都是包含公平鎖(FairySync)和非公平鎖(NonfairSync),兩個鎖都是繼承Sync,而Sync也是繼承自AQS。其構造函數如下:
/** * 創建具有給定的許可數和非公平的公平設置的 Semaphore。 */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * 創建具有給定的許可數和給定的公平設置的 Semaphore。 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
在ReentrantLock中已經闡述過,公平鎖和非公平鎖獲取鎖機制的差別:對于公平鎖而言,如果當前線程不在CLH隊列的頭部,則需要排隊等候,而非公平鎖則不同,它無論當前線程處于CLH隊列的何處都會直接獲取鎖。所以公平信號量和非公平信號量的區別也一樣。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
對于公平信號量和非公平信號量,他們機制的差異就體現在traAcquireShared()方法中:
公平鎖
PRotected int tryAcquireShared(int acquires) { for (;;) { //判斷該線程是否位于CLH隊列的列頭,如果是的話返回 -1,調用doAcquireSharedInterruptibly() if (hasQueuedPredecessors()) return -1; //獲取當前的信號量許可 int available = getState(); //設置“獲得acquires個信號量許可之后,剩余的信號量許可數” int remaining = available - acquires; //如果剩余信號量 > 0 ,則設置“可獲取的信號量”為remaining if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
tryAcquireShared是嘗試獲取 信號量,remaining表示下次可獲取的信號量。
對于hasQueuedPredecessors、compareAndSetState在ReentrantLock中已經闡述了,hasQueuedPredecessors用于判斷該線程是否位于CLH隊列列頭,compareAndSetState用于設置state的,它是進行原子操作的。代碼如下:
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
doAcquireSharedInterruptibly源代碼如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { /* * 創建CLH隊列的node節點,Node.SHARED表示該節點為共享鎖 */ final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲取該節點的前繼節點 final Node p = node.predecessor(); //當p為頭節點時,基于公平鎖機制,線程嘗試獲取鎖 if (p == head) { //嘗試獲取鎖 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判斷當前線程是否需要阻塞,如果阻塞的話,則一直處于阻塞狀態知道獲取共享鎖為止 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireSharedInterruptibly主要是做兩個工作;1、嘗試獲取共享鎖,2、阻塞線程直到線程獲取共享鎖。
addWaiter(Node.SHARED):創建”當前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并將該節點添加到CLH隊列末尾。
shouldParkAfterFailedAcquire:如果在嘗試獲取鎖失敗之后,線程應該等待,返回true;否則返回false。
parkAndCheckInterrupt:當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。
對于addWaiter、shouldParkAfterFailedAcquire、parkAndCheckInterruptLZ在“【Java并發編程實戰】-----“J.U.C”:ReentrantLock之二lock方法分析”中詳細介紹了。
非公平鎖
對于非公平鎖就簡單多了,她沒有那些所謂的要判斷是不是CLH隊列的列頭,如下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
在非公平鎖中,tryAcquireShared直接調用AQS的nonfairTryAcquireShared()。通過上面的代碼我可看到非公平鎖并沒有通過if (hasQueuedPredecessors())這樣的條件來判斷該節點是否為CLH隊列的頭節點,而是直接判斷信號量。
信號量Semaphore的釋放和獲取不同,它沒有分公平鎖和非公平鎖。如下:
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { //嘗試釋放共享鎖 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
release()釋放線索所占有的共享鎖,它首先通過tryReleaseShared嘗試釋放共享鎖,如果成功直接返回,如果失敗則調用doReleaseShared來釋放共享鎖。
tryReleaseShared:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); //信號量的許可數 = 當前信號許可數 + 待釋放的信號許可數 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //設置可獲取的信號許可數為next if (compareAndSetState(current, next)) return true; } }
doReleaseShared:
private void doReleaseShared() { for (;;) { //node 頭節點 Node h = head; //h != null,且h != 尾節點 if (h != null && h != tail) { //獲取h節點對應線程的狀態 int ws = h.waitStatus; //若h節點狀態為SIGNAL,表示h節點的下一個節點需要被喚醒 if (ws == Node.SIGNAL) { //設置h節點狀態 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //喚醒h節點對應的下一個節點 unparkSuccessor(h); } //若h節點對應的狀態== 0 ,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } //h == head時,則退出循環,若h節點發生改變時則循環繼續 if (h == head) break; } }
在這里有關的方法,請參考:【Java并發編程實戰】-----“J.U.C”:ReentrantLock之三unlock方法分析。
該實例來源于《java7并發編程實戰手冊》
打印任務:
public class PrintQueue { private final Semaphore semaphore; //聲明信號量 public PrintQueue(){ semaphore = new Semaphore(1); } public void printJob(Object document){ try { semaphore.acquire();//調用acquire獲取信號量 long duration = (long) (Math.random() * 10); System.out.println( Thread.currentThread().getName() + "PrintQueue : Printing a job during " + duration); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } finally{ semaphore.release(); //釋放信號量 } }}
Job:
public class Job implements Runnable{ private PrintQueue printQueue; public Job(PrintQueue printQueue){ this.printQueue = printQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Going to print a job"); printQueue.printJob(new Object()); System.out.println(Thread.currentThread().getName() + " the document has bean printed"); }}
Test:
public class Test { public static void main(String[] args) { Thread[] threads = new Thread[10]; PrintQueue printQueue = new PrintQueue(); for(int i = 0 ; i < 10 ; i++){ threads[i] = new Thread(new Job(printQueue),"Thread_" + i); } for(int i = 0 ; i < 10 ; i++){ threads[i].start(); } }}
運行結果:
Thread_0 Going to print a jobThread_0PrintQueue : Printing a job during 1Thread_4 Going to print a jobThread_1 Going to print a jobThread_2 Going to print a jobThread_3 Going to print a jobThread_0 the document has bean printedThread_4PrintQueue : Printing a job during 7Thread_4 the document has bean printedThread_1PrintQueue : Printing a job during 1Thread_2PrintQueue : Printing a job during 3Thread_1 the document has bean printedThread_2 the document has bean printedThread_3PrintQueue : Printing a job during 1Thread_3 the document has bean printed
參考資料
1、Java多線程系列--“JUC鎖”11之 Semaphore信號量的原理和示例
2、java信號量控制線程打印順序的示例分享
3、JAVA多線程--信號量(Semaphore)
新聞熱點
疑難解答