摘要
在企業中,許多計算機由于在其上執行工作的性質而未得到充分利用,或者因為過了上班時間而干脆得不到使用。在許多情況下,應用服務器耗光了寶貴的CPU(尤其是在執行CPU密集型的數學運算時),而網絡上的其他計算機則閑置一旁。本文提出了一種框架,用于把java消息服務(Java Messaging Service,JMS)客戶端放置在這些未充分利用的計算機上,以分擔一些通常應由服務器執行的工作。該客戶端可以監聽某個要執行的工作單元的請求隊列,然后在應答隊列中做出響應。此外,本文還給出了一個BEA WebLogic Integration 8.1架構的例子,它通過把一個工作流以及相關的Java控件用作替代框架,把工作分發到遠程客戶端上,從而把工作單元可靠地分發給JMS請求隊列。
簡介
本文提出了一種J2EE框架,用于解決把工作分配給未充分利用的計算機資源這個難題。具體來說,可以把JMS客戶端放置在這些未充分利用的計算機上,從而分擔一些通常應由服務器執行的工作。該客戶端可以監聽某個要執行的工作單元的請求隊列,然后在應答隊列上做出響應。可以使用一組消息驅動bean獲取應答隊列上的響應消息,以便進行進一步的處理。此外,還可以使用一種servlet實現來治理性地啟動用于創建(要發送給JMS客戶端的)工作單元的整個子流程,并使用它來終止這個子流程。
我使用常見的BEA WebLogic Server作為把離散的工作單元分配給分布式JMS客戶端的例子。在另一個更為復雜的例子中,BEA WebLogic Integration (WLI)工作流也執行類似的分發任務,但是通過對請求隊列進行監控,它在靈活性、Java控件的可重用性和可伸縮性等方面要更好一些。
使用的例子
業內有相當多的例子可以演示如何使用JMS框架來充分利用計算機進行并行處理。例如:
一個銀行應用程序可以實現抵押貸款,并以不同的比率和年份執行幾類利息計算,從而為信貸官員提供與每個申請者相關的、可能影響借貸類型的數據。所有不同種類的計算可以按照申請者分配給可用的計算機來執行,然后把結果返回給應用服務器儲存起來。
一個記帳系統可以從數據庫讀取記錄,然后重新計算記錄中的數字,以求做到更加精確。對于每條記錄,它可能需要連接到業務用戶的本地系統中以獲得輔助數據,這可能需要幾秒鐘。假如順序執行,當涉及到的記錄上千時,這種方法不僅很慢,而且可能進一步延長服務器線程等待從各地返回響應的時間。通過把這些工作分發給JMS客戶端,不僅可以并行完成處理,而且還可以節省服務器線程。
一個天氣預告系統或線性優化系統可能需要操縱或執行矩陣乘法。隨著矩陣的大小和數量逐步增加,服務器CPU的負擔也隨之加重。假如這種情況經常發生,那么通過把矩陣操作和乘法分發給其他計算機上的JMS客戶端,服務器的CPU就可以節省下來用于其他工作。
使用常規的WebLogic Server來分發工作單元
借鑒最后一個例子,我將構建一個簡單的例子,用于執行矩陣乘法,同時說明如何使用JMS框架把計算工作分發給企業中的計算機資源。JMS客戶端將收到一個工作單元實例,之后,它將會調用其doWork()方法。在這個簡單的例子中,doWork()方法將把2個3×3的矩陣相乘,然后把結果保存到一個結果矩陣中。
接著,JMS客戶端使用工作單元實例的一個副本(工作是在這上面執行的)對應答隊列做出響應。一個消息驅動bean將接受已完成的工作。圖1說明了我將要討論的各個組件:

