using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Collections.Concurrent;using System.Threading;namespace 并發編程{ class PRogram { static ConcurrentQueue<Order> orderQueue = new ConcurrentQueue<Order>(); static void Main(string[] args) { ProductData<Order> productData = new ProductData<Order>(orderQueue); Thread.Sleep(TimeSpan.FromSeconds(2)); //C#并發編程(一) //單線程處理 //SingleThreadDo(); //多線程處理 //MutilThreadDo(); //并行處理 //ParallelDo(); //異步處理 AsyncDo(); while (true) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (orderQueue.Count == 0) { Console.WriteLine("訂單全部處理完成"); } } } /// <summary> /// 異步 /// </summary> static void AsyncDo() { int maxTaskCount = 100; int currentTaskCount = 0; OrderMgr orderMgr = new OrderMgr(); Order order; while (true) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("當前還剩:{0}", orderQueue.Count); while (currentTaskCount > maxTaskCount) { } var delegateAct = new Action(() => { orderMgr.SaveOrder(order); }); delegateAct.BeginInvoke(new AsyncCallback(r => { delegateAct.EndInvoke(r); Interlocked.Decrement(ref currentTaskCount); }), null); Interlocked.Increment(ref currentTaskCount); } } } /// <summary> /// 并行處理 /// </summary> static void ParallelDo() { OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { List<Order> orderList = new List<Order>(); if (orderQueue.TryDequeue(out order)) { orderList.Add(order); if (orderQueue.Count > 100) { for (int i = 0; i < 100; i++) { orderQueue.TryDequeue(out order); orderList.Add(order); } } //Parallel中的任務全部執行完成才會往下走 Parallel.ForEach(orderList, (p) => { orderMgr.SaveOrder(order); Console.WriteLine("當前還剩:{0}", orderQueue.Count); }); } } } /// <summary> /// 多線程處理 /// </summary> static void MutilThreadDo() { int maxTaskCount = 100; int currentTaskCount = 0; OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("當前還剩:{0}", orderQueue.Count); while (currentTaskCount > maxTaskCount) { } Task.Factory.StartNew(() => { orderMgr.SaveOrder(order); Interlocked.Decrement(ref currentTaskCount); }); Interlocked.Increment(ref currentTaskCount); } } } /// <summary> /// 單線程處理 /// </summary> static void SingleThreadDo() { OrderMgr orderMgr = new OrderMgr(); Order order; while (orderQueue.Count > 0) { if (orderQueue.TryDequeue(out order)) { Console.WriteLine("當前還剩:{0}", orderQueue.Count); orderMgr.SaveOrder(order); } } } } public class ProductData<T> where T : Order, new() { ConcurrentQueue<T> queue; public ProductData(ConcurrentQueue<T> queue) { this.queue = queue; Data(); } private void Data() { for (int j = 0; j < 100; j++) { queue.Enqueue(new T { ID = Guid.NewGuid(), NO = string.Format("J{0:00000000}", j) }); } var i = 1; Task.Factory.StartNew(() => { while (true) { queue.Enqueue(new T { ID = Guid.NewGuid(), NO = string.Format("{0:00000000}", i) }); i++; Thread.Sleep(TimeSpan.FromMilliseconds(100));//每1S生產10個訂單 } }); } } public class Order { public Guid ID { get; set; } public string NO { get; set; } } public class OrderMgr { public OrderMgr() { } public string SaveOrder(Order order) { Console.WriteLine("訂單:" + order.NO + ",處理完成"); Thread.Sleep(TimeSpan.FromSeconds(2));//假設每個訂單的處理時間為2S return order.NO; } }}
新聞熱點
疑難解答