国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發 > 綜合 > 正文

[C#]I/O完成端口的類定義和測試實例

2024-07-21 02:17:43
字體:
來源:轉載
供稿:網友
從 William Kennedy 那里整理過來的,不同之處在于他自己定義了一個 OVERLAPPED 結構,而我們這里直接使用 System.Threading.NativeOverlapped。
附一段我以前寫的 Win32 下的 IOCP 文檔,如果您了解 IOCP,也可以直接跳過,看后面的 C# 測試示范:

 

整理者:鄭昀@ultrapower

 

我們采用的是 I/O Completion Port(完成端口,以下簡稱 IOCP)處理機制。

簡單的講,當服務應用程序初始化時,它應該先創建一個 IOCP。我們在請求到來后,將得到的數據打包,用 PostQueuedCompletionStatus 發送到 IOCP 中。這時需要創建一些線程(7個線程/每CPU,再多就沒有意義了)來處理發送到 IOCP 的消息。實現步驟大致如下:

1     先在主線程中調用 CreateIoCompletionPort 創建 IOCP。

CreateIoCompletionPort 的前三個參數只在把設備同完成端口相關聯時才有用。

此時我們只需傳遞 INVALID_HANDLE_VALUE、NULL 和 0 即可。

第四個參數告訴端口同時能運行的最多線程數,這里設置為0,表示默認為當前計算機的cpu數目。

2     我們的threadfun線程函數執行一些初始化之后,將進入一個循環,該循環會在服務進程終止時才結束。

在循環中,調用 GetQueuedCompletionStatus,這樣就把當前線程的 ID 放入一個等待線程隊列中,IOCP 內核對象就總能知道哪個線程在等待處理完成的 I/O 請求。

如果在 IDLE_THREAD_TIMEOUT 規定的時間內 IOCP 上還沒有出現一個完成包(completion packet),則轉入下一次循環。在這里我們設置的 IDLE_THREAD_TIMEOUT 為1秒。

 

當端口的 I/O 完成隊列中出現一項時,完成端口就喚醒等待線程隊列中的這個線程,該線程將得到完成的 I/O 項中的信息:傳輸的字節數、完成鍵和 OVERLAPPED 結構的地址。

 

在我們的程序中,可以用智能指針、BSTR 或者 int 來接受這個 OVERLAPPED 結構的地址的值,從而得到消息;然后在這個線程中處理消息。

GetQueuedCompletionStatus 的第一個參數 hCompletionPort 指出了要監視哪一個端口,這里我們傳送先前從 CreateIoCompletionPort 返回的端口句柄。

 

需要注意的是:

第一,   線程池的數目是有限制的,和cpu數目有關系。

第二,   iocp是一種較為完美的睡眠/喚醒 線程機制;線程當前沒有任務要處理時,就進入睡眠狀態,從而不占用cpu資源,直到被內核喚醒;

第三,   最近一次剛執行完的線程,下次任務來的時候還會喚醒它;所以有可能比較少被調用的線程以后被調用的幾率也少。

 
測試代碼:

using system;
using system.threading;  // included for the thread.sleep call
using continuum.threading;
using system.runtime.interopservices;

namespace iocpdemo
{
    //=============================================================================
    /**//// <summary> sample class for the threading class </summary>
    public class utilthreadingsample
    {
        //*****************************************************************************   
        /**//// <summary> test method </summary>
        static void main()
        {
            // create the mssql iocp thread pool
            iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction));
      
            //for(int i =1;i<10000;i++)
            {
                pthreadpool.postevent(1234);
            }
      
            thread.sleep(100);
      
            pthreadpool.dispose();
        }
    
        //********************************************************************
        /**//// <summary> function to be called by the iocp thread pool.  called when
        ///           a command is posted for processing by the socketmanager </summary>
        /// <param name="ivalue"> the value provided by the thread posting the event </param>
        static public void iocpthreadfunction(int ivalue)
        {
            try
            {
                console.writeline("value: {0}", ivalue.tostring());
                thread.sleep(3000);
            }
      
            catch (exception pexception)
            {
                console.writeline(pexception.message);
            }
        }
    }

}


類代碼:
using system;
using system.threading;
using system.runtime.interopservices;

namespace iocpthreading
{
    [structlayout(layoutkind.sequential, charset=charset.auto)]

    public sealed class iocpthreadpool
    {
        // ---- kernel32 entry points used by the pool ----
        // NOTE(review): handles and the completion key are declared as uint32 / uint32*.
        // win32 documents these as pointer-sized values, so these signatures look
        // 32-bit only - confirm before building for a 64-bit target.

        /// <summary> creates the io completion port (called once from the constructor) </summary>
        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);

        /// <summary> closes the port handle (called from dispose) </summary>
        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean closehandle(uint32 hobject);

        /// <summary> queues a packet on the port; this class passes the posted
        ///           value itself in the puiuserarg slot </summary>
        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);

        /// <summary> dequeues a packet from the port; the worker loop reads the
        ///           posted value back through puiuserarg </summary>
        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);

        // passed as hfile when the port is created in the constructor
        private const uint32 invalid_handle_value = 0xffffffff;
        // timeout value the worker loop passes to getqueuedcompletionstatus
        private const uint32 inifinite = 0xffffffff;
        // value posted by dispose; a worker that dequeues it leaves its loop
        private const int32 shutdown_iocpthread = 0x7fffffff;
        /// <summary> signature of the callback invoked with each posted value </summary>
        public delegate void user_function(int ivalue);
        // handle returned by createiocompletionport; 0 is treated as "creation failed"
        private uint32 m_hhandle;
        private uint32 gethandle { get { return m_hhandle; } set { m_hhandle = value; } }

        // concurrency value handed to createiocompletionport by the constructor
        private int32 m_uimaxconcurrency;

        private int32 getmaxconcurrency { get { return m_uimaxconcurrency; } set { m_uimaxconcurrency = value; } }


        // number of worker threads the constructor starts
        private int32 m_iminthreadsinpool;

        private int32 getminthreadsinpool { get { return m_iminthreadsinpool; } set { m_iminthreadsinpool = value; } }

        // upper bound checked before another worker thread is added
        private int32 m_imaxthreadsinpool;

        private int32 getmaxthreadsinpool { get { return m_imaxthreadsinpool; } set { m_imaxthreadsinpool = value; } }


        // monitor object locked around the "should the pool grow" checks
        private object m_pcriticalsection;

        private object getcriticalsection { get { return m_pcriticalsection; } set { m_pcriticalsection = value; } }


        // callback invoked by the worker loop for every dequeued value
        private user_function m_pfnuserfunction;

        private user_function getuserfunction { get { return m_pfnuserfunction; } set { m_pfnuserfunction = value; } }


        // NOTE(review): read and written from several threads without volatile or a
        // lock on every access - confirm visibility is acceptable
        private boolean m_bdisposeflag;

        /// <summary> simtype: flag to indicate if the class is disposing </summary>

        private boolean isdisposed { get { return m_bdisposeflag; } set { m_bdisposeflag = value; } }

        private int32 m_icurthreadsinpool;

        /// <summary> simtype: the current number of threads in the thread pool </summary>
        // NOTE(review): the setter is public, so callers can overwrite the counter - confirm intended

        public int32 getcurthreadsinpool { get { return m_icurthreadsinpool; } set { m_icurthreadsinpool = value; } }

        /// <summary> simtype: increment current number of threads in the thread pool </summary>

        private int32 inccurthreadsinpool() { return interlocked.increment(ref m_icurthreadsinpool); }

        /// <summary> simtype: decrement current number of threads in the thread pool </summary>

        private int32 deccurthreadsinpool() { return interlocked.decrement(ref m_icurthreadsinpool); }


        private int32 m_iactthreadsinpool;

        /// <summary> simtype: the current number of active threads in the thread pool </summary>

        public int32 getactthreadsinpool { get { return m_iactthreadsinpool; } set { m_iactthreadsinpool = value; } }

        /// <summary> simtype: increment current number of active threads in the thread pool </summary>

        private int32 incactthreadsinpool() { return interlocked.increment(ref m_iactthreadsinpool); }

        /// <summary> simtype: decrement current number of active threads in the thread pool </summary>

        private int32 decactthreadsinpool() { return interlocked.decrement(ref m_iactthreadsinpool); }


        private int32 m_icurworkinpool;

        /// <summary> simtype: the current number of work posted in the thread pool </summary>

        public int32 getcurworkinpool { get { return m_icurworkinpool; } set { m_icurworkinpool = value; } }

        /// <summary> simtype: increment current number of work posted in the thread pool </summary>

        private int32 inccurworkinpool() { return interlocked.increment(ref m_icurworkinpool); }

        /// <summary> simtype: decrement current number of work posted in the thread pool </summary>

        private int32 deccurworkinpool() { return interlocked.decrement(ref m_icurworkinpool); }

        public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction)
        {
            try
            {
                // set initial class state

                getmaxconcurrency   = imaxconcurrency;

                getminthreadsinpool = iminthreadsinpool;

                getmaxthreadsinpool = imaxthreadsinpool;

                getuserfunction     = pfnuserfunction;


                // init the thread counters

                getcurthreadsinpool = 0;

                getactthreadsinpool = 0;

                getcurworkinpool    = 0;


                // initialize the monitor object

                getcriticalsection = new object();


                // set the disposing flag to false

                isdisposed = false;


                unsafe
                {

                    // create an io completion port for thread pool use
                    gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);

                }


                // test to make sure the io completion port was created

                if (gethandle == 0)

                    throw new exception("unable to create io completion port");


                // allocate and start the minimum number of threads specified

                int32 istartingcount = getcurthreadsinpool;

        

                threadstart tsthread = new threadstart(iocpfunction);

                for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread)
                {

                    // create a thread and start it

                    thread ththread = new thread(tsthread);

                    ththread.name = "iocp " + ththread.gethashcode();

                    ththread.start();


                    // increment the thread pool count

                    inccurthreadsinpool();

                }

            }


            catch
            {

                throw new exception("unhandled exception");

            }

        }

        ~iocpthreadpool()
        {

            if (!isdisposed)

                dispose();

        }

        public void dispose()
        {

            try
            {

                // flag that we are disposing this object

                isdisposed = true;


                // get the current number of threads in the pool

                int32 icurthreadsinpool = getcurthreadsinpool;


                // shutdown all thread in the pool

                for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread)
                {
                    unsafe
                    {

                        bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);

                    }

                }


                // wait here until all the threads are gone

                while (getcurthreadsinpool != 0) thread.sleep(100);


                unsafe
                {

                    // close the iocp handle
                    closehandle(gethandle);

                }

            }

            catch
            {

            }

        }
        private void iocpfunction()
        {
            uint32 uinumberofbytes;

            int32  ivalue;

            try
            {
                while (true)
                {

                    unsafe
                    {

                        system.threading.nativeoverlapped* pov;


                        // wait for an event

                        getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite);
                    }

                    // decrement the number of events in queue

                    deccurworkinpool();


                    // was this thread told to shutdown

                    if (ivalue == shutdown_iocpthread)

                        break;


                    // increment the number of active threads

                    incactthreadsinpool();


                    try
                    {
                        // call the user function
                        getuserfunction(ivalue);

                    }

                    catch(exception ex)
                    {
                        throw ex;
                    }


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try
                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)
                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)
                            {

                                if (isdisposed == false)
                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }

                    catch
                    {

                    }


                    // relase the lock

                    monitor.exit(getcriticalsection);


                    // increment the number of active threads

                    decactthreadsinpool();

                }

            }


            catch(exception ex)
            {
                string str=ex.message;

            }


            // decrement the thread pool count

            deccurthreadsinpool();

        }

        //public void postevent(int32 ivalue
        public void postevent(int ivalue)
        {

            try
            {

                // only add work if we are not disposing

                if (isdisposed == false)
                {

                    unsafe
                    {

                        // post an event into the iocp thread pool

                        postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);

                    }


                    // increment the number of item of work

                    inccurworkinpool();


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try
                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)
                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)
                            {

                                if (isdisposed == false)
                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }


                    catch
                    {

                    }


                    // release the lock

                    monitor.exit(getcriticalsection);

                }

            }


            catch (exception e)
            {

                throw e;

            }


            catch
            {

                throw new exception("unhandled exception");

            }

        }  

        public void postevent()
        {

            try
            {

                // only add work if we are not disposing

                if (isdisposed == false)
                {

                    unsafe
                    {

                        // post an event into the iocp thread pool

                        postqueuedcompletionstatus(gethandle, 0, null, null);

                    }


                    // increment the number of item of work

                    inccurworkinpool();


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try

                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)

                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)

                            {

                                if (isdisposed == false)

                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }


                    catch

                    {

                    }


                    // release the lock

                    monitor.exit(getcriticalsection);

                }

            }

            catch

            {

                throw new exception("unhandled exception");

            }

        }

    }

}
,歡迎訪問網頁設計愛好者web開發。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 昔阳县| 临汾市| 惠州市| 滦平县| 海盐县| 伊宁县| 罗源县| 江孜县| 潜山县| 宁武县| 南城县| 保定市| 镶黄旗| 米林县| 吉林省| 龙口市| 县级市| 石林| 会东县| 大英县| 库尔勒市| 汨罗市| 浦城县| 手机| 酒泉市| 南安市| 靖边县| 昌邑市| 景谷| 松江区| 垫江县| 琼结县| 宜兰市| 临洮县| 河南省| 双江| 红安县| 九龙县| 道真| 泉州市| 山丹县|