国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

rabbitmq學習3:Publish/Subscribe

2019-11-08 03:09:56
字體:
來源:轉載
供稿:網友

非常感謝 http://wubin850219.iteye.com/blog/1004921

在前面的Work Queue中的消息是均勻分配消息給消費者;如果我想把消息分發給所有的消費者呢?那應當怎么操作呢?這就是要下面提到的Publish/Subscribe(分布/訂閱)。讓我們開始Publish/Subscribe之旅吧!

Publish/Subscribe的工作示意圖如下:

在上圖中的X表示Exchange(交換區);Exchange的類型有:direct , topic , headers 和 fanout

Publish/Subscribe的Exchang的類型為fanout;聲明Publish/Subscribe的Exchang代碼如下:

java代碼  收藏代碼channel.exchangeDeclare("logs", "fanout");  

 

對于Work Queue中提到的發布消息的代碼如下:

Java代碼  收藏代碼channel.basicPublish("", queueName,   null, message.getBytes());  

 但對于Publish/Subscribe中發布消息中的Queue的使用的是默認的;代碼如下:

Java代碼  收藏代碼channel.basicPublish( "logs", "", null, message.getBytes());  

 

Exchange和各Queue之間是如何通信的呢?主要是通過把Exchange和各Queue綁定(binding);示意代碼如下:

Java代碼  收藏代碼channel.queueBind(queueName, exchangeName, "");  

Publish/Subscribe加入綁定的工作示意圖如下:

 

那我們就開始程序代碼吧:P端的代碼如下:

 

Java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;    public class EmitLog {      PRivate static final String EXCHANGE_NAME = "logs";        public static void main(String[] argv) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//聲明Exchange          for (int i = 0; i <= 2; i++) {              String message = "hello Word!" + i;              channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());              System.out.println(" [x] Sent '" + message + "'");          }          channel.close();          connection.close();      }    }  

 運行結果如下:

Java代碼  收藏代碼[x] Sent 'hello word!0'  [x] Sent 'hello word!1'  [x] Sent 'hello word!2'  

 

C端的代碼如下:

Java代碼  收藏代碼package com.abin.rabbitmq;    import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsOne {      private static final String EXCHANGE_NAME = "logs";        public static void main(String[] argv) throws Exception {          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          channel.exchangeDeclare(EXCHANGE_NAME, "fanout");          String queueName = "log-fb1";          channel.queueDeclare(queueName, false, false, false, null);          channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange綁定          QueueingConsumer consumer = new QueueingConsumer(channel);          channel.basicConsume(queueName, true, consumer);          while (true) {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              System.out.println(" [x] Received '" + message + "'");          }      }  }  

 

對于C端的代碼我寫了二個差不多的程序,只需要修改一下queueName。這樣就形成了二個Queue;運行結果相同;

運行結果可能如下:

Java代碼  收藏代碼[x] Received 'hello word!0'  [x] Received 'hello word!1'  [x] Received 'hello word!2'  

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 灯塔市| 永新县| 黄浦区| 滨海县| 玛纳斯县| 即墨市| 仁化县| 酉阳| 琼中| 松滋市| 东莞市| 郓城县| 晋城| 梓潼县| 濮阳县| 镇宁| 高清| 峡江县| 甘谷县| 肥西县| 承德市| 栖霞市| 桓台县| 吴桥县| 浮山县| 邵武市| 福清市| 南漳县| 通州市| 鹤岗市| 隆尧县| 丹凤县| 上高县| 甘泉县| 社旗县| 衡阳县| 宣恩县| 阳江市| 晋中市| 高阳县| 高淳县|