国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 開(kāi)發(fā) > Java > 正文

詳細(xì)分析Java并發(fā)集合LinkedBlockingQueue的用法

2024-07-13 10:17:34
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

在上一章我們講解了ArrayBlockingQueue,用數(shù)組形式實(shí)現(xiàn)的阻塞隊(duì)列。

數(shù)組的長(zhǎng)度在創(chuàng)建時(shí)就必須確定,如果數(shù)組長(zhǎng)度小了,那么ArrayBlockingQueue隊(duì)列很容易就被阻塞,如果數(shù)組長(zhǎng)度大了,就容易浪費(fèi)內(nèi)存。

而隊(duì)列這個(gè)數(shù)據(jù)結(jié)構(gòu)天然適合用鏈表這個(gè)形式,而LinkedBlockingQueue就是使用鏈表方式實(shí)現(xiàn)的阻塞隊(duì)列。

一. 鏈表實(shí)現(xiàn)

1.1 Node內(nèi)部類(lèi)

  /**   * 鏈表的節(jié)點(diǎn),同時(shí)也是通過(guò)它來(lái)實(shí)現(xiàn)一個(gè)單向鏈表   */  static class Node<E> {    E item;    // 指向鏈表的下一個(gè)節(jié)點(diǎn)    Node<E> next;    Node(E x) { item = x; }  }

有一個(gè)變量e儲(chǔ)存數(shù)據(jù),有一個(gè)變量next指向下一個(gè)節(jié)點(diǎn)的引用。它可以實(shí)現(xiàn)最簡(jiǎn)單地單向列表。

1.2 怎樣實(shí)現(xiàn)鏈表

   /**   * 它的next指向隊(duì)列頭,這個(gè)節(jié)點(diǎn)不存儲(chǔ)數(shù)據(jù)   */  transient Node<E> head;  /**   * 隊(duì)列尾節(jié)點(diǎn)   */  private transient Node<E> last;

要實(shí)現(xiàn)鏈表,必須有兩個(gè)變量,一個(gè)表示鏈表頭head,一個(gè)表示鏈表尾last。head和last都會(huì)在LinkedBlockingQueue對(duì)象創(chuàng)建的時(shí)候被初始化。

last = head = new Node<E>(null);

注意這里用了一個(gè)小技巧,鏈表頭head節(jié)點(diǎn)并沒(méi)有存放數(shù)據(jù),它指向的下一個(gè)節(jié)點(diǎn),才真正存儲(chǔ)了鏈表中第一個(gè)數(shù)據(jù)。而鏈表尾last的確儲(chǔ)存了鏈表最后一個(gè)數(shù)據(jù)。

1.3 插入和刪除節(jié)點(diǎn)

 /**   * 向隊(duì)列尾插入節(jié)點(diǎn)   */  private void enqueue(Node<E> node) {    // assert putLock.isHeldByCurrentThread(); // 當(dāng)前線程肯定獲取了putLock鎖    // 將原隊(duì)列尾節(jié)點(diǎn)的next引用指向新節(jié)點(diǎn)node,然后再將node節(jié)點(diǎn)設(shè)置成隊(duì)列尾節(jié)點(diǎn)last    // 這樣就實(shí)現(xiàn)了向隊(duì)列尾插入節(jié)點(diǎn)    last = last.next = node;  }

在鏈表尾插入節(jié)點(diǎn)很簡(jiǎn)單,將原隊(duì)列尾last的下一個(gè)節(jié)點(diǎn)next指向新節(jié)點(diǎn)node,再將新節(jié)點(diǎn)node賦值給隊(duì)列尾last節(jié)點(diǎn)。這樣就實(shí)現(xiàn)了插入一個(gè)新節(jié)點(diǎn)。

  // 移除隊(duì)列頭節(jié)點(diǎn),并返回被刪除的節(jié)點(diǎn)數(shù)據(jù)  private E dequeue() {    // assert takeLock.isHeldByCurrentThread(); // 當(dāng)前線程肯定獲取了takeLock鎖    // assert head.item == null;    Node<E> h = head;    // first節(jié)點(diǎn)中才存儲(chǔ)了隊(duì)列中第一個(gè)元素的數(shù)據(jù)    Node<E> first = h.next;    h.next = h; // help GC    // 設(shè)置新的head值,相當(dāng)于刪除了first節(jié)點(diǎn)。因?yàn)閔ead節(jié)點(diǎn)本身不儲(chǔ)存數(shù)據(jù)    head = first;    // 隊(duì)列頭的數(shù)據(jù)    E x = first.item;    // 移除原先的數(shù)據(jù)    first.item = null;    return x;  }

要注意head并不是鏈表頭,它的next才是指向鏈表頭,所以刪除鏈表頭也很簡(jiǎn)單,就是將head.next賦值給head,然后返回原先head.next節(jié)點(diǎn)的數(shù)據(jù)。

刪除的時(shí)候,就要注意鏈表為空的情況。head.next的值使用enqueue方法添加的。當(dāng)head.next==last的時(shí)候,表示已經(jīng)刪除到最后一個(gè)元素了,當(dāng)head.next==null的時(shí)候,就不能刪除了,因?yàn)殒湵硪呀?jīng)為空了。這里沒(méi)有做判斷,是因?yàn)樵谡{(diào)用dequeue方法的地方已經(jīng)做過(guò)判斷了。

二. 同步鎖ReentrantLock和條件Condition

因?yàn)樽枞?duì)列在隊(duì)列為空和隊(duì)列已滿的情況下,都必須阻塞等待,那么就天然需要兩個(gè)條件。而為了保證多線程并發(fā)安全,又需要一個(gè)同步鎖。這個(gè)在ArrayBlockingQueue中已經(jīng)說(shuō)過(guò)了。

