無鎖有序鏈表可以保證元素的唯一性,使其可用于哈希表的桶,甚至直接作為一個效率不那么高的map。普通鏈表的無鎖實現(xiàn)相對簡單點,因為插入元素可以在表頭插,而有序鏈表的插入則是任意位置。
本文主要基于論文High Performance Dynamic Lock-Free Hash Tables實現(xiàn)。
鏈表的主要操作包含insert和remove,先簡單實現(xiàn)一個版本,就會看到問題所在,以下代碼只用作示例:
struct node_t { key_t key; value_t val; node_t *next; }; int l_find(node_t **PRed_ptr, node_t **item_ptr, node_t *head, key_t key) { node_t *pred = head; node_t *item = head->next; while (item) { int d = KEY_CMP(item->key, key); if (d >= 0) { *pred_ptr = pred; *item_ptr = item; return d == 0 ? TRUE : FALSE; } pred = item; item = item->next; } *pred_ptr = pred; *item_ptr = NULL; return FALSE; } int l_insert(node_t *head, key_t key, value_t val) { node_t *pred, *item, *new_item; while (TRUE) { if (l_find(&pred, &item, head, key)) { return FALSE; } new_item = (node_t*) malloc(sizeof(node_t)); new_item->key = key; new_item->val = val; new_item->next = item; // A. 如果pred本身被移除了 if (CAS(&pred->next, item, new_item)) { return TRUE; } free(new_item); } } int l_remove(node_t *head, key_t key) { node_t *pred, *item; while (TRUE) { if (!l_find(&pred, &item, head, key)) { return TRUE; } // B. 如果pred被移除;如果item也被移除 if (CAS(&pred->next, item, item->next)) { haz_free(item); return TRUE; } } }l_find函數(shù)返回查找到的前序元素和元素本身,代碼A和B雖然拿到了pred和item,但在CAS的時候,其可能被其他線程移除。甚至,在l_find過程中,其每一個元素都可能被移除。問題在于,任何時候拿到一個元素時,都不確定其是否還有效。元素的有效性包括其是否還在鏈表中,其指向的內(nèi)存是否還有效。
解決方案
通過為元素指針增加一個有效性標志位,配合CAS操作的互斥性,就可以解決元素有效性判定問題。
因為node_t放在內(nèi)存中是會對齊的,所以指向node_t的指針值低幾位是不會用到的,從而可以在低幾位里設(shè)置標志,這樣在做CAS的時候,就實現(xiàn)了DCAS的效果,相當于將兩個邏輯上的操作變成了一個原子操作。想象下引用計數(shù)對象的線程安全性,其內(nèi)包裝的指針是線程安全的,但對象本身不是。
CAS的互斥性,在若干個線程CAS相同的對象時,只有一個線程會成功,失敗的線程就可以以此判定目標對象發(fā)生了變更。改進后的代碼(代碼僅做示例用,不保證正確):
typedef size_t markable_t; // 最低位置1,表示元素被刪除 #define HAS_MARK(p) ((markable_t)p & 0x01) #define MARK(p) ((markable_t)p | 0x01) #define STRip_MARK(p) ((markable_t)p & ~0x01) int l_insert(node_t *head, key_t key, value_t val) { node_t *pred, *item, *new_item; while (TRUE) { if (l_find(&pred, &item, head, key)) { return FALSE; } new_item = (node_t*) malloc(sizeof(node_t)); new_item->key = key; new_item->val = val; new_item->next = item; // A. 雖然find拿到了合法的pred,但是在以下代碼之前pred可能被刪除,此時pred->next被標記 // pred->next != item,該CAS會失敗,失敗后重試 if (CAS(&pred->next, item, new_item)) { return TRUE; } free(new_item); } return FALSE; } int l_remove(node_t *head, key_t key) { node_t *pred, *item; while (TRUE) { if (!l_find(&pred, &item, head, key)) { return FALSE; } node_t *inext = item->next; // B. 刪除item前先標記item->next,如果CAS失敗,那么情況同insert一樣,有其他線程在find之后 // 刪除了item,失敗后重試 if (!CAS(&item->next, inext, MARK(inext))) { continue; } // C. 對同一個元素item刪除時,只會有一個線程成功走到這里 if (CAS(&pred->next, item, STRIP_MARK(item->next))) { haz_defer_free(item); return TRUE; } } return FALSE; } int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) { node_t *pred = head; node_t *item = head->next; hazard_t *hp1 = haz_get(0); hazard_t *hp2 = haz_get(1); while (item) { haz_set_ptr(hp1, pred); haz_set_ptr(hp2, item); /* 如果已被標記,那么緊接著item可能被移除鏈表甚至釋放,所以需要重頭查找 */ if (HAS_MARK(item->next)) { return l_find(pred_ptr, item_ptr, head, key); } int d = KEY_CMP(item->key, key); if (d >= 0) { *pred_ptr = pred; *item_ptr = item; return d == 0 ? TRUE : FALSE; } pred = item; item = item->next; } *pred_ptr = pred; *item_ptr = NULL; return FALSE; }haz_get、haz_set_ptr之類的函數(shù)是一個hazard pointer實現(xiàn),用于支持多線程下內(nèi)存的GC。上面的代碼中,要刪除一個元素item時,會標記item->next,從而使得insert時中那個CAS不需要做任何調(diào)整。總結(jié)下這里的線程競爭情況:
insert中find到正常的pred及item,pred->next == item,然后在CAS前有線程刪除了pred,此時pred->next == MARK(item),CAS失敗,重試;刪除分為2種情況:a) 從鏈表移除,得到標記,pred可繼續(xù)訪問;b) pred可能被釋放內(nèi)存,此時再使用pred會錯誤。為了處理情況b,所以引入了類似hazard pointer的機制,可以有效保障任意一個指針p只要還有線程在使用它,它的內(nèi)存就不會被真正釋放insert中有多個線程在pred后插入元素,此時同樣由insert中的CAS保證,這個不多說remove中情況同insert,find拿到了有效的pred和next,但在CAS的時候pred被其他線程刪除,此時情況同insert,CAS失敗,重試任何時候改變鏈表結(jié)構(gòu)時,無論是remove還是insert,都需要重試該操作find中遍歷時,可能會遇到被標記刪除的item,此時item根據(jù)remove的實現(xiàn)很可能被刪除,所以需要重頭開始遍歷ABA問題
ABA問題還是存在的,insert中:
if (CAS(&pred->next, item, new_item)) { return TRUE; }如果CAS之前,pred后的item被移除,又以相同的地址值加進來,但其value變了,此時CAS會成功,但鏈表可能就不是有序的了。pred->val < new_item->val > item->val
為了解決這個問題,可以利用指針值地址對齊的其他位來存儲一個計數(shù),用于表示pred->next的改變次數(shù)。當insert拿到pred時,pred->next中存儲的計數(shù)假設(shè)是0,CAS之前其他線程移除了pred->next又新增回了item,此時pred->next中的計數(shù)增加,從而導致insert中CAS失敗。
// 最低位留作刪除標志 #define MASK ((sizeof(node_t) - 1) & ~0x01) #define GET_TAG(p) ((markable_t)p & MASK) #define TAG(p, tag) ((markable_t)p | (tag)) #define MARK(p) ((markable_t)p | 0x01) #define HAS_MARK(p) ((markable_t)p & 0x01) #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))remove的實現(xiàn):
/* 先標記再刪除 */ if (!CAS(&sitem->next, inext, MARK(inext))) { continue; } int tag = GET_TAG(pred->next) + 1; if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) { haz_defer_free(sitem); return TRUE; }insert中也可以更新pred->next的計數(shù)。
總結(jié)
無鎖的實現(xiàn),本質(zhì)上都會依賴于CAS的互斥性。從頭實現(xiàn)一個lock free的數(shù)據(jù)結(jié)構(gòu),可以深刻感受到lock free實現(xiàn)的tricky。最終代碼可以從這里github獲取。代碼中為了簡單,實現(xiàn)了一個不是很強大的hazard pointer,可以參考之前的博文。
新聞熱點
疑難解答