前面介紹了三個(gè)同步輔助類:CyclicBarrier、Barrier、Phaser,這篇博客介紹最后一個(gè):Exchanger。JDK API是這樣介紹的:可以在對中對元素進(jìn)行配對和交換的線程的同步點(diǎn)。每個(gè)線程將條目上的某個(gè)方法呈現(xiàn)給 exchange 方法,與伙伴線程進(jìn)行匹配,并且在返回時(shí)接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue 的雙向形式。Exchanger 可能在應(yīng)用程序(比如遺傳算法和管道設(shè)計(jì))中很有用。
Exchanger,它允許在并發(fā)任務(wù)之間交換數(shù)據(jù)。具體來說,Exchanger類允許在兩個(gè)線程之間定義同步點(diǎn)。當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),他們交換數(shù)據(jù)結(jié)構(gòu),因此第一個(gè)線程的數(shù)據(jù)結(jié)構(gòu)進(jìn)入到第二個(gè)線程中,第二個(gè)線程的數(shù)據(jù)結(jié)構(gòu)進(jìn)入到第一個(gè)線程中。
在官方API對Exchanger定義是相當(dāng)簡潔的,一個(gè)無參構(gòu)造函數(shù),兩個(gè)方法:
構(gòu)造函數(shù):
Exchanger()創(chuàng)建一個(gè)新的 Exchanger。
方法:
exchange(V x):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷),然后將給定的對象傳送給該線程,并接收該線程的對象。
exchange(V x, long timeout, TimeUnit unit):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷,或者超出了指定的等待時(shí)間),然后將給定的對象傳送給該線程,同時(shí)接收該線程的對象。
Exchanger在生產(chǎn)-消費(fèi)者問題情境中非常有用。在生產(chǎn)者-消費(fèi)者情境模式中它包含了一個(gè)數(shù)緩沖區(qū)(倉庫),一個(gè)或者多個(gè)生產(chǎn)者,一個(gè)或者多個(gè)消費(fèi)中。
下面是生產(chǎn)者-消費(fèi)者的實(shí)例(實(shí)例來自《java 7 并發(fā)編程實(shí)戰(zhàn)手冊》)
public class PRoducer implements Runnable{ /** * 生產(chǎn)者和消費(fèi)者進(jìn)行交換的數(shù)據(jù)結(jié)構(gòu) */ private List<String> buffer; /** * 同步生產(chǎn)者和消費(fèi)者的交換對象 */ private final Exchanger<List<String>> exchanger; Producer(List<String> buffer,Exchanger<List<String>> exchanger){ this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { int cycle = 1; for(int i = 0 ; i < 10 ; i++){ System.out.println("Producer : Cycle :" + cycle); for(int j = 0 ; j < 10 ; j++){ String message = "Event " + ((i * 10 ) + j); System.out.println("Producer : " + message); buffer.add(message); } //調(diào)用exchange()與消費(fèi)者進(jìn)行數(shù)據(jù)交換 try { buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Producer :" + buffer.size()); cycle++ ; } }}
Consumer:
public class Consumer implements Runnable{ private List<String> buffer; private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer,Exchanger<List<String>> exchanger){ this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { int cycle = 1; for(int i = 0 ; i < 10 ; i++){ System.out.println("Consumer : Cycle :" + cycle); //調(diào)用exchange()與消費(fèi)者進(jìn)行數(shù)據(jù)交換 try { buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Consumer :" + buffer.size()); for(int j = 0 ; j < 10 ; j++){ System.out.println("Consumer : " + buffer.get(0)); buffer.remove(0); } cycle++ ; } }}
Test:
public class Test { public static void main(String[] args) { List<String> buffer1 = new ArrayList<>(); List<String> buffer2 = new ArrayList<>(); Exchanger<List<String>> exchanger = new Exchanger<>(); Producer producer = new Producer(buffer1, exchanger); Consumer consumer = new Consumer(buffer2, exchanger); Thread thread1 = new Thread(producer); Thread thread2 = new Thread(consumer); thread1.start(); thread2.start(); }}
運(yùn)行結(jié)果(部分):
Producer : Cycle :1Producer : Event 0Producer : Event 1Producer : Event 2Producer : Event 3Producer : Event 4Producer : Event 5Producer : Event 6Producer : Event 7Consumer : Cycle :1Producer : Event 8Producer : Event 9Producer :0Producer : Cycle :2Producer : Event 10Producer : Event 11Producer : Event 12Producer : Event 13Consumer :10Consumer : Event 0Consumer : Event 1Consumer : Event 2Consumer : Event 3Consumer : Event 4Consumer : Event 5Consumer : Event 6Consumer : Event 7Consumer : Event 8Consumer : Event 9Consumer : Cycle :2Producer : Event 14Producer : Event 15Producer : Event 16Producer : Event 17Producer : Event 18Producer : Event 19
首先生產(chǎn)者Producer、消費(fèi)中Consumer首先都創(chuàng)建一個(gè)緩存列表,通過Exchanger來同步交換數(shù)據(jù)。消費(fèi)中通過調(diào)用Exchanger與生產(chǎn)者進(jìn)行同步來獲取數(shù)據(jù),而生產(chǎn)者則通過for循環(huán)向緩存隊(duì)列存儲(chǔ)數(shù)據(jù)并使用exchanger對象消費(fèi)者同步。到消費(fèi)者從exchanger哪里得到數(shù)據(jù)后,他的緩沖列表中有10個(gè)數(shù)據(jù),而生產(chǎn)者得到的則是一個(gè)空的列表。上面的例子充分展示了消費(fèi)者-生產(chǎn)者是如何利用Exchanger來完成數(shù)據(jù)交換的。
在Exchanger中,如果一個(gè)線程已經(jīng)到達(dá)了exchanger節(jié)點(diǎn)時(shí),對于它的伙伴節(jié)點(diǎn)的情況有三種:
1、如果它的伙伴節(jié)點(diǎn)在該線程到達(dá)之間已經(jīng)調(diào)用了exchanger方法,則它會(huì)喚醒它的伙伴然后進(jìn)行數(shù)據(jù)交換,得到各自數(shù)據(jù)返回。
2、如果它的伙伴節(jié)點(diǎn)還沒有到達(dá)交換點(diǎn),則該線程將會(huì)被掛起,等待它的伙伴節(jié)點(diǎn)到達(dá)被喚醒,完成數(shù)據(jù)交換。
3、如果當(dāng)前線程被中斷了則拋出異常,或者等待超時(shí)了,則拋出超時(shí)異常。
參考資料:
1、《java 7 并發(fā)編程實(shí)戰(zhàn)手冊》
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注