国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發(fā)設計 > 正文

ActiveMQApollo之MQTT

2019-11-14 15:39:57
字體:
來源:轉載
供稿:網(wǎng)友

Apollo是apache旗下的基金項目,它是以Apache ActiveMQ5.x為基礎,采用全新的線程和消息調度架構重新實現(xiàn)的消息中間件,針對多核處理器進行了優(yōu)化處理,它的速度更快、更可靠、更易于維護。apollo與ActiveQQ一樣支持多協(xié)議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協(xié)議的使用。 
關于ActiveMQ5請參考:http://activemq.apache.org,本文只介紹Apollo在windows下安裝和應用,Apollo的詳細文檔請參考官網(wǎng):http://activemq.apache.org/apollo/documentation/user-manual.html.

 

Apollo的下載和安裝

 

1.下載并安裝

進入http://activemq.apache.org/apollo/download.html,下載windows版本的壓縮包,并解壓到自己工作目錄(如:E:/apache-apollo-1.7),并創(chuàng)建環(huán)境變量APOLLO_HOME=E:/apache-apollo-1.7。如果操作是系統(tǒng)是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。

 

2.創(chuàng)建broker實例并啟動服務

進入E:/apache-apollo-1.7之下的bin目錄,打開cmd窗口,執(zhí)行命令:apollo create E:/apollo_broker,命令執(zhí)行成功后,在E盤下會有apollo_broker目錄,在其下有個bin目錄,其中有兩個文件:apollo-broker.cmd和apollo-broker-service.exe,第一個是通過cmd命令啟動apollo服務的,第二個是創(chuàng)建window服務的。 
cmd命令啟動:apollo-broker run,啟動成功可以在瀏覽器中查看運行情況(http://127.0.0.1:61680/,默認用戶名/密碼:admin/passWord); 
windows服務啟動:執(zhí)行apollo-broker-service.exe,創(chuàng)建windows服務,就可以以windows服務的方式啟動apollo服務。

 

3.MQTT協(xié)議的應用

MQTT協(xié)議有眾多客戶端實現(xiàn),相關請參考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。 
本文采用eclipse的paho客戶端實現(xiàn)(https://eclipse.org/paho/)。

a.javascript客戶端:https://eclipse.org/paho/clients/js/

Javascript客戶端項目下載下來,并在其項目根目錄下執(zhí)行mvn命令,進行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個js文件,將其拷貝到自己項目相關目錄下,并在頁面中引用,即可實現(xiàn)javascript客戶端的消息訂閱和發(fā)布,demo代碼如下: 
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
// 61623是ws連接的默認端口,可以在apollo中間件中進行配置(關于apollo的配置請參考:http://activemq.apache.org/apollo/documentation/user-manual.html) 
// set callback handlers 
client.onConnectionLost = onConnectionLost; 
client.onMessageArrived = onMessageArrived; 
// connect the client 
client.connect({userName:'admin',password:'password',onSuccess:onConnect}); 
// called when the client connects 
function onConnect() { // 連接成功后的處理 
// Once a connection has been made, make a subscription and send a message. 
console.log("onConnect"); 
client.subscribe("/topic/event"); // 訂閱消息的主題 
var message = new Paho.MQTT.Message("Hello,this is a test"); 
message.destinationName = "/topic/event"; 
client.send(message); // 發(fā)送消息 

// called when the client loses its connection 
function onConnectionLost(responSEObject) { // 連接丟失后的處理 
if (responseObject.errorCode !== 0) { 
console.log("onConnectionLost:"+responseObject.errorMessage); 


// called when a message arrives 
function onMessageArrived(message) { // 消息接收成功后的處理 
console.log("onMessageArrived:"+message.payloadString); 
}

b. java客戶端實現(xiàn)

paho目前只支持J2SE和安卓,下載地址:https://eclipse.org/paho/clients/java/,我們采用maven方式。 
maven庫地址: 
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases 
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots 
maven dependency: 
<dependency> 
<groupId>org.eclipse.paho</groupId> 
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
<version>1.0.1</version> 
</dependency>
 
說明:版本為1.0.0或0.9.0時,其jar包根本加載不進來,最后搜到1.0.1版本才可以正常使用。 
java端實現(xiàn): 
public interface IMessage { 
String getHost(); 
Integer getPort(); 
Integer getQos(); 
String getTopic(); 
String getClientId(); 
String getContent(); 
byte[] getContentBytes(); 

 


Map<String,Object> getOption(); 
Object getSender(); 
Date getSendTime(); 
}
 
public final class MessagePRocessingCenter { 
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class); 
protected static final String BROKER_PREFIX="tcp://"; 
protected static final String BROKER_HOST="localhost"; 
protected static final int PORT=61613; 
protected static final int QOS=2; 
protected static final String TOPIC="/topic/event"; 
protected static final String CLIENT_ID="clientId"; 
protected static final String MQ_USER="admin"; 
protected static final String MQ_PASSWORD="password"; 
public static void send(IMessage message){ 
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic(); 
int qos=null == message.getQos()?QOS: message.getQos(); 
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost()); 
int port=null == message.getPort()?PORT:message.getPort(); 
broker+=":"+port; 
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId(); 
Map<String,Object> opts=message.getOption(); 
String user=MQ_USER; 
String password=MQ_PASSWORD; 
if(null != opts){ 
if(null != opts.get("userName")){ 
user=opts.get("userName").toString(); 

if(null != opts.get("password")){ 
password=opts.get("password").toString(); 


MemoryPersistence persistence = new MemoryPersistence(); 
try { 
MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setUserName(user); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleansession(true); 
sampleClient.connect(connOpts); 
MqttMessage mqm = new MqttMessage(message.getContentBytes()); 
mqm.setQos(qos); 
sampleClient.publish(topic, mqm); 
sampleClient.disconnect(); 
} catch(MqttException me) { 
logger.info("********************* send message exception :"); 
logger.info("********************* reason : " + me.getReasonCode()); 
logger.info("********************* msg : " + me.getMessage()); 
logger.info("********************* loc : " + me.getLocalizedMessage()); 
logger.info("********************* cause : " + me.getCause()); 
logger.info("********************* excep : " + me); 
me.printStackTrace(); 


public static void send(Set<IMessage> set){ 
for(IMessage message:set){ 
send(message); 


}

 

小結

至此,MQTT協(xié)議已部署完畢,java端可以發(fā)布消息,而javascript端則可以訂閱并接收到java端發(fā)布的信息。 
本文只是依照官網(wǎng)手冊而實現(xiàn)的簡單應用,講解不一定十分準確,有什么不對的地方還請多多指點,更詳細的應用請參考官網(wǎng)文檔: 
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html 
eclipse paho:https://eclipse.org/paho/

 


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 宁波市| 青河县| 兖州市| 盖州市| 三原县| 桐城市| 中方县| 赤城县| 确山县| 株洲市| 怀集县| 武胜县| 博湖县| 科尔| 石河子市| 金阳县| 洛阳市| 上杭县| 比如县| 潍坊市| 玉树县| 泰宁县| 南充市| 土默特左旗| 内丘县| 瑞昌市| 永仁县| 科尔| 甘德县| 攀枝花市| 信丰县| 三门县| 星子县| 开原市| 禄劝| 南澳县| 兴和县| 金溪县| 张家界市| 漯河市| 建始县|