這里我們來(lái)說(shuō)說(shuō)LinkedBlockingQueue不一樣的地方。

  /** 獨(dú)占鎖,用于處理插入隊(duì)列操作的并發(fā)問(wèn)題,即put與offer操作 */  private final ReentrantLock putLock = new ReentrantLock();  /** 隊(duì)列不滿的條件Condition,它是由putLock鎖生成的 */  private final Condition notFull = putLock.newCondition();  /** 獨(dú)占鎖,用于處理刪除隊(duì)列頭操作的并發(fā)問(wèn)題,即take與poll操作 */  private final ReentrantLock takeLock = new ReentrantLock();  /** 隊(duì)列不為空的條件Condition, 它使用takeLock鎖生成的 */  private final Condition notEmpty = takeLock.newCondition();

2.1 putLock與takeLock

我們發(fā)現(xiàn)使用了兩把鎖:

  1. putLock 同步所有插入元素的操作,即put與offer系列方法的操作。
  2. takeLock 同步刪除和獲取元素的操作,即take與poll系列方法操作。

按照上面的說(shuō)法,可能會(huì)出現(xiàn)同時(shí)插入元素和刪除元素的操作,那么就不會(huì)出現(xiàn)問(wèn)題么?

我們來(lái)具體分析一個(gè)下,對(duì)于隊(duì)列來(lái)說(shuō)操作分為三種:

  1. 在隊(duì)列尾插入元素。即put與offer系列方法,它們會(huì)涉及兩個(gè)變量,隊(duì)列中元素個(gè)數(shù)count,和隊(duì)列尾節(jié)點(diǎn)last。(不會(huì)涉及head節(jié)點(diǎn))
  2. 移除隊(duì)列頭元素。即take與poll系列方法,它們會(huì)涉及兩個(gè)變量,隊(duì)列中元素個(gè)數(shù)count,和head節(jié)點(diǎn)。(不會(huì)涉及隊(duì)列尾節(jié)點(diǎn)last)
  3. 查看隊(duì)列頭元素。即 element()與peek()方法,它們會(huì)涉及兩個(gè)變量,隊(duì)列中元素個(gè)數(shù)count,和head節(jié)點(diǎn)。(不會(huì)涉及隊(duì)列尾節(jié)點(diǎn)last)

因此使用putLock鎖來(lái)保持last變量的安全,使用takeLock鎖來(lái)保持head變量的安全。

