ConcurrentLinkedQueue源碼分析 作者 周盛帆
ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列,采用FIFO的規(guī)則對節(jié)點進(jìn)行排序,當(dāng)我們添加一個元素的時候,它會添加到隊列的尾;當(dāng)我們獲取一個元素時,它會返回隊列頭部的元素。用CAS實現(xiàn)非阻塞的線程安全隊列。 ConcurrentLinkedQueue 的非阻塞算法實現(xiàn)主要可概括為下面幾點:
1 . 使用 CAS 原子指令來處理對數(shù)據(jù)的并發(fā)訪問,這是非阻塞算法得以實現(xiàn)的基礎(chǔ)。 2. head/tail 并非總是指向隊列的頭 / 尾節(jié)點,也就是說允許隊列處于不一致狀態(tài)。 這個特性把入隊 / 出隊時,原本需要一起原子化執(zhí)行的兩個步驟分離開來,從而縮小了入隊 / 出隊時需要原子化更新值的范圍到唯一變量。這是非阻塞算法得以實現(xiàn)的關(guān)鍵。 3. 以批處理方式來更新head/tail,從整體上減少入隊 / 出隊操作的開銷。
在ConcurrentLinkedQueue的源碼中,有一段紅字規(guī)定了的一些基本不變性條件
1. 在入隊時最后一個結(jié)點中的next域為null
2. 隊列中的所有未刪除結(jié)點的item域不能為null且從head都可以在O(N)時間內(nèi)遍歷到
3. 對于要刪除的結(jié)點,不是將其引用直接置為空,而是將其的item域先置為null(迭代器在遍歷是會跳過item為null的結(jié)點)
4. 允許head和tail滯后更新,也就是上文提到的head/tail并非總是指向隊列的頭 / 尾節(jié)點(這主要是為了減少CAS指令執(zhí)行的次數(shù),但同時會增加volatile讀的次數(shù),但是這種消耗較小)。具體而言就是,當(dāng)在隊列中插入一個元素是,會檢測tail和最后一個結(jié)點之間的距離是否在兩個結(jié)點及以上(內(nèi)部稱之為hop);而在出隊時,對head的檢測就是與隊列的第一個結(jié)點的距離是否達(dá)到兩個,有則將head指向第一個結(jié)點并將head原來指向的結(jié)點的next域指向自己,這樣就能斷開與隊列的聯(lián)系從而幫助GC
head的不變性和可變性條件
不變性:
1. 所有未刪除節(jié)點,都能從head通過調(diào)用succ()方法遍歷可達(dá)。
2. head不能為null。
3. head節(jié)點的next域不能引用到自身。
可變性:
1. head節(jié)點的item域可能為null,也可能不為null。
2. 允許tail滯后(lag behind)于head,也就是說:從head開始遍歷隊列,不一定能到達(dá)tail。
tail的不變性和可變性條件
不變性:
1. 通過tail調(diào)用succ()方法,最后節(jié)點總是可達(dá)的。
2. tail不能為null。
可變性:
1. tail節(jié)點的item域可能為null,也可能不為 null。
2. 允許tail滯后于head,也就是說:從head開始遍歷隊列,不一定能到達(dá)tail。
3.tail節(jié)點的next域可以引用到自身。
ConcurrentLinkedQueue是由head節(jié)點和tail節(jié)點組成,每個節(jié)點有節(jié)點元素item和指向下一個節(jié)點的next的引用組成,節(jié)點與節(jié)點之間就是通過這個next關(guān)聯(lián)起來,從而組成一張鏈表結(jié)構(gòu)的隊列。默認(rèn)情況下head節(jié)點存儲的元素為空,tail節(jié)點等于head節(jié)點。
PRivate transient volatile Node<E> head; private transient volatile Node<E> tail;Node節(jié)點的構(gòu)成如下:
private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
添加元素1:隊列更新head節(jié)點的next節(jié)點為元素1。因為tail節(jié)點默認(rèn)情況下等于head節(jié)點,所以它們的next節(jié)點都指向元素1節(jié)點 添加元素2:隊列首先設(shè)置元素1節(jié)點的next節(jié)點為元素2,然后更新tail節(jié)點指向元素2 添加元素3:設(shè)置tail節(jié)點的next節(jié)點為元素3節(jié)點 添加元素4:設(shè)置元素3的next節(jié)點為元素4,然后將tail節(jié)點指向元素4節(jié)點。
從以上描述可以知道,發(fā)現(xiàn)入隊過程主要做兩件事情:第一是將入隊節(jié)點設(shè)置成當(dāng)前隊列最后一個節(jié)點的next節(jié)點;第二是更新tail節(jié)點,如果tail節(jié)點的next節(jié)點不為空,則將新添加節(jié)點設(shè)置成tail節(jié)點,如果tail節(jié)點的next節(jié)點為空,則將新添加節(jié)點設(shè)置成tail節(jié)點的next節(jié)點。
在并發(fā)情況下,如果有一個線程正在入隊,那么它必須先獲取尾節(jié)點,然后設(shè)置尾節(jié)點的下一個節(jié)點為入隊節(jié)點,但這時可能有另外一個線程插隊了,那么隊列的尾節(jié)點就會發(fā)送變化,這是當(dāng)前線程要暫停入隊操作,然后重新獲取尾節(jié)點。
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) {//1 Node<E> q = p.next; if (q == null) {//2 if (p.casNext(null, newNode)) {//3 if (p != t) //4 casTail(t, newNode); //更新失敗了也是沒事的,因為表示有其他線程成功更新了tail節(jié)點 return true; } //其他線程搶先完成入隊,需要重新嘗試 } else if (p == q)//5 p = (t != (t = tail)) ? t : head; else // 在兩跳之后檢查尾部更新. p = (p != t && t != (t = tail)) ? t : q; //6 } }1:對于入隊操作,采用失敗即重試的方式,直到入隊成功 2:表明p是最后一個結(jié)點 3:采用CAS指令修改隊列的最后一個結(jié)點的next域,從而保證最后一個結(jié)點是新插入的結(jié)點,同時將p指向這個新結(jié)點 4:如果插入結(jié)點后tail和p距離達(dá)到兩個結(jié)點,則修改tail的指向(失敗也沒關(guān)系),這里在判斷tail為最后一個結(jié)點后仍然要判斷hop是否達(dá)到2主要是為了預(yù)防在并發(fā)修改下,多個線程同時修改的問題 5:根據(jù)tail的可變性條件和滯后更新策略,我們知道tail的next域可以引用到自身,在ConcurrentLinkedQueue規(guī)定如果tail的next如果指向自己的話,則表明tail現(xiàn)在所在指向的結(jié)點已被刪除(從head遍歷無法到達(dá)tail),那么就要從head開始遍歷到所有的未刪除結(jié)點(這也是上文head的不變性條件保證的)具體看下圖:
當(dāng)然,我們還是要判斷其他線程是否已經(jīng)提前修改tail的指向,修改的話就表明tail結(jié)點已經(jīng)更新完畢,沒有引用到自身了,就可以直接重新嘗試插入了。其實從這我們大致可以揣摩出作者的設(shè)計的巧妙部分:即雖然tail有滯后更新策略從而導(dǎo)致無法一次就將結(jié)點插入,但結(jié)點要想插入的話還是必須要當(dāng)tail為最后一個結(jié)點才行 6:tail未指向尾結(jié)點,同時也沒有滯后head,就像下圖這樣: 插入前:
這時候表明tail結(jié)點還未更新,但需要事先判斷其他線程是否可能搶先插入了一個結(jié)點,如下圖: 其它線程搶先插入后:
在這種情況下如果插入元素的話導(dǎo)致tail和最后一個結(jié)點的距離達(dá)到兩個,就要更新tail的指向(不得不承認(rèn)這句代碼的簡潔性,但還是要吐槽一下,從可讀性的角度和JDK6.0的版本比起來實在是難以理解),并且tail已經(jīng)指向尾結(jié)點,說明下一個結(jié)點可以直接將tail賦給p以便重新嘗試插入。 其實仔細(xì)分析的話就可以明白多個if判斷表明tail的三種可能狀態(tài):
1.tail滯后于 head。
2. tail指向尾結(jié)點。
3. tail指向非尾結(jié)點。
*出隊列*
public E poll(){ restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) //1 { if (p != h) //2 updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) //3 { updateHead(h, p); return null; } else if (p == q) //4 continue restartFromHead; else //5 p = q; } } }1:在獲取head結(jié)點后,如果item不為null的話將其設(shè)為null實現(xiàn)刪除頭結(jié)點(這是一個特殊的刪除策略,即item為null的結(jié)點就是已經(jīng)刪除的結(jié)點,即使它還在隊列中)
2:刪除該結(jié)點后檢查head是否與頭結(jié)點相差兩個結(jié)點,有則向后推進(jìn)一個item非null結(jié)點來更新head 
3:head的item為null則向后選取一個結(jié)點,如果item為null的結(jié)點,設(shè)置head指向p節(jié)點(此時隊列沒有元素,只有一個偽結(jié)點p) 4:結(jié)點出隊失敗,重新進(jìn)行出隊(關(guān)于p == q的判斷條件我是在有點難以理解,在此只能作一個不負(fù)責(zé)任的猜測:就是上一次判斷先執(zhí)行了步驟5,使得p和q指向同一個item不為null的結(jié)點,在下一次循環(huán)開始前其它線程線程先刪除了該結(jié)點導(dǎo)致步驟4的發(fā)生,這樣的話就要重新獲取head進(jìn)行刪除) A線程執(zhí)行步驟5后(為了方便沒有畫出tail,再次聲明,只是個人觀點):
B線程搶先刪除結(jié)點后A線程執(zhí)行步驟4:
5:在結(jié)點出隊失敗后可以保證下次嘗試出隊時p不為空(之前q = p.next != null才有可能跳到這一步)
根據(jù)head的不變性和可變性條件,在執(zhí)行出隊操作前,head在隊列中的位置共有兩種可能:
1. head指向有效結(jié)點(從head向后遍歷可達(dá)的結(jié)點當(dāng)中,item域不為null的結(jié)點)
2. head指向無效結(jié)點(從head向后遍歷可達(dá)的結(jié)點當(dāng)中,item域為null的結(jié)點)
有些人在判斷隊列是否為空時喜歡用
queue.size()==0public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }在計算隊列的長度是并沒有向我們往常一樣直接通過一個變量來存儲,這樣主要是要盡可能保證隊列在并發(fā)訪問下的數(shù)據(jù)的正確性,但由于遍歷時還是會有其它線程對隊列的狀態(tài)進(jìn)行修改,因而數(shù)據(jù)仍有可能錯誤(removeAll,retainAll,containsAll,equals,toArray也有一樣的問題) 可以看到這樣在隊列在結(jié)點較多時會依次遍歷所有結(jié)點,這樣的性能會有較大影響,因而可以考慮empty函數(shù),它只要判斷第一個結(jié)點(注意不一定是head指向的結(jié)點)
public boolean isEmpty(){ return first() == null;}ConcurrentLinkedQueue的迭代器是弱一致性的,這在并發(fā)容器中是比較普遍的現(xiàn)象,主要是指在一個線程在遍歷隊列結(jié)點而另一個線程嘗試對某個隊列結(jié)點進(jìn)行修改的話不會拋出ConcurrentModificationException,這也就造成在遍歷某個尚未被修改的結(jié)點時,在next方法返回時可以看到該結(jié)點的修改,但在遍歷后再對該結(jié)點修改時就看不到這種變化。特別注意的是ConcurrentLinkedQueue提供的線程安全操作只是相對安全的,即只對單個函數(shù)調(diào)用所涉及的操作提供安全性
新聞熱點
疑難解答