国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

線程基礎(chǔ):線程池(6)——基本使用(中)

2019-11-08 03:21:02
字體:
供稿:網(wǎng)友

(接上文:《線程基礎(chǔ):線程池(5)——基本使用(上)》)

3-4、java主要線程池的繼承結(jié)構(gòu)

我們先來總結(jié)一下上文中討論過的內(nèi)容,首先就是Java中ThreadPoolExecutor類的繼承結(jié)構(gòu)。如下圖所示:

這里寫圖片描述

ThreadPoolExecutor:這個線程池就是我們這兩篇文章中介紹的重點線程池實現(xiàn)。程序員可以通過這個線程池中的submit()方法或者execute()方法,執(zhí)行所有實現(xiàn)了Runnable接口或者Callable接口的任務(wù);ThreadPoolExecutor對于這些任務(wù)的執(zhí)行是立即的、一次性的。

ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor線程池和ThreadPoolExecutor線程池的執(zhí)行特點是不一樣的,它是一個用來執(zhí)行延遲任務(wù)、定時任務(wù)或者周期性任務(wù)的線程池。一般情況下,我們用它可以處理定時計算、周期性統(tǒng)計一類的任務(wù)。

ForkJoinPool:ScheduledThreadPoolExecutor和ThreadPoolExecutor都是在JDK1.5版本中提供的。在JDK1.7中,JAVA為我們提供了一種新的線程池ForkJoinPool以及配套的任務(wù)定義ForkJoinTask。除了可以執(zhí)行實現(xiàn)了Runnable接口或者Callable接口的任務(wù)以外,F(xiàn)orkJoinPool還可以執(zhí)行集成了ForkJoinTask定義的任務(wù)。ForkJoinPool的執(zhí)行原理和ThreadPoolExecutor的執(zhí)行原理是不一樣的,我們將在專欄后續(xù)的文章中,專門討論ForkJoinPool線程池。

4、高級特性

我們繼續(xù)討論ThreadPoolExecutor線程池。上文我們給出的最簡單的ThreadPoolExecutor線程池的使用方式中,我們只采用了ThreadPoolExecutor最簡單的一個構(gòu)造函數(shù):

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)1234512345

實際上ThreadPoolExecutor線程池有很多種構(gòu)造函數(shù),其中最復(fù)雜的一種構(gòu)造函數(shù)是:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)12345671234567

在上文中我們還沒有介紹的workQueue、threadFactory和handler參數(shù),將是本文講解的重點。

4-1、使用ThreadFactory

線程池最主要的一項工作,就是在滿足某些條件的情況下創(chuàng)建線程。而在ThreadPoolExecutor線程池中,創(chuàng)建線程的工作交給ThreadFactory來完成。要使用線程池,就必須要指定ThreadFactory。

類似于上文中,如果我們使用的構(gòu)造函數(shù)時并沒有指定使用的ThreadFactory,這個時候ThreadPoolExecutor會使用一個默認(rèn)的ThreadFactory:DefaultThreadFactory。(這個類在Executors工具類中)

根據(jù)我個人觀察,Executors工具類和ThreadPoolExecutor類存在循環(huán)依賴:ThreadPoolExecutor中使用了Executors工具類中定義的DefaultThreadFactory;而在Executors工具類中卻又在創(chuàng)建ThreadPoolExecutor的對象實例。不清楚Doug Lea是故意未知呢,還是一個設(shè)計缺陷。

當(dāng)然,在某些特殊業(yè)務(wù)場景下,您還可以使用一個自定義的ThreadFactory線程工廠,如下代碼片段:

