Java集合框架中的Map類型的數據結構是非線程安全,在多線程環境中使用時需要手動進行線程同步。因此在java.util.concurrent包中提供了一個線程安全版本的Map類型數據結構:ConcurrentMap。本篇文章主要關注ConcurrentMap接口以及它的Hash版本的實現ConcurrentHashMap。
ConcurrentMap是Map接口的子接口
public interface ConcurrentMap<K, V> extends Map<K, V>
與Map接口相比,ConcurrentMap多了4個方法:
1)putIfAbsent方法:如果key不存在,添加key-value。方法會返回與key關聯的value。
V putIfAbsent(K key, V value);
2)remove方法
boolean remove(Object key, Object value);
Map接口中也有一個remove方法:
V remove(Object key);
ConcurrentMap中的remove方法需要比較原有的value和參數中的value是否一致,只有一致才會刪除。
3)Replace方法:有2個重載
boolean replace(K key, V oldValue, V newValue);V replace(K key, V value);
兩個重載的區別和2)中的兩個remove方法的區別很類似,多了一個檢查value一致。
通過ConcurrentMap多出來的方法可以看到多線程中一個很重要的概念:compare。compare的作用就是為了保證value的一致性。
重頭戲來了:ConcurrentHashMap。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable
ConcurrentHashMap和HashMap類似,這里重點關注的是如何實現線程安全,也就是如何加鎖。
對于HashMap來說,有一個Entry數組,根據Key的hash值對數組長度取模得到數組下標,找到Entry,遍歷整個Entry鏈表,用equals比較來確定key所在的Entry。
ConcurrentHashMap的基本思想是采取分塊的方式加鎖,分塊數由參數“concurrencyLevel”來決定(和HashMap中的“initialCapacity”類似,實際塊數是第一個大于concurrencyLevel的2的n次方)。每個分塊被稱為Segment,Segment的索引方式和HashMap中的Entry索引方式一致(hash值對數組長度取模)。
對Segment加鎖的方式很簡單,直接把Segment定義為ReentrantLock的子類。同時Segment又是一個特定實現的hash table。
static final class Segment<K,V> extends ReentrantLock implements Serializable
下面分析ConcurrentHashMap讀寫時如何加鎖。
首先是讀操作類的方法,來看get方法:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }可以看到,讀取的時候沒有調用的Segment的獲取鎖的方法,而是通過hash值定位到Entry,然后遍歷Entry的鏈表。為什么這里不用加鎖呢?看看HashEntry的代碼就會明白了。
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;value和next屬性是帶有volatile修飾符的,可以大膽放心的遍歷和比較。
接著是寫操作,寫操作是肯定要加鎖的。因為Segment可以看成是一個hash table,因此ConcurrentHashMap直接調用Segment的對應的寫入方法如put,replace等。
比如put方法
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }因此這里直接關注Segment的對應寫操作方法即可。在每個寫操作的方法開頭都這樣的類似代碼:
final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash);HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
也就是,首先嘗試獲取鎖,如果成功則會帶鎖繼續操作,失敗則要通過scanAndLock或scanAndLockForPut獲取鎖,因此這里關注的重點也就轉移到這兩個方法了。
按照多線程環境的規則,如果嘗試獲取鎖失敗的話就會進入阻塞等待狀態,那么這兩個方法的作用應該是類似的。
PRivate HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
這兩個方法的邏輯:在等待的時候閑著沒事兒干把該做好的準備做好,查找一下目標entry,如果是新建entry就把entry創建好,然后如果一切沒問題就用lock()方法把自己給阻塞了,也就是做好準備然后去等著了。
因為Segment本身就可以看成一個hash table,因此必然涉及rehash的問題,因為和HashMap中的rehash類似,在這里就省略了。
新聞熱點
疑難解答