国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

Executor框架

2019-11-06 06:01:54
字體:
來源:轉載
供稿:網友

結構組成

Executor主要由3個部分組成:

1. 任務。包括被執行任務需要的Runnable接口或者Callable接口 2. 任務的執行。包括執行任務的核心接口Executor以及繼承自Executor的ExecutorService接口。Executor接口有兩個關鍵類實現了ExecutorService接口的類:ThreadPoolExecutor和ScheduleThreadPoolExecutor 3. 異步計算的結果。包括接口Future和Future的實現類FutureTask 這里寫圖片描述 主線程首先要創建實現Runnable或者Callable接口的任務對象。工具類Executors可以把一個Runnable對象封裝為一個Callable對象:Executors.callable(Runnable task)或者Executors.callable(Runnable task,Object resule)

ExecutorService執行任務的方法: (1)ExecutorService.execute(Runnable command) (2)ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable tasl)

ExecutorService.submit(),將返回一個實現 Future接口的對象,由于FutureTask實現了Runnable接口,因此可以直接創建FutureTask交給ExecutorService執行。

最后,主線程可以執行FuturTask.get()方法來等待任務執行完成。主線程也可以執行FutureTask.cancel(boolean mayInterruPPTIfRunning)來取消任務。

成員

主要成員包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors

ThreadPoolExecutor ThreadPoolExecutor通常使用工廠類Executors來創建。Executors可以創建3種類型的ThreadPoolExecutor:SingleThreadPoolExecutor、FixedTHreadPool和CachedThreadPool FixedThreadPool:創建固定線程數,適用于負載比較重的服務器 SingleThreadExecutor:創建單個線程,適用于需要保證順序的執行各個任務;并且任意時間點,不會有多個線程的應用場景 CachedThreadPool:可以根據需要創建新線程。CachedThreadPool是無界的線程池,適用與執行很多的短期異步任務的小程序或者負重較輕的服務器 ScheduledThreadPoolExecutor

通常適用Executors來創建ScheduledThreadPoolExecutor,Executors可以創建2種類型的ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor

ScheduledThreadPoolExecutor:包含若干個線程ScheduledThreadPoolExecutor。適用于需要多個后臺線程執行周期任務,同時為了滿足資源管理的需要而線程后臺線程數量的應用場景 SingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor。適用于需要單個后臺線程執行周期任務,同時需要保證順序地執行各個任務的應用場景。 Future接口

Future接口和實現Future接口的FutureTask類用來表示異步計算的結果。當我們把Runnable接口或Callable接口的實現類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,會返回一個實現了Future接口的對象。到目前的JDK8為止,返回的是一個FutureTask對象,但是在未來的Jdk版本匯總,有可能返回的就不是FutureTask了

Runnable接口和Callable接口 Runnable接口和Callable接口的實現類都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor執行。他們之間的區別是Runnable不會返回結果,而Callable可以返回結果。

ThreadPoolExecutor詳解

Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實現類,主要有4個組件構成:

corePool:核心線程池的大小maximumPool:最大線程池的大小BlockingQueue:用來暫時保存任務的工作隊列RejectedExecutionHandler:當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大線程池大小且工作隊列已滿),execute()方法將調用Handler。工具類Executors可以創建3種類型的ThreadPoolExecutor:FixedThreadPool、SingleThreadExecutor、CachedThreadPool

FixedThreadPool詳解

FixedThreadPool被稱為可重用固定線程數的線程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }

當線程池中的線程數大于corePoolSize時,keepAliveTime為多余的空閑線程等待新任務的最長時間,超過這個時間后,多余的空閑線程將被終止。FixedThreadPool把keepAliveTime設置為0L,意味著多余的空閑線程會被立即終止

FixedThreadPool使用無界隊列LinkedBlockingQueue作為線程池的 工作隊列。使用無界隊列后,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,也就是說,來一個新任務就往無界隊列中加,因此線程池的線程數不會超過corePoolSize。這將使maximumPoolSize和KeepAliveTime變成無效參數。

SingleThreadExecutor詳解

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1。SingleThreadExecutor跟FixedThreadPool一樣都是使用無界隊列LinkedBlockingQueue作為工作隊列。

CachedThreadPool

CachedThreadPool是一個會根據需要創建新線程的線程池。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

從上面的代碼可以看出,CachedThreadPool的corePoolSize被設置為0,maximumPoolSize設置為Integer.MAX_VALUE,keepAliveTime設置為60L,說明空閑線程等待新任務的最長時間為60秒。

CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊列,但CachedThreadPool的maximumPoolSize是無界的。這意味著,如果主線程提交任務的速度高于maximumPoolSize中線程處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。

CachedThreadPool的execute()方法執行示意圖: 這里寫圖片描述

(1)首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPoolSize中有空閑線程正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執行offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行 (2)當初始maximumPoolSize為空,或者maximumPoolSize中當前沒有空閑線程時,將沒有線程執行SynchronousQueue.poll(keepAliveTime,TimeUnit.ANANOSECONDS)。這時CachedThreadPool會創建一個新線程執行任務,execute()方法來執行完成 (3)在步驟2中,新創建的線程將任務執行完后,會執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS).這個poll操作會讓空閑線程最多在SynchronousQueue中等待60s,如果60s內主線程提交了一個新任務,那么這個空閑線程將執行主線程提交的新任務;否則,這個空閑線程將終止。。

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。主要用來在給定的延遲之后運行任務或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大更靈活。Timer對應的是單個后臺線程,而ScheduledThreadPoolExecutor可以在構造函數中指定多個對應的后臺線程數。

運行機制

這里寫圖片描述 DelayQueue是一個無界隊列,所以maximumPoolSize在ScheduledThreadPoolExecutor中沒有什么意義。

ScheduledThreadPoolExecutor的執行主要分為兩部分 (1)當調用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了RunnableScheduledFutur接口的ScheduledFutureTask (2)線程池中的線程從DelayQueue中獲取ScheduledFutureTask,然后執行任務。

實現

ScheduledFutureTask主要包含3個成員變量:

/** 表示這個任務被添加到ScheduledThreadPoolExecutor中的序號 */ PRivate final long sequenceNumber; /**任務要被執行的具體時間 */ private long time; /** *表示任務執行的間隔周期 */ private final long period;

DelayQueue封裝了一個PriorityQueue,會對隊列中的ScheduledFutureTask進行排序。排序時,time小的排在前面。如果time相同,就比較sequenceNumber,sequenceNumber小的排在前面,也就是說,如果兩個任務的執行時間相同,那么先提交的任務將被先執行

這里寫圖片描述

上圖是ScheduledThreadPoolExecutor中的線程1執行某個周期任務的4個步驟。 步驟1:線程1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大于當前時間。 步驟2:線程1執行ScheduledFutureTask 步驟3:線程1修改ScheduledFutureTask的time變量為下次將要被執行的時間。 步驟4:線程1把這個修改time之后的ScheduledFutureTask返回DelayQueue中(DelayQueue.add())

接下來看看DelayQueue.take()方法的實現:

/*** 檢索并刪除隊列的頭部,如有必要,直到具有可用的到期延遲的元素為止***/ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//獲取鎖 try { for (;;) { E first = q.peek(); if (first == null) available.await();//如果PriorityQueue為空,則在當前線程的Condition中等待 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); //獲取PriorityQueue中的頭元素 first = null; //在等待時不保留引用 if (leader != null)//領導線程不為空,當前線程只能繼續等待 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread;//當前線程被確認為領導線程 try { available.awaitNanos(delay);//等待到延遲時間到達 } finally { if (leader == thisThread)//當前線程被喚醒后,交出leader地位,設置leader為空 leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal();//將等待時間最長的線程從Condition等待隊列中移到獲取鎖的隊列里,準備獲取鎖 lock.unlock(); } }

接下來看看DelayQueue.take()方法的實現:

public boolean add(E e) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e);//把元素加入到PriorityQueue隊列 if (q.peek() == e) {//如果PriorityQueue隊列的頭部元素是當前加入的元素,設置leader線程為null,喚醒Condition等待隊列中的一個等待時間最長的線程進入到同步隊列 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

PriorityQueue的offer()方法:

public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size;//PriorityQueue隊列的元素數量 if (i >= queue.length) grow(i + 1);//如果很小時,length是size的雙倍,如果size增大到大于等于length時,需要擴容50% size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e); return true; }

FutureTask詳解

簡介

FutureTask除了實現Future接口外,還實現了Runnable接口,因此,FutureTask可以交給Executor執行。也可以調用線程執行FutureTask.run()。根據FutureTask.run()方法被執行的時機,FutureTask可以處于下面3中狀態:

1. 未啟動。當創建一個FutureTask,且沒有執行FutureTask.run()方法之前,這個FutureTask處于未啟動狀態。 2. 已啟動。FutureTask.run()方法被執行的過程中 3. 已完成。FutureTask.run()方法執行完后正常結束,或被取消FutureTask.cancel()或者執行run方法拋出異常而結束