對(duì)于都涉及了隊(duì)列中元素個(gè)數(shù)count變量,所以使用AtomicInteger來(lái)保證并發(fā)安全問(wèn)題。

  /** 隊(duì)列中元素的個(gè)數(shù),這里使用AtomicInteger變量,保證并發(fā)安全問(wèn)題 */  private final AtomicInteger count = new AtomicInteger();

2.2 notFull與notEmpty

  1. notFull 是由putLock鎖生成的,因?yàn)楫?dāng)插入元素時(shí),我們要判斷隊(duì)列是不是已滿。
  2. notEmpty 是由takeLock鎖生成的,因?yàn)楫?dāng)刪除元素時(shí),我們要判斷隊(duì)列是不是為空。

2.3 控制流程

當(dāng)插入元素時(shí):

  1. 先使用putLock.lockInterruptibly()保證只有一個(gè)線程進(jìn)行插入操作.
  2. 然后利用count變量,判斷隊(duì)列是否已滿.
  3. 滿了就調(diào)用notFull.await()方法,讓當(dāng)前線程等待。因?yàn)閚otFull是由putLock產(chǎn)生的,這里已經(jīng)獲取到鎖了,所以可以調(diào)用await方法。
  4. 沒(méi)滿就調(diào)用 enqueue方法,向隊(duì)列尾插入新元素。
  5. 調(diào)用count.getAndIncrement()方法,將隊(duì)列中元素個(gè)數(shù)加一,并保證多線程并發(fā)安全。
  6. 調(diào)用signalNotEmpty方法,喚醒正在等待獲取元素的線程。

當(dāng)刪除元素時(shí):

  1. 先使用takeLock.lockInterruptibly()保證只有一個(gè)線程進(jìn)行刪除操作.
  2. 然后利用count變量,判斷隊(duì)列是否為空.
  3. 隊(duì)列為空就調(diào)用notEmpty.await()方法,讓當(dāng)前線程等待。因?yàn)閚otEmpty是由takeLock產(chǎn)生的,這里已經(jīng)獲取到鎖了,所以可以調(diào)用await方法。
  4. 沒(méi)滿就調(diào)用 dequeue方法,刪除隊(duì)列頭元素。
  5. 調(diào)用count.getAndDecrement()方法,將隊(duì)列中元素個(gè)數(shù)減一,并保證多線程并發(fā)安全。
  6. 調(diào)用signalNotFull方法,喚醒正在等待插入元素的線程。

還要注意一下,Condition的signal和await方法必須在獲取鎖的情況下調(diào)用。因此就有了signalNotEmpty和signalNotFull方法:

  /**   * 喚醒在notEmpty條件下等待的線程,即移除隊(duì)列頭時(shí),發(fā)現(xiàn)隊(duì)列為空而被迫等待的線程。   * 注意,因?yàn)橐{(diào)用Condition的signal方法,必須獲取對(duì)應(yīng)的鎖,所以這里調(diào)用了takeLock.lock()方法。   * 當(dāng)隊(duì)列中插入元素(即put或offer操作),那么隊(duì)列肯定不為空,就會(huì)調(diào)用這個(gè)方法。   */  private void signalNotEmpty() {    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {      notEmpty.signal();    } finally {      takeLock.unlock();    }  }  /**   * 喚醒在notFull條件下等待的線程,即隊(duì)列尾添加元素時(shí),發(fā)現(xiàn)隊(duì)列已滿而被迫等待的線程。   * 注意,因?yàn)橐{(diào)用Condition的signal方法,必須獲取對(duì)應(yīng)的鎖,所以這里調(diào)用了putLock.lock()方法   * 當(dāng)隊(duì)列中刪除元素(即take或poll操作),那么隊(duì)列肯定不滿,就會(huì)調(diào)用這個(gè)方法。   */  private void signalNotFull() {    final ReentrantLock putLock = this.putLock;    putLock.lock();    try {      notFull.signal();    } finally {      putLock.unlock();    }  }

