整理者:鄭昀@ultrapower
我們采用的是i/o complete port(以下簡稱iocp)處理機制。
簡單的講,當服務應用程序初始化時,它應該先創建一個i/o cp。我們在請求到來后,將得到的數據打包用postqueuedcompletionstatus發送到iocp中。這時需要創建一些個線程(7個線程/每cpu,再多就沒有意義了)來處理發送到iocp端口的消息。實現步驟大致如下:
1 先在主線程中調用createiocompletionport創建iocp。
createiocompletionport的前三個參數只在把設備同complete port相關聯時才有用。
此時我們只需傳遞invalid_handle_value,null和0即可。
第四個參數告訴端口同時能運行的最多線程數,這里設置為0,表示默認為當前計算機的cpu數目。
2 我們的threadfun線程函數執行一些初始化之后,將進入一個循環,該循環會在服務進程終止時才結束。
在循環中,調用getqueuedcompletionstatus,這樣就把當前線程的id放入一個等待線程隊列中,i/o cp內核對象就總能知道哪個線程在等待處理完成的i/o請求。
如果在idle_thread_timeout規定的時間內i/o cp上還沒有出現一個completion packet,則轉入下一次循環。在這里我們設置的idle_thread_timeout為1秒。
當端口的i/o完成隊列中出現一項時,完成端口就喚醒等待線程隊列中的這個線程,該線程將得到完成的i/o項中的信息: 傳輸的字節數、完成鍵和overlapped結構的地址。
在我們的程序中可以用智能指針或者bstr或者int來接受這個overlapped結構的地址的值,從而得到消息;然后在這個線程中處理消息。
getqueuedcompletionstatus的第一個參數hcompletionport指出了要監視哪一個端口,這里我們傳送先前從createiocompletionport返回的端口句柄。
需要注意的是:
第一, 線程池的數目是有限制的,和cpu數目有關系。
第二, iocp是一種較為完美的睡眠/喚醒 線程機制;線程當前沒有任務要處理時,就進入睡眠狀態,從而不占用cpu資源,直到被內核喚醒;
第三, 最近一次剛執行完的線程,下次任務來的時候還會喚醒它;所以有可能比較少被調用的線程以后被調用的幾率也少。
測試代碼:
using system;
using system.threading; // included for the thread.sleep call
using continuum.threading;
using system.runtime.interopservices;
namespace iocpdemo ![]()
![]()
{
//============================================================================= ![]()
/**//// <summary> sample class for the threading class </summary>
public class utilthreadingsample ![]()
{
//***************************************************************************** ![]()
/**//// <summary> test method </summary>
static void main() ![]()
{
// create the mssql iocp thread pool
iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction));
//for(int i =1;i<10000;i++) ![]()
{
pthreadpool.postevent(1234);
}
thread.sleep(100);
pthreadpool.dispose();
}
//******************************************************************** ![]()
/**//// <summary> function to be called by the iocp thread pool. called when
/// a command is posted for processing by the socketmanager </summary>
/// <param name="ivalue"> the value provided by the thread posting the event </param>
static public void iocpthreadfunction(int ivalue) ![]()
{
try ![]()
{
console.writeline("value: {0}", ivalue.tostring());
thread.sleep(3000);
}
catch (exception pexception) ![]()
{
console.writeline(pexception.message);
}
}
}
}
類代碼:
using system;
using system.threading;
using system.runtime.interopservices;
namespace iocpthreading ![]()
![]()
{
[structlayout(layoutkind.sequential, charset=charset.auto)]
public sealed class iocpthreadpool ![]()
{
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean closehandle(uint32 hobject);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);
private const uint32 invalid_handle_value = 0xffffffff;
private const uint32 inifinite = 0xffffffff;
private const int32 shutdown_iocpthread = 0x7fffffff;
public delegate void user_function(int ivalue);
private uint32 m_hhandle; ![]()
private uint32 gethandle
{ get
{ return m_hhandle; } set
{ m_hhandle = value; } }
private int32 m_uimaxconcurrency;
![]()
private int32 getmaxconcurrency
{ get
{ return m_uimaxconcurrency; } set
{ m_uimaxconcurrency = value; } }
private int32 m_iminthreadsinpool;
![]()
private int32 getminthreadsinpool
{ get
{ return m_iminthreadsinpool; } set
{ m_iminthreadsinpool = value; } }
private int32 m_imaxthreadsinpool;
![]()
private int32 getmaxthreadsinpool
{ get
{ return m_imaxthreadsinpool; } set
{ m_imaxthreadsinpool = value; } }
private object m_pcriticalsection;
![]()
private object getcriticalsection
{ get
{ return m_pcriticalsection; } set
{ m_pcriticalsection = value; } }
private user_function m_pfnuserfunction;
![]()
private user_function getuserfunction
{ get
{ return m_pfnuserfunction; } set
{ m_pfnuserfunction = value; } }
private boolean m_bdisposeflag;
![]()
/**//// <summary> simtype: flag to indicate if the class is disposing </summary>
![]()
private boolean isdisposed
{ get
{ return m_bdisposeflag; } set
{ m_bdisposeflag = value; } }
private int32 m_icurthreadsinpool;
![]()
/**//// <summary> simtype: the current number of threads in the thread pool </summary>
![]()
public int32 getcurthreadsinpool
{ get
{ return m_icurthreadsinpool; } set
{ m_icurthreadsinpool = value; } }
![]()
/**//// <summary> simtype: increment current number of threads in the thread pool </summary>
![]()
private int32 inccurthreadsinpool()
{ return interlocked.increment(ref m_icurthreadsinpool); }
![]()
/**//// <summary> simtype: decrement current number of threads in the thread pool </summary>
![]()
private int32 deccurthreadsinpool()
{ return interlocked.decrement(ref m_icurthreadsinpool); }
private int32 m_iactthreadsinpool;
![]()
/**//// <summary> simtype: the current number of active threads in the thread pool </summary>
![]()
public int32 getactthreadsinpool
{ get
{ return m_iactthreadsinpool; } set
{ m_iactthreadsinpool = value; } }
![]()
/**//// <summary> simtype: increment current number of active threads in the thread pool </summary>
![]()
private int32 incactthreadsinpool()
{ return interlocked.increment(ref m_iactthreadsinpool); }
![]()
/**//// <summary> simtype: decrement current number of active threads in the thread pool </summary>
![]()
private int32 decactthreadsinpool()
{ return interlocked.decrement(ref m_iactthreadsinpool); }
private int32 m_icurworkinpool;
![]()
/**//// <summary> simtype: the current number of work posted in the thread pool </summary>
![]()
public int32 getcurworkinpool
{ get
{ return m_icurworkinpool; } set
{ m_icurworkinpool = value; } }
![]()
/**//// <summary> simtype: increment current number of work posted in the thread pool </summary>
![]()
private int32 inccurworkinpool()
{ return interlocked.increment(ref m_icurworkinpool); }
![]()
/**//// <summary> simtype: decrement current number of work posted in the thread pool </summary>
![]()
private int32 deccurworkinpool()
{ return interlocked.decrement(ref m_icurworkinpool); }
public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction) ![]()
{
try ![]()
{
// set initial class state
getmaxconcurrency = imaxconcurrency;
getminthreadsinpool = iminthreadsinpool;
getmaxthreadsinpool = imaxthreadsinpool;
getuserfunction = pfnuserfunction;
// init the thread counters
getcurthreadsinpool = 0;
getactthreadsinpool = 0;
getcurworkinpool = 0;
// initialize the monitor object
getcriticalsection = new object();
// set the disposing flag to false
isdisposed = false;
unsafe ![]()
{
// create an io completion port for thread pool use
gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);
}
// test to make sure the io completion port was created
if (gethandle == 0)
throw new exception("unable to create io completion port");
// allocate and start the minimum number of threads specified
int32 istartingcount = getcurthreadsinpool;
threadstart tsthread = new threadstart(iocpfunction);
for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread) ![]()
{
// create a thread and start it
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
catch ![]()
{
throw new exception("unhandled exception");
}
}
~iocpthreadpool() ![]()
{
if (!isdisposed)
dispose();
}
public void dispose() ![]()
{
try ![]()
{
// flag that we are disposing this object
isdisposed = true;
// get the current number of threads in the pool
int32 icurthreadsinpool = getcurthreadsinpool;
// shutdown all thread in the pool
for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread) ![]()
{
unsafe ![]()
{
bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);
}
}
// wait here until all the threads are gone
while (getcurthreadsinpool != 0) thread.sleep(100);
unsafe ![]()
{
// close the iocp handle
closehandle(gethandle);
}
}
catch ![]()
{
}
}
private void iocpfunction() ![]()
{
uint32 uinumberofbytes;
int32 ivalue;
try ![]()
{
while (true) ![]()
{
unsafe ![]()
{
system.threading.nativeoverlapped* pov;
// wait for an event
getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite);
}
// decrement the number of events in queue
deccurworkinpool();
// was this thread told to shutdown
if (ivalue == shutdown_iocpthread)
break;
// increment the number of active threads
incactthreadsinpool();
try ![]()
{
// call the user function
getuserfunction(ivalue);
}
catch(exception ex) ![]()
{
throw ex;
}
// get a lock
monitor.enter(getcriticalsection);
try ![]()
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool) ![]()
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool) ![]()
{
if (isdisposed == false) ![]()
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch ![]()
{
}
// relase the lock
monitor.exit(getcriticalsection);
// increment the number of active threads
decactthreadsinpool();
}
}
catch(exception ex) ![]()
{
string str=ex.message;
}
// decrement the thread pool count
deccurthreadsinpool();
}
//public void postevent(int32 ivalue
public void postevent(int ivalue) ![]()
{
try ![]()
{
// only add work if we are not disposing
if (isdisposed == false) ![]()
{
unsafe ![]()
{
// post an event into the iocp thread pool
postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);
}
// increment the number of item of work
inccurworkinpool();
// get a lock
monitor.enter(getcriticalsection);
try ![]()
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool) ![]()
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool) ![]()
{
if (isdisposed == false) ![]()
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch ![]()
{
}
// release the lock
monitor.exit(getcriticalsection);
}
}
catch (exception e) ![]()
{
throw e;
}
catch ![]()
{
throw new exception("unhandled exception");
}
}
public void postevent() ![]()
{
try ![]()
{
// only add work if we are not disposing
if (isdisposed == false) ![]()
{
unsafe ![]()
{
// post an event into the iocp thread pool
postqueuedcompletionstatus(gethandle, 0, null, null);
}
// increment the number of item of work
inccurworkinpool();
// get a lock
monitor.enter(getcriticalsection);
try
![]()
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool)
![]()
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool)
![]()
{
if (isdisposed == false)
![]()
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch
![]()
{
}
// release the lock
monitor.exit(getcriticalsection);
}
}
catch
![]()
{
throw new exception("unhandled exception");
}
}
}
}
,歡迎訪問網頁設計愛好者web開發。
新聞熱點
疑難解答