當FuturTask處于未啟動或已啟動時,執行FutureTask.get()方法將導致調用線程阻塞;當FutureTask處于已完成時,執行FutureTask.get()方法時,將導致調用線程立即返回結果或者拋出異常。

當FutureTask處于未啟動時,執行FutureTask.cancel()方法將導致任務永遠不會被執行;如果是在已啟動時,調用FutureTask.cancle(true)方法將已中斷執行此任務線程的方式來試圖停止任務;當FutureTask處于已啟動狀態時,執行FutureTask.cancle(false)不會對正在執行此任務的線程產生影響,當FutureTask處于已完成狀態時,執行FutureTask.cancel方法將返回false

使用

應用場景:

1. Future用于異步獲取執行結果或者取消任務。 2. 在高并發場景下確保任務只執行一次。

實例如下:

import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class FutureTaskDemo { private final FutureTask<Long> future = new FutureTask<Long>(new Callable<Long>() { @Override public Long call() throws Exception { Thread.currentThread().setName("Thread(3)"); System.out.println(Thread.currentThread().getName() + ":開始業務操作! "); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":業務邏輯執行結束!"); return Math.round(Math.random() * 1000);//返回一個隨機數 } }); private final Thread loader = new Thread(future); public void start() { System.out.println(Thread.currentThread().getName() + ": 啟動loader線程!"); loader.start();//啟動loader線程 System.out.println(Thread.currentThread().getName() + ": loader線程已啟動!"); } public Long get() { try { System.out.println(Thread.currentThread().getName() + ": 開始調用get方法"); long start = System.currentTimeMillis(); Long result = future.get();//調用FutureTask.get()方法 System.out.println(Thread.currentThread().getName() + ": 獲取的結果為: " + result); System.out.println(Thread.currentThread().getName() + ": 消耗的時間為: " + (System.currentTimeMillis() - start)); return result; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": got nothing"); return null; } public static void main(String[] args) { Thread.currentThread().setName("Thread(main)"); final FutureTaskDemo demo = new FutureTaskDemo(); demo.start(); new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName("Thread(1)"); System.out.println("嘗試在延遲時間到達之前獲取結果"); demo.get(); } }).start(); new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName("Thread(2)"); try { Thread.sleep(6000); System.out.println("嘗試在延遲時間到達后獲取結果 "); demo.get(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }

輸出結果如下:

Thread(main): 啟動loader線程! Thread(main): loader線程已啟動! Thread(3):開始業務操作! 嘗試在延遲時間到達之前獲取結果 Thread(1): 開始調用get方法 Thread(3):業務邏輯執行結束! Thread(1): 獲取的結果為: 580 Thread(1): 消耗的時間為: 4999 嘗試在延遲時間到達后獲取結果 Thread(2): 開始調用get方法 Thread(2): 獲取的結果為: 580 Thread(2): 消耗的時間為: 0

通過上面的執行結果我們可以看出,當Thread3開始執行任務,但是還未執行完成時,去嘗試獲取執行結果,Thread1會阻塞直到Thread3執行完成后;如果Thread3執行完成后,去Thread2嘗試獲取執行結果,會立即返回

源碼實現

成員變量:

/*** 此任務的運行狀態,最初為NEW。 運行狀態只在方法set,setException和cancel中轉換到終端狀態。* 在完成期間,狀態可以接受COMPLETING(當結果被設置時)或INTERRUPTING(僅在中斷運行器以滿足* 取消(真)時)的瞬時值。 從這些中間狀態到最終狀態的轉換使用更便宜的有序/延遲寫入,因為值是唯一的,不能進一步修改。 **/private volatile int state;

state的值如下:

private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;

可能的狀態值變化: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED *NEW -> INTERRUPTING -> INTERRUPTED FutureTask.run()方法:

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

FuturTask.get()方法的實現:

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

FuturTask.cancel()方法的實現:

public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 阿巴嘎旗| 临潭县| 万州区| 平陆县| 即墨市| 临高县| 日照市| 普洱| 育儿| 天峨县| 黑河市| 宿州市| 陇南市| 宁陕县| 页游| 玛纳斯县| 西丰县| 克什克腾旗| 岢岚县| 垣曲县| 桂阳县| 枞阳县| 甘谷县| 龙陵县| 松阳县| 盐边县| 高唐县| 镇远县| 乌兰县| 通辽市| 汾西县| 策勒县| 仁化县| 三门县| 西吉县| 驻马店市| 昔阳县| 安西县| 湘西| 永丰县| 灵川县|