三. 插入元素方法

  public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    // 記錄插入操作前元素的個(gè)數(shù)    int c = -1;    // 創(chuàng)建新節(jié)點(diǎn)node    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;    putLock.lockInterruptibly();    try {      //表示隊(duì)列已滿,那么就要調(diào)用notFull.await方法,讓當(dāng)前線程等待      while (count.get() == capacity) {        notFull.await();      }      // 向隊(duì)列尾插入新元素      enqueue(node);      // 將當(dāng)前隊(duì)列元素個(gè)數(shù)加1,并返回加1之前的元素個(gè)數(shù)。      c = count.getAndIncrement();      // c + 1 < capacity表示隊(duì)列未滿,就喚醒可能等待插入操作的線程      if (c + 1 < capacity)        notFull.signal();    } finally {      putLock.unlock();    }    // c == 0表示插入之前,隊(duì)列是空的。隊(duì)列從空到放入一個(gè)元素時(shí),    // 才喚醒等待刪除的線程    // 防止頻繁獲取takeLock鎖,消耗性能    if (c == 0)      signalNotEmpty();  }

以put方法為例,大體流程與我們前面介紹一樣,這里有一個(gè)非常怪異的代碼,當(dāng)插入完元素時(shí),如果發(fā)現(xiàn)隊(duì)列未滿,那么調(diào)用notFull.signal()喚醒等待插入的線程。

大家就很疑惑了,一般來(lái)說(shuō),這個(gè)方法應(yīng)該放在刪除元素(take系列的方法里),因?yàn)楫?dāng)我們刪除一個(gè)元素,那么隊(duì)列肯定是未滿的,那么調(diào)用notFull.signal()方法,喚醒等待插入的線程。

這里這么做主要是因?yàn)檎{(diào)用signal方法,必須先獲取對(duì)應(yīng)的鎖,而在take系列的方法里使用的鎖是takeLock,那么想調(diào)用notFull.signal()方法,必須先獲取putLock鎖,這樣的話會(huì)性能就會(huì)下降,所以用了另一種方式。

  1. 首先我們應(yīng)該知道signal方法,當(dāng)有線程在這個(gè)條件下等待時(shí),才會(huì)喚醒其中一個(gè)線程,當(dāng)沒(méi)有線程等待時(shí),這個(gè)方法相當(dāng)于什么都沒(méi)做。所以這個(gè)方法的意義是可能會(huì)喚醒等待的一個(gè)線程。
  2. 當(dāng)隊(duì)列未滿時(shí),我們都調(diào)用notFull.signal()嘗試去喚醒一個(gè)等待插入線程。而且這里已經(jīng)獲取putLock鎖了,所以不耗時(shí)。
  3. 但是有一個(gè)問(wèn)題,當(dāng)隊(duì)列已滿的時(shí)候,所有插入操作的線程,都會(huì)等待,就沒(méi)有機(jī)會(huì)調(diào)用notFull.signal()方法,那么喚醒這些等待線程呢?
  4. 喚醒這些線程的啟動(dòng)條件,必須是由刪除元素操作觸發(fā)的,因?yàn)橹挥袆h除隊(duì)列才會(huì)不滿。因?yàn)樵趖ake方法中 if (c == capacity) signalNotFull();

四. 刪除隊(duì)列頭元素

  public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try {      //表示隊(duì)列為空,那么就要調(diào)用notEmpty.await方法,讓當(dāng)前線程等待      while (count.get() == 0) {        notEmpty.await();      }      // 刪除隊(duì)列頭元素,并返回它      x = dequeue();      // 返回當(dāng)前隊(duì)列個(gè)數(shù),然后將隊(duì)列個(gè)數(shù)減一      c = count.getAndDecrement();      // c > 1表示隊(duì)列不為空,就喚醒可能等待刪除操作的線程      if (c > 1)        notEmpty.signal();    } finally {      takeLock.unlock();    }    /**     * c == capacity表示刪除操作之前,隊(duì)列是滿的。只有從滿隊(duì)列中刪除一個(gè)元素時(shí),     * 才喚醒等待插入的線程     * 防止頻繁獲取putLock鎖,消耗性能     */    if (c == capacity)      signalNotFull();    return x;  }

為什么調(diào)用notEmpty.signal()方法原因,對(duì)比一下我們?cè)诓迦朐胤椒ㄖ械慕忉尅?/p>