圖 1.該WebLogic Server實現中的各個組件
這種方法以常規方式使用了JMS系統。在下面的內容中,我將引入一些代碼,并考慮幾個擴展問題。
工作單元類
JMS請求隊列上的每個類都將實現一個UnitOfWork接口,該接口有一個非凡有趣的方法,叫做doWork():
public interface UnitOfWork extends java.io.Serializable {
// This method executes itself on the client machine
public void doWork();
// This method PRints the current contents of performed work
public void print();
// This method stores the instance into a backing store
public void store();
}
在我們這個簡潔而直觀的例子中,我使用一個SimpleMatri類實現了UnitOfWork接口:
public class SimpleMatrix implements UnitOfWork {
private Integer m1[][];
private Integer m2[][];
private Integer result[][];
private Integer rows = new Integer(3);
private Integer cols = new Integer(3);
// May initialize m1 and m2 by locating records from a database
public SimpleMatrix() {
}
// This method actually multiplies m1 x m2 and stores in result
public void doWork() {
}
// This method stores result into a backing store
public void store() {
}
// This method prints the current contents of result
public void print() {
}
}
方法的實現相當簡單,限于文章的篇幅,這里就不再進行說明。請參見所附的示例代碼,其中給出了完整實現。這里的要點在于,這個SimpleMatrix實例被傳遞給一個JMS客戶端,該客戶端只要調用doWork()即可利用其CPU來執行工作。對于這個例子,我不會實際從數據庫中檢索矩陣或者把矩陣保存到數據庫中,但是在實際應用中,這是必須完成的工作。
Servlet工作創建程序
可以使用一個servlet來創建這些UnitOfWork實例。盡管WebLogic Server啟動類可以執行同樣的功能,但出于治理的目的,從安全的Web瀏覽器發送消息給servlet要更加輕易。(另一種可選的實現是使用Web服務。)假如在servlet上安置了安全性,通過身份驗證的用戶可以在查詢字符串中傳遞命令,以便開始交付工作單元給JMS請求隊列或停止交付。我將給出一個servlet的主干例子,以說明其中的一些有用方法。
public class WorkServlet extends HttpServlet {
...
private QueueSender qsender;
private ObjectMessage msg;
private int numMessages = 5;
...
// This places the unit of work on the request queue
public synchronized boolean sendMessages(int numberOfMessages,
PrintWriter o) {
for(int i=0; i
JMS客戶端類的任務僅僅是接受請求隊列上的消息,調用對象上的doWork()方法在這臺計算機上執行工作,然后把結果返回給應答隊列,消息驅動bean從應答隊列中獲取結果,以便進行進一步的處理和保存。可以檢查它是否是文本消息,然后告訴客戶端停止處理,從而答應發送控件消息給客戶端。當然,在實際應用中,消息可能包含客戶端的名稱,這樣就不會造成所有的客戶端都停止處理。
使用UnitOfWork接口的優點在于,JMS客戶端只要編寫一次,就可以用于以后實現該接口的任何類。這使得JMS客戶端具有很大的通用性,可以不加修改地應用到許多不同的場景中。只需把編譯后的UnitOfWork接口以及它所有的實現類都包含在客戶端的類路徑中。
在這個簡化模型中,客戶端需要等待消息到達以啟動處理。在實際情況中,客戶端的方法等待的條件可以是一天中的某個時刻,比如下午5點,或者是計算機的CPU負載低于某個閾值。需要把此類邏輯添加到客戶端,使之與計算機的使用安排更加一致。
消息驅動bean接收程序
消息驅動bean實例將監聽應答隊列,看看有沒有已完成的工作對象單元。下面給出一個例子的主干部分:public class MessageWorkBean implements MessageDrivenBean,
MessageListener {
...
// This method will receive a unit of work object to store
public void onMessage(Message msg) {
ObjectMessage om = (ObjectMessage) msg;
try {
UnitOfWork unit = (UnitOfWork)om.getObject();
unit.print();
unit.store();
}
catch(JMSException ex) {
log("Message Driven Bean: Could not retrieve Unit of Work.");
ex.printStackTrace();
}
}
}
考慮使用一個大小可以調整的消息驅動bean池來處理響應。
假如請求隊列沒有外部使用者,應該創建一些消息驅動bean來使用服務器上的請求隊列。這與本文的主旨不相符,但是可以防止隊列溢出,或者在沒有使用者的情況下請求隊列利用不充分。
假如存在多種類型的工作單元,那么每種類型都應該有自己的請求和響應隊列。
對于WebLogic Server,考慮使用JMS頁面調度技術,以便防止當隊列中存在過多沒有及時使用的消息時,出現內存不足問題。
對于WebLogic Server,假如生產者(servlet)生產出過多沒有使用的工作,考慮使用WebLogic JMS的調節功能。
對于WebLogic Server,考慮對隊列使用分布式目的地,因為這可以把隊列分布到多臺服務器上。在這種情況下,應該集群化servlet本身,并對其進行協調,以避免創建重復的工作請求單元。
還應該考慮本文結尾處的參考資料。此外,對其他服務器也適用的考慮事項是把客戶端部分交付給各臺計算機的方式。一種方式是自愿,即每臺計算機的所有者都下載一個可以在客戶端計算機上配置和運行的安裝程序。另一種方式是使用商業軟件分發包,它可以自動下載客戶端的最新版本,并把它安裝在客戶端計算機上。
使用WebLogic Integration工作流來分發工作
前面給出了一種把工作單元分發給客戶端的直觀方法,即使用servlet和消息驅動bean。盡管該方法實現起來相當輕易,但是它不能解決的問題還很多,比如如何以自支持的方式啟動過程,定時把請求交付給請求隊列。當然,我們不希望讓治理員編寫一個shell腳本來不停地調用該servlet。此外,還應該以一種應用程序可以預先控制的方式限制所使用的請求數量。考慮到這一點,下面給出一個更加復雜的例子,用于把工作單元分發給遠程JMS客戶端并對其做出響應,從而利用未充分使用的計算機。
這個控件的實現使用了JMS QueueBrowser類來查看一個帶有給定的JMS連接工廠的給定JMS隊列。它返回隊列中等待處理的實例個數。本文所附的代碼中提供了完整實現。
啟動和停止工作流的Web服務
為了啟動和停止負責把工作單元分發給請求隊列的WebLogic Integration流程,我們創建了一個Java Web Service (JWS),它服從JSR 181,帶有兩個方法。public classControlWebService implements
com.bea.jws.WebService {
/**
* @common:control
*/
private Controls.JMSStopControlMessage JMSStopControl;
/**
* @common:control
*/
private Controls.JMSControlMessage JMSControl;
static final long serialVersionUID = 1L;
/**
* @common:Operation
*/
public void startFlow() {
JMSControl.subscribe();
JMSControl.sendTextMessage("start");
JMSControl.unsubscribe();
}
/**
* @common:operation
*/
public void stopFlow() {
JMSStopControl.subscribe();
JMSStopControl.sendTextMessage("stop");
JMSStopControl.unsubscribe();
}
}
該Web服務不是直接調用工作流,而是把一條消息放在JMS隊列中,然后調用Worker.Message把消息發送給分發JPD。這解除了Web服務實現與工作流之間的耦合,以保持其模塊性。在WebLogic Integration中,有一個概念叫做事件生成器,可以使用WebLogic Integration Administration Console對它進行配置您可以把事件生成器配置為從JMS Worker.Message中取出消息,然后將其交付給一個Message Broker通道(邏輯概念)。分發工作流監聽/UnitOfWork/StartWorkflow通道,該通道被綁定在與JMS Worker.Message隊列相關聯的JMS事件生成器上。只要有一個String “start”消息交付到此通道上,工作流就會開始工作。類似地,開始之后,分發工作流就會在它的一個Event Choice節點中監聽Message Broker通道(/UnitOfWork/StopWorkflow),以便從Worker.StopMessage JMS隊列接收”stop”消息。然后,事件生成器再次把Worker.StopMessage隊列上的JMS消息關聯到/UnitOfWork/StopWorkflow通道,以便交付消息。
這實際上創建了一種與啟動和停止分發工作流的實現解耦合的面向服務方法。通過Web服務客戶端,或者使用所提供的WebLogic Integration Workshop Test Browser,可以輕松對Web服務進行測試。
分發工作流
圖2說明了負責分發工作單元的DistributeFlow.jpd、我們的簡單矩陣對象以及請求隊列的相關部分:

圖2. 用于分發工作單元的工作流
新聞熱點
疑難解答