MQTT簡(jiǎn)介
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸)是IBM開(kāi)發(fā)的一個(gè)即時(shí)通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。該協(xié)議支持所有平臺(tái),幾乎可以把所有聯(lián)網(wǎng)物品和外部連接起來(lái),被用來(lái)當(dāng)做傳感器和制動(dòng)器(比如通過(guò)Twitter讓房屋聯(lián)網(wǎng))的通信協(xié)議。
Docker安裝RabbitMQ配置MQTT
使用RabbitMQ作為MQTT服務(wù)端,Eclipse Paho作為客戶端。宿主機(jī)系統(tǒng)為ubuntu16.04
Docker下載鏡像
docker pull daocloud.io/library/rabbitmq:3.7.4
啟動(dòng)RabbitMQ
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 -p 1883:1883 -p 15675:15675 daocloud.io/library/rabbitmq:3.7.4
注意映射容器端口
啟用插件
默認(rèn)安裝后我們需要手動(dòng)開(kāi)啟rabbitmq_management插件,rabbitmq_mqtt插件和rabbitmq_web_mqtt插件。
執(zhí)行如下三條命令
docker exec <容器ID> rabbitmq-plugins enable rabbitmq_managementdocker exec <容器ID> rabbitmq-plugins enable rabbitmq_mqttdocker exec <容器ID> rabbitmq-plugins enable rabbitmq_web_mqtt
當(dāng)然你也可以寫個(gè)腳本start.sh,復(fù)制到容器中
/usr/sbin/rabbitmq-plugins enable rabbitmq_management/usr/sbin/rabbitmq-plugins enable rabbitmq_mqtt/usr/sbin/rabbitmq-plugins enable rabbitmq_web_mqtt
進(jìn)入容器執(zhí)行這個(gè)腳本。
sh start.sh
開(kāi)放宿主機(jī)端口
firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --zone=public --add-port=1883/tcp --permanentfirewall-cmd --zone=public --add-port=15675/tcp --permanentfirewall-cmd --reload
Python MQTT客戶端實(shí)現(xiàn)
安裝python包
pip install paho-mqtt
發(fā)送數(shù)據(jù)demo(消費(fèi)者)
# 使用前需要啟動(dòng)hbase和thrift服務(wù)器# 啟動(dòng)hbase在cd /usr/local/hbase下bin/start-hbase.sh 默認(rèn)端口為 60000# 啟動(dòng)thrift服務(wù)器cd /usr/local/hbase/bin執(zhí)行./hbase-daemon.sh start thrift 默認(rèn)端口為9090import sysimport osdir_common = os.path.split(os.path.realpath(__file__))[0] + '/../'sys.path.append(dir_common) # 將根目錄添加到系統(tǒng)目錄,才能正常引用common文件夾import argparse #import loggingimport time,datetimefrom common.py_log import init_logger,init_console_loggerfrom common.config import *from common.py_hbase import PyHbaseimport time,jsonfrom common.py_rabbit import Rabbit_Consumerimport paho.mqtt.client as mqttimport timeHOST = "192.168.2.46"PORT = 1883def client_loop(): client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) client = mqtt.Client(client_id) # ClientId不能重復(fù),所以使用當(dāng)前時(shí)間 client.username_pw_set("guest", "guest") # 必須設(shè)置,否則會(huì)返回「Connected with result code 4」 client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 60) client.loop_forever()def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("test")def on_message(client, userdata, msg): print(msg.topic+" "+msg.payload.decode("utf-8"))if __name__ == '__main__': client_loop()
接收數(shù)據(jù)demo(生產(chǎn)者)
import sysimport osdir_common = os.path.split(os.path.realpath(__file__))[0] + '/../'sys.path.append(dir_common) # 將根目錄添加到系統(tǒng)目錄,才能正常引用common文件夾import paho.mqtt.client as mqttimport timeHOST = "192.168.2.46"PORT = 1883def client_loop(): client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) client = mqtt.Client(client_id) # ClientId不能重復(fù),所以使用當(dāng)前時(shí)間 client.username_pw_set("guest", "guest") # 必須設(shè)置,否則會(huì)返回「Connected with result code 4」 client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 60) client.loop_forever()def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("test")def on_message(client, userdata, msg): print(msg.topic+" "+msg.payload.decode("utf-8"))if __name__ == '__main__': client_loop()
生產(chǎn)者demo
# import paho.mqtt.client as mqttimport paho.mqtt.publish as publishimport timeHOST = "192.168.2.46"PORT = 1883def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("test")def on_message(client, userdata, msg): print(msg.topic+" "+msg.payload.decode("utf-8"))if __name__ == '__main__': client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) # client = mqtt.Client(client_id) # ClientId不能重復(fù),所以使用當(dāng)前時(shí)間 # client.username_pw_set("guest", "guest") # 必須設(shè)置,否則會(huì)返回「Connected with result code 4」 # client.on_connect = on_connect # client.on_message = on_message # client.connect(HOST, PORT, 60) # client.publish("test", "你好 MQTT", qos=0, retain=False) # 發(fā)布消息 publish.single("test", "你好 MQTT", qos = 1,hostname=HOST,port=PORT, client_id=client
官方文檔:
mqtt http://www.rabbitmq.com/mqtt.html
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)VEVB武林網(wǎng)的支持。
新聞熱點(diǎn)
疑難解答
圖片精選