五. 查看隊(duì)列頭元素

  // 查看隊(duì)列頭元素  public E peek() {    // 隊(duì)列為空,返回null    if (count.get() == 0)      return null;    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {      // 獲取隊(duì)列頭節(jié)點(diǎn)first      Node<E> first = head.next;      // first == null表示隊(duì)列為空,返回null      if (first == null)        return null;      else        // 返回隊(duì)列頭元素        return first.item;    } finally {      takeLock.unlock();    }  }

查看隊(duì)列頭元素,涉及到head節(jié)點(diǎn),所以必須使用takeLock鎖。

六. 其他重要方法

6.1 remove(Object o)方法

  // 從隊(duì)列中刪除指定元素o  public boolean remove(Object o) {    if (o == null) return false;    // 因?yàn)椴皇莿h除列表頭元素,所以就涉及到head和last兩個(gè)變量,    // putLock與takeLock都要加鎖    fullyLock();    try {      // 遍歷整個(gè)隊(duì)列,p表示當(dāng)前節(jié)點(diǎn),trail表示當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)      // 因?yàn)槭菃蜗蜴湵恚孕枰涗泝蓚€(gè)節(jié)點(diǎn)      for (Node<E> trail = head, p = trail.next;         p != null;         trail = p, p = p.next) {        // 如果找到了指定元素,那么刪除節(jié)點(diǎn)p        if (o.equals(p.item)) {          unlink(p, trail);          return true;        }      }      return false;    } finally {      fullyUnlock();    }  }

從列表中刪除指定元素,因?yàn)閯h除的元素不一定在列表頭,所以可能會(huì)head和last兩個(gè)變量,所以必須同時(shí)使用putLock與takeLock兩把鎖。因?yàn)槭菃蜗蜴湵恚枰粋€(gè)輔助變量trail來(lái)記錄前一個(gè)節(jié)點(diǎn),這樣才能刪除當(dāng)前節(jié)點(diǎn)p。

6.2 unlink(Node<E> p, Node<E> trail) 方法

  // 刪除當(dāng)前節(jié)點(diǎn)p,trail代表p的前一個(gè)節(jié)點(diǎn)  void unlink(Node<E> p, Node<E> trail) {    // 將當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)設(shè)置為null    p.item = null;    // 這樣就在鏈表中刪除了節(jié)點(diǎn)p    trail.next = p.next;    // 如果節(jié)點(diǎn)p是隊(duì)列尾last,而它被刪除了,那么就要將trail設(shè)置為last    if (last == p)      last = trail;    // 將元素個(gè)數(shù)減一,如果原隊(duì)列是滿的,那么就調(diào)用notFull.signal()方法    // 其實(shí)這個(gè)不用判斷直接調(diào)用的,因?yàn)檫@里肯定獲取了putLock鎖    if (count.getAndDecrement() == capacity)      notFull.signal();  }

要在鏈表中刪除一個(gè)節(jié)點(diǎn)p,只需要將p的前一個(gè)節(jié)點(diǎn)trail的next指向節(jié)點(diǎn)p的下一個(gè)節(jié)點(diǎn)next。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持VeVb武林網(wǎng)。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到JAVA教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 滨海县| 丽水市| 沙坪坝区| 长岭县| 丰原市| 平顶山市| 闸北区| 成都市| 辽宁省| 磴口县| 榕江县| 天水市| 新巴尔虎右旗| 博乐市| 巩义市| 楚雄市| 湄潭县| 贵州省| 石门县| 龙川县| 繁昌县| 横山县| 高要市| 乌拉特中旗| 双牌县| 彭州市| 云南省| 林芝县| 成安县| 当涂县| 乌鲁木齐县| 仪陇县| 婺源县| 黄梅县| 巴彦淖尔市| 集贤县| 育儿| 余江县| 镇巴县| 宝鸡市| 静宁县|