package test.thread.pool;import java.util.concurrent.ThreadFactory;/** * 測試自定義的一個線程工廠 * @author yinwenjie */public class TestThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { // do something before new thread created; // create new thread , and return return new Thread(r); }}1234567891011121314151617181912345678910111213141516171819

4-2、線程池的等待隊列

在使用ThreadPoolExecutor線程池的時候,需要指定一個實現(xiàn)了BlockingQueue接口的任務(wù)等待隊列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;但通過觀察BlockingQueue接口的實現(xiàn)情況,您可以發(fā)現(xiàn),能夠直接使用的等待隊列還有:PRiorityBlockingQueue、LinkedBlockingDeque和LinkedTransferQueue。

這里寫圖片描述

4-2-1、隊列和棧

隊列:按照大學(xué)《數(shù)據(jù)結(jié)構(gòu)》課程中的解釋:隊列是一種特殊的線性結(jié)構(gòu),允許在線性結(jié)構(gòu)的前端進(jìn)行刪除/讀取操作;允許在線性結(jié)構(gòu)的后端進(jìn)行插入操作;這種線性結(jié)構(gòu)具有“先進(jìn)先出”的操作特點:

這里寫圖片描述

但是在實際應(yīng)用中,隊列中的元素有可能不是以“進(jìn)入的順序”為排序依據(jù)的。例如我們將要講到的PriorityBlockingQueue隊列。

棧:棧也是一種線性結(jié)構(gòu),但是棧和隊列相比只允許在線性結(jié)構(gòu)的一端進(jìn)行操作,入棧和出棧都是在一端完成。

這里寫圖片描述

4-2-2、有限隊列

SynchronousQueue:

“是這樣 一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內(nèi)部容量,甚至連一個隊列的容量都沒有?!蹦岈?,各位讀者看懂了嗎?好吧,我抄網(wǎng)上的。下面我用白話翻譯一下:這是一個內(nèi)部沒有任何容量的阻塞隊列,任何一次插入操作的元素都要等待相對的刪除/讀取操作,否則進(jìn)行插入操作的線程就要一直等待,反之亦然。

SynchronousQueue<Object> queue = new SynchronousQueue<Object>();// 不要使用add,因為這個隊列內(nèi)部沒有任何容量,所以會拋出異?!癐llegalStateException”// queue.add(new Object());// 操作線程會在這里被阻塞,直到有其他操作線程取走這個對象queue.put(new Object());1234512345ArrayBlockingQueue:

一個由數(shù)組支持的有界阻塞隊列。此隊列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。這是一個典型的“有界緩存區(qū)”,固定大小的數(shù)組在其中保持生產(chǎn)者插入的元素和使用者提取的元素。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊列中放入元素會導(dǎo)致操作受阻塞;試圖從空隊列中提取元素將導(dǎo)致類似阻塞。

// 我們創(chuàng)建了一個ArrayBlockingQueue,并且設(shè)置隊列空間為2ArrayBlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<Object>(2);// 插入第一個對象arrayQueue.put(new Object());// 插入第二個對象arrayQueue.put(new Object());// 插入第三個對象時,這個操作線程就會被阻塞。arrayQueue.put(new Object());// 請不要使用add操作,和SynchronousQueue的add操作一樣,它們都使用了AbstractQueue中的add實現(xiàn)123456789123456789

4-2-3、無限隊列

LinkedBlockingQueue:

LinkedBlockingQueue是我們在ThreadPoolExecutor線程池中常應(yīng)用的等待隊列。它可以指定容量也可以不指定容量。由于它具有“無限容量”的特性,所以我還是將它歸入了無限隊列的范疇(實際上任何無限容量的隊列/棧都是有容量的,這個容量就是Integer.MAX_VALUE)。

LinkedBlockingQueue的實現(xiàn)是基于鏈表結(jié)構(gòu),而不是類似ArrayBlockingQueue那樣的數(shù)組。但實際使用過程中,您不需要關(guān)心它的內(nèi)部實現(xiàn),如果您指定了LinkedBlockingQueue的容量大小,那么它反映出來的使用特性就和ArrayBlockingQueue類似了。

LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(2);linkedQueue.put(new Object());// 插入第二個對象linkedQueue.put(new Object());// 插入第三個對象時,這個操作線程就會被阻塞。linkedQueue.put(new Object());=======================================// 或者如下使用:LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>();linkedQueue.put(new Object());// 插入第二個對象linkedQueue.put(new Object());// 插入第N個對象時,都不會阻塞linkedQueue.put(new Object());1234567891011121314151612345678910111213141516LinkedBlockingDeque

LinkedBlockingDeque是一個基于鏈表的雙端隊列。LinkedBlockingQueue的內(nèi)部結(jié)構(gòu)決定了它只能從隊列尾部插入,從隊列頭部取出元素;但是LinkedBlockingDeque既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素。

LinkedBlockingDeque<TempObject> linkedDeque = new LinkedBlockingDeque<TempObject>();// push ,可以從隊列的頭部插入元素linkedDeque.push(new TempObject(1));linkedDeque.push(new TempObject(2));linkedDeque.push(new TempObject(3));// poll , 可以從隊列的頭部取出元素TempObject tempObject = linkedDeque.poll();// 這里會打印 tempObject.index = 3System.out.println("tempObject.index = " + tempObject.getIndex());// put , 可以從隊列的尾部插入元素linkedDeque.put(new TempObject(4));linkedDeque.put(new TempObject(5));// pollLast , 可以從隊列尾部取出元素tempObject = linkedDeque.pollLast();// 這里會打印 tempObject.index = 5System.out.println("tempObject.index = " + tempObject.getIndex());12345678910111213141516171234567891011121314151617PriorityBlockingQueue

PriorityBlockingQueue是一個按照優(yōu)先級進(jìn)行內(nèi)部元素排序的無限隊列。存放在PriorityBlockingQueue中的元素必須實現(xiàn)Comparable接口,這樣才能通過實現(xiàn)compareTo()方法進(jìn)行排序。優(yōu)先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue不會保證優(yōu)先級一樣的元素的排序,也不保證當(dāng)前隊列中除了優(yōu)先級最高的元素以外的元素,隨時處于正確排序的位置

這是什么意思呢?PriorityBlockingQueue并不保證除了隊列頭部以外的元素排序一定是正確的。請看下面的示例代碼:

PriorityBlockingQueue<TempObject> priorityQueue = new PriorityBlockingQueue<TempObject>();priorityQueue.put(new TempObject(-5));priorityQueue.put(new TempObject(5));priorityQueue.put(new TempObject(-1));priorityQueue.put(new TempObject(1));// 第一個元素是5// 實際上在還沒有執(zhí)行priorityQueue.poll()語句的時候,隊列中的第二個元素不一定是1TempObject targetTempObject = priorityQueue.poll();System.out.println("tempObject.index = " + targetTempObject.getIndex());// 第二個元素是1targetTempObject = priorityQueue.poll();System.out.println("tempObject.index = " + targetTempObject.getIndex());// 第三個元素是-1targetTempObject = priorityQueue.poll();System.out.println("tempObject.index = " + targetTempObject.getIndex());// 第四個元素是-5targetTempObject = priorityQueue.poll();System.out.println("tempObject.index = " + targetTempObject.getIndex());============================================================================// 這個元素類,必須實現(xiàn)Comparable接口private static class TempObject implements Comparable<TempObject> { private int index; public TempObject(int index) { this.index = index; } /** * @return the index */ public int getIndex() { return index; } /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(TempObject o) { return o.getIndex() - this.index; }}1234567891011121314151617181920212223242526272829303132333435363738394041424344454612345678910111213141516171819202122232425262728293031323334353637383940414243444546LinkedTransferQueue

LinkedTransferQueue也是一個無限隊列,它除了具有一般隊列的操作特性外(先進(jìn)先出),還具有一個阻塞特性:LinkedTransferQueue可以由一對生產(chǎn)者/消費者線程進(jìn)行操作,當(dāng)消費者將一個新的元素插入隊列后,消費者線程將會一直等待,直到某一個消費者線程將這個元素取走,反之亦然。

LinkedTransferQueue的操作特性可以由下面這段代碼提現(xiàn)。在下面的代碼片段中,有兩中類型的線程:生產(chǎn)者和消費者,這兩類線程互相等待對方的操作:

/** * 生產(chǎn)者線程 * @author yinwenjie */private static class ProducerRunnable implements Runnable { private LinkedTransferQueue<TempObject> linkedQueue; public ProducerRunnable(LinkedTransferQueue<TempObject> linkedQueue) { this.linkedQueue = linkedQueue; } @Override public void run() { for(int index = 1 ; ; index++) { try { // 向LinkedTransferQueue隊列插入一個新的元素 // 然后生產(chǎn)者線程就會等待,直到有一個消費者將這個元素從隊列中取走 this.linkedQueue.transfer(new TempObject(index)); } catch (InterruptedException e) { e.printStackTrace(System.out); } } }}/** * 消費者線程 * @author yinwenjie */private static class ConsumerRunnable implements Runnable { private LinkedTransferQueue<TempObject> linkedQueue; public ConsumerRunnable(LinkedTransferQueue<TempObject> linkedQueue) { this.linkedQueue = linkedQueue; } @Override public void run() { Thread currentThread = Thread.currentThread(); while(!currentThread.isInterrupted()) { try { // 等待,直到從LinkedTransferQueue隊列中得到一個元素 TempObject targetObject = this.linkedQueue.take(); System.out.println("線程(" + currentThread.getId() + ")取得targetObject.index = " + targetObject.getIndex()); } catch (InterruptedException e) { e.printStackTrace(System.out); } } }}......===============================以下是啟動代碼:LinkedTransferQueue<TempObject> linkedQueue = new LinkedTransferQueue<TempObject>();// 這是一個生產(chǎn)者線程Thread producerThread = new Thread(new ProducerRunnable(linkedQueue));// 這里有兩個消費者線程Thread consumerRunnable1 = new Thread(new ConsumerRunnable(linkedQueue));Thread consumerRunnable2 = new Thread(new ConsumerRunnable(linkedQueue));// 開始運行producerThread.start();consumerRunnable1.start();consumerRunnable2.start();// 這里只是為了main不退出,沒有任何演示含義Thread currentThread = Thread.currentThread();synchronized (currentThread) { currentThread.wait();}......12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273741234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374

4-3、拒絕任務(wù)

在ThreadPoolExecutor線程池中還有一個重要的接口:RejectedExecutionHandler。當(dāng)提交給線程池的某一個新任務(wù)無法直接被線程池中“核心線程”直接處理,又無法加入等待隊列,也無法創(chuàng)建新的線程執(zhí)行;又或者線程池已經(jīng)調(diào)用shutdown()方法停止了工作;又或者線程池不是處于正常的工作狀態(tài);這時候ThreadPoolExecutor線程池會拒絕處理這個任務(wù),觸發(fā)您創(chuàng)建ThreadPoolExecutor線程池時定義的RejectedExecutionHandler接口的實現(xiàn)

New tasks submitted in method execute will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution method of its RejectedExecutionHandler. Four predefined handler policies are provided

您在創(chuàng)建ThreadPoolExecutor線程池時,一定會指定RejectedExecutionHandler接口的實現(xiàn)。如果您調(diào)用的是不需要指定RejectedExecutionHandler接口的構(gòu)造函數(shù),如:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)1234567891011121312345678910111213

那么ThreadPoolExecutor線程池在創(chuàng)建時,會使用一個默認(rèn)的RejectedExecutionHandler接口實現(xiàn),源代碼片段如下:

public class ThreadPoolExecutor extends AbstractExecutorService { ...... /** * The default rejected execution handler */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); ...... // 可以看到,ThreadPoolExecutor中的兩個沒有指定RejectedExecutionHandler // 接口的構(gòu)造函數(shù),都是使用了一個RejectedExecutionHandler接口的默認(rèn)實現(xiàn):AbortPolicy public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ...... public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } ......}1234567891011121314151617181920212223242526272829303132333435363712345678910111213141516171819202122232425262728293031323334353637

實際上,在ThreadPoolExecutor中已經(jīng)提供了四種可以直接使用的RejectedExecutionHandler接口的實現(xiàn):

CallerRunsPolicy:

這個拒絕處理器,將直接運行這個任務(wù)的run方法。但是,請注意并不是在ThreadPoolExecutor線程池中的線程中運行,而是直接調(diào)用這個任務(wù)實現(xiàn)的run方法。源代碼如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }}1234567891011121314151617181912345678910111213141516171819AbortPolicy:

這個處理器,在任務(wù)被拒絕后會創(chuàng)建一個RejectedExecutionException異常并拋出。這個處理過程也是ThreadPoolExecutor線程池默認(rèn)的RejectedExecutionHandler實現(xiàn):

A handler for rejected tasks that throws a RejectedExecutionException.

DiscardPolicy:

DiscardPolicy處理器,將會默默丟棄這個被拒絕的任務(wù),不會拋出異常,也不會通過其他方式執(zhí)行這個任務(wù)的任何一個方法,更不會出現(xiàn)任何的日志提示。

A handler for rejected tasks that silently discards the rejected task.

DiscardOldestPolicy:

這個處理器很有意思。它會檢查當(dāng)前ThreadPoolExecutor線程池的等待隊列。并調(diào)用隊列的poll()方法,將當(dāng)前處于等待隊列列頭的等待任務(wù)強行取出,然后再試圖將當(dāng)前被拒絕的任務(wù)提交到線程池執(zhí)行:

public static class DiscardOldestPolicy implements RejectedExecutionHandler { ...... public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } ......}1234567891012345678910

實際上查閱這四種ThreadPoolExecutor線程池自帶的拒絕處理器實現(xiàn),您可以發(fā)現(xiàn)CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy處理器針對被拒絕的任務(wù)并不是一個很好的處理方式。

CallerRunsPolicy在非線程池以外直接調(diào)用任務(wù)的run方法,可能會造成線程安全上的問題;DiscardPolicy默默的忽略掉被拒絕任務(wù),也沒有輸出日志或者提示,開發(fā)人員不會知道線程池的處理過程出現(xiàn)了錯誤;DiscardOldestPolicy中e.getQueue().poll()的方式好像是科學(xué)的,但是如果等待隊列出現(xiàn)了容量問題,大多數(shù)情況下就是這個線程池的代碼出現(xiàn)了BUG。最科學(xué)的的還是AbortPolicy提供的處理方式:拋出異常,由開發(fā)人員進(jìn)行處理

(接下文,好吧我承認(rèn)篇幅又沒有控制好)


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 凌云县| 上饶县| 琼中| 乌兰县| 汾阳市| 皋兰县| 康平县| 安陆市| 集贤县| 什邡市| 安多县| 区。| 师宗县| 乌兰浩特市| 宝兴县| 九龙城区| 永昌县| 张家界市| 滦南县| 叶城县| 北碚区| 黄山市| 若尔盖县| 比如县| 彝良县| 台州市| 错那县| 阿拉善右旗| 辰溪县| 米林县| 凤庆县| 黄石市| 松潘县| 肥乡县| 桃江县| 黎城县| 池州市| 电白县| 德安县| 莱西市| 宜昌市|