Node.js的標準API沒有提供進程共享內存,然而通過IPC接口的send方法和對message事件的監聽,就可以實現一個多進程之間的協同機制,通過通信來操作共享內存。
##IPC的基本用法:
// worker進程 發送消息process.send(‘讀取共享內存'); // master進程 接收消息 -> 處理 -> 發送回信cluster.on('online', function (worker) { // 有worker進程建立,即開始監聽message事件 worker.on(‘message', function(data) { // 處理來自worker的請求 // 回傳結果 worker.send(‘result') });});
在Node.js中,通過send和on(‘message', callback)實現的IPC通信有幾個特點。首先,master和worker之間可以互相通信,而各個worker之間不能直接通信,但是worker之間可以通過master轉發實現間接通信。另外,通過send方法傳遞的數據,會先被JSON.stringify處理后再傳遞,接收后會再用JSON.parse解析。所以Buffer對象傳遞后會變成數組,而function則無法直接傳遞。反過來說,就是可以直接傳遞除了buffer和function之外的所有數據類型(已經很強大了,而且buffer和function也可以用變通的方法實現傳遞)。
基于以上特點,我們可以設計一個通過IPC來共享內存的方案:
1、worker進程作為共享內存的使用者,并不直接操作共享內存,而是通過send方法通知master進程進行寫入(set)或者讀取(get)操作。
2、master進程初始化一個Object對象作為共享內存,并根據worker發來的message,對Object的鍵值進行讀寫。
3、由于要使用跨進程通信,所以worker發起的set和get都是異步操作,master根據請求進行實際讀寫操作,然后將結果返回給worker(即把結果數據send給worker)。
##數據格式
為了實現進程間異步的讀寫功能,需要對通信數據的格式做一點規范。
首先是worker的請求數據:
requestMessage = { isSharedMemoryMessage: true, // 表示這是一次共享內存的操作通信 method: ‘set', // or ‘get' 操作的方法 id: cluster.worker.id, // 發起操作的進程(在一些特殊場景下,用于保證master可以回信) uuid: uuid, // 此次操作的(用于注冊/調用回調函數) key: key, // 要操作的鍵 value: value // 鍵對應的值(寫入)}master在接到數據后,會根據method執行相應操作,然后根據requestMessage.id將結果數據發給對應的worker,數據格式如下:
responseMessage = { isSharedMemoryMessage: true, // 標記這是一次共享內存通信 uuid: requestMessage.uuid, // 此次操作的唯一標示 value: value // 返回值。get操作為key對應的值,set操作為成功或失敗}規范數據格式的意義在于,master在接收到請求后,能夠將處理結果發送給對應的worker,而worker在接到回傳的結果后,能夠調用此次通信對應的callback,從而實現協同。
規范數據格式后,接下來要做的就是設計兩套代碼,分別用于master進程和worker進程,監聽通信并處理通信數據,實現共享內存的功能。
##User類
User類的實例在worker進程中工作,負責發送操作共享內存的請求,并監聽master的回信。
var User = function() { var self = this; self.__uuid__ = 0; // 緩存回調函數 self.__getCallbacks__ = {}; // 接收每次操作請求的回信 process.on('message', function(data) { if (!data.isSharedMemoryMessage) return; // 通過uuid找到相應的回調函數 var cb = self.__getCallbacks__[data.uuid]; if (cb && typeof cb == 'function') { cb(data.value) } // 卸載回調函數 self.__getCallbacks__[data.uuid] = undefined; });}; // 處理操作User.prototype.handle = function(method, key, value, callback) { var self = this; var uuid = self.__uuid__++; process.send({ isSharedMemoryMessage: true, method: method, id: cluster.worker.id, uuid: uuid, key: key, value: value }); // 注冊回調函數 self.__getCallbacks__[uuid] = callback; }; User.prototype.set = function(key, value, callback) { this.handle('set', key, value, callback);}; User.prototype.get = function(key, callback) { this.handle('get', key, null, callback);};##Manager類
Manager類的實例在master進程中工作,用于初始化一個Object作為共享內存,并根據User實例的請求,在共享內存中增加鍵值對,或者讀取鍵值,然后將結果發送回去。
var Manager = function() { var self = this; // 初始化共享內存 self.__sharedMemory__ = {}; // 監聽并處理來自worker的請求 cluster.on('online', function(worker) { worker.on('message', function(data) { // isSharedMemoryMessage是操作共享內存的通信標記 if (!data.isSharedMemoryMessage) return; self.handle(data); }); });}; Manager.prototype.handle = function(data) { var self = this; var value = this[data.method](data); var msg = { // 標記這是一次共享內存通信 isSharedMemoryMessage: true, // 此次操作的唯一標示 uuid: data.uuid, // 返回值 value: value }; cluster.workers[data.id].send(msg);}; // set操作返回ok表示成功Manager.prototype.set = function(data) { this.__sharedMemory__[data.key] = data.value; return 'OK';}; // get操作返回key對應的值Manager.prototype.get = function(data) { return this.__sharedMemory__[data.key];};##使用方法
if (cluster.isMaster) { // 初始化Manager的實例 var sharedMemoryManager = new Manager(); // fork第一個worker cluster.fork(); // 1秒后fork第二個worker setTimeout(function() { cluster.fork(); }, 1000); } else { // 初始化User類的實例 var sharedMemoryUser = new User(); if (cluster.worker.id == 1) { // 第一個worker向共享內存寫入一組數據,用a標記 sharedMemoryUser.set('a', [0, 1, 2, 3]); } if (cluster.worker.id == 2) { // 第二個worker從共享內存讀取a的值 sharedMemoryUser.get('a', function(data) { console.log(data); // => [0, 1, 2, 3] }); } }以上就是一個通過IPC通信實現的多進程共享內存功能,需要注意的是,這種方法是直接在master進程的內存里緩存數據,必須注意內存的使用情況,這里可以考慮加入一些簡單的淘汰策略,優化內存的使用。另外,如果單次讀寫的數據比較大,IPC通信的耗時也會相應增加。
完整代碼:https://github.com/x6doooo/sharedmemory
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。
新聞熱點
疑難解答