public static void main(String[] args) throws InterruptedException{ final Basket basket = new Basket(); // 定義一個 producer Runnable producer = new Runnable() { public void run() { try { basket.produce(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }; // 定義一個 consumer Runnable consumer = new Runnable() { public void run() { try { basket.consume(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }; // 各產生 10 個 consumer 和 producer ExecutorService service = Executors.newCachedThreadPool(); for(int i=0; i < 10; i++) service.submit(consumer); Thread.sleep(2000); for(int i=0; i<10; i++) service.submit(producer); service.shutdown(); } }
5: Synchronizer :同步裝置 Java 5.0 里新加了 4 個協調線程間進程的同步裝置,它們分別是 Semaphore, CountDownLatch, CyclicBarrier 和 Exchanger. Semaphore: 用來管理一個資源池的工具, Semaphore 可以看成是個通行證,線程要想從資源池拿到資源必須先拿到通行證, Semaphore 提供的通行證數量和資源池的大小一致。如果線程暫時拿不到通行證,線程就會被阻斷進入等待狀態。以下是一個例子: public class Pool { ArrayList pool = null; Semaphore pass = null; public Pool(int size){ // 初始化資源池 pool = new ArrayList(); for(int i=0; i pool.add("Resource "+i); } //Semaphore 的大小和資源池的大小一致 pass = new Semaphore(size); } public String get() throws InterruptedException{ // 獲取通行證 , 只有得到通行證后才能得到資源 pass.acquire(); return getResource(); } public void put(String resource){ // 歸還通行證,并歸還資源 pass.release(); releaseResource(resource); } private synchronized String getResource() { String result = pool.get(0); pool.remove(0); System.out.println("Give out "+result); return result; } private synchronized void releaseResource(String resource) { System.out.println("return "+resource); pool.add(resource); } }
SemaphoreTest: public class SemaphoreTest { public static void main(String[] args){ final Pool aPool = new Pool(2); Runnable worker = new Runnable() { public void run() { String resource = null; try { // 取得 resource resource = aPool.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } // 用 resource 做工作 System.out.println("I worked on "+resource); // 歸還 resource aPool.put(resource); } }; ExecutorService service = Executors.newCachedThreadPool(); for(int i=0; i<20; i++){ service.submit(worker); } service.shutdown(); } }
CountDownLatch: CountDownLatch 是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。比如說一個 Server 啟動時需要初始化 4 個部件, Server 可以同時啟動 4 個線程去初始化這 4 個部件,然后調用 CountDownLatch(4).await() 阻斷進入等待,每個線程完成任務后會調用一次 CountDownLatch.countDown() 來倒計數 , 當 4 個線程都結束時 CountDownLatch 的計數就會降低為 0 ,此時 Server 就會被喚醒繼續下一步操作。 CountDownLatch 的方法主要有: await() :使調用此方法的線程阻斷進入等待 countDown(): 倒計數,將計數值減 1 getCount(): 得到當前的計數值 CountDownLatch 的例子:一個 server 調了三個 ComponentThread 分別去啟動三個組件,然后 server 等到組件都啟動了再繼續。 public class Server { public static void main(String[] args) throws InterruptedException{ System.out.println("Server is starting."); // 初始化一個初始值為 3 的 CountDownLatch CountDownLatch latch = new CountDownLatch(3); // 起 3 個線程分別去啟動 3 個組件 ExecutorService service = Executors.newCachedThreadPool(); service.submit(new ComponentThread(latch, 1)); service.submit(new ComponentThread(latch, 2)); service.submit(new ComponentThread(latch, 3)); service.shutdown(); // 進入等待狀態 latch.await(); // 當所需的三個組件都完成時, Server 就可繼續了 System.out.println("Server is up!"); } }
public class ComponentThread implements Runnable{ CountDownLatch latch; int ID; /** Creates a new instance of ComponentThread */ public ComponentThread(CountDownLatch latch, int ID) { this.latch = latch; this.ID = ID; } public void run() { System.out.println("Component "+ID + " initialized!"); // 將計數減一 latch.countDown(); } }
運行結果: Server is starting. Component 1 initialized! Component 3 initialized! Component 2 initialized! Server is up!
CyclicBarrier: CyclicBarrier 類似于 CountDownLatch 也是個計數器,不同的是 CyclicBarrier 數的是調用了 CyclicBarrier.await() 進入等待的線程數,當線程數達到了 CyclicBarrier 初始時規定的數目時,所有進入等待狀態的線程被喚醒并繼續。 CyclicBarrier 就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。 CyclicBarrier 初始時還可帶一個 Runnable 的參數,此 Runnable 任務在 CyclicBarrier 的數目達到后,所有其它線程被喚醒前被執行。 CyclicBarrier 提供以下幾個方法: await() :進入等待 getParties() :返回此 barrier 需要的線程數 reset() :將此 barrier 重置 以下是使用 CyclicBarrier 的一個例子:兩個線程分別在一個數組里放一個數,當這兩個線程都結束后,主線程算出數組里的數的和(這個例子比較無聊,我沒有想到更合適的例子) public class MainThread { public static void main(String[] args) throws InterruptedException, BrokenBarrierException, TimeoutException{ final int[] array = new int[2]; CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {// 在所有線程都到達 Barrier 時執行 public void run() { System.out.println("Total is:"+(array[0]+array[1])); } }); // 啟動線程 new Thread(new ComponentThread(barrier, array, 0)).start(); new Thread(new ComponentThread(barrier, array, 1)).start(); } }
public class ComponentThread implements Runnable{ CyclicBarrier barrier; int ID; int[] array; public ComponentThread(CyclicBarrier barrier, int[] array, int ID) { this.barrier = barrier; this.ID = ID; this.array = array; } public void run() { try { array[ID] = new Random().nextInt(); System.out.println(ID+ " generates:"+array[ID]); // 該線程完成了任務等在 Barrier 處 barrier.await(); } catch (BrokenBarrierException ex) { ex.printStackTrace(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }