git clone git://github.com/stephansun/samples.git samples包含7個模塊,分別為samples-jms-plain:使用JMS原生API;samples-jms-sPRing:使用Spring對JMS原生API封裝后的spring-jms;samples-jms-spring-remoting:使用spring-jms實現JMS的請求/響應模式,需要用到spring提供的遠程調用框架;samples-spring-remoting:介紹spring的遠程調用框架;samples-amqp-plain:使用RabbitMQ提供的AMQP java客戶端;samples-amqp-spring:使用spring對AMQP Java客戶端封裝后的spring-amqp-rabbit;samples-amqp-spring-remoting:使用spring-amqp-rabbit實現AMQP的請求/響應模式,需要用到spring提供的遠程調用框架;下面逐一講解
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>2.5.0</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> </dependencies> amqp-client-2.5.0.jar以及它依賴的commons-io-1.2.jar加載進來了,常用的類有:Java代碼
com.rabbitmq.client.BasicProperties com.rabbitmq.client.Channel com.rabbitmq.client.Connection com.rabbitmq.client.ConnectionFactory com.rabbitmq.client.Consumer com.rabbitmq.client.MessageProperties com.rabbitmq.client.QueueingConsumer
package stephansun.github.samples.amqp.plain.helloworld; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { // AMQP的連接其實是對Socket做的封裝, 注意以下AMQP協議的版本號,不同版本的協議用法可能不同。 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); // 下一步我們創建一個channel, 通過這個channel就可以完成API中的大部分工作了。 Channel channel = connection.createChannel(); // 為了發送消息, 我們必須聲明一個隊列,來表示我們的消息最終要發往的目的地。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; // 然后我們將一個消息發往這個隊列。 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[" + message + "]"); // 最后,我們關閉channel和連接,釋放資源。 channel.close(); connection.close(); } } RabbitMQ默認有一個exchange,叫default exchange,它用一個空字符串表示,它是direct exchange類型,任何發往這個exchange的消息都會被路由到routing key的名字對應的隊列上,如果沒有對應的隊列,則消息會被丟棄。這就是為什么代碼中channel執行basicPulish方法時,第二個參數本應該為routing key,卻被寫上了QUEUE_NAME。Recv.javaJava代碼
package stephansun.github.samples.amqp.plain.helloworld; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 注意我們也在這里聲明了一個queue,因為我們有可能在發送者啟動前先啟動接收者。 // 我們要確保當從這個queue消費消息時,這個queue是存在的。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("CRTL+C"); // 這個另外的QueueingConsumer類用來緩存服務端推送給我們的消息。 // 下面我們準備告訴服務端給我們傳遞存放在queue里的消息,因為消息是由服務端推送過來的。 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[" + message + "]"); } } } channel.queueDeclare:第一個參數:隊列名字,第二個參數:隊列是否可持久化即重啟后該隊列是否依然存在,第三個參數:該隊列是否時獨占的即連接上來時它占用整個網絡連接,第四個參數:是否自動銷毀即當這個隊列不再被使用的時候即沒有消費者對接上來時自動刪除,第五個參數:其他參數如TTL(隊列存活時間)等。channel.basicConsume:第一個參數:隊列名字,第二個參數:是否自動應答,如果為真,消息一旦被消費者收到,服務端就知道該消息已經投遞,從而從隊列中將消息剔除,否則,需要在消費者端手工調用channel.basicAck()方法通知服務端,如果沒有調用,消息將會進入unacknowledged狀態,并且當消費者連接斷開后變成ready狀態重新進入隊列,第三個參數,具體消費者類。
package stephansun.github.samples.amqp.plain.workqueues; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Worker { private final static String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("CRTL+C"); // 這條語句告訴RabbitMQ在同一時間不要給一個worker一個以上的消息。 // 或者換一句話說, 不要將一個新的消息分發給worker知道它處理完了并且返回了前一個消息的通知標志(acknowledged) // 替代的,消息將會分發給下一個不忙的worker。 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); // 自動通知標志 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("r[" + message + "]"); doWord(message); System.out.println("r[done]"); // 發出通知標志 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWord(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } } 在本代碼中,channel執行basicConsume方法時autoAck為false,這就意味著接受者在收到消息后需要主動通知RabbitMQ才能將該消息從隊列中刪除,否則該在接收者跟MQ連接沒斷的情況下,消息將會變為untracked狀態,一旦接收者斷開連接,消息重新變為ready狀態。通知MQ需要調用channel.basicAck(int, boolean),如果不調用,消息永遠不會從隊列中消失。該方法第一個參數為一個標志,一般是delivery.getEnvelope().getDeliveryTag(),其實就是一個遞增的數字,它表示這個這個隊列中第幾個消息。以下解釋錯誤!第二個參數為true表示通知所有untracked的消息,false標志只通知第一個參數對應的那個消息。不管是true還是false,只要執行了channel.basicAck方法,消息都會從隊列中刪除。第二個參數Java代碼
Parameters: deliveryTag the tag from the received com.rabbitmq.client.AMQP.Basic.GetOk or com.rabbitmq.client.AMQP.Basic.Deliver multiple true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag. 我之前錯誤的將and作為的斷句點,認為true通知所有的untracked消息,包含tag指定的那個,其實應該將 up to and including 作為一個整體理解,通知所有擁有相同tag的untracked消息(暫時還沒有在代碼中模擬出這種場景)。尼瑪英語不好害死人啊。參考這個版本的API NewTask.javaJava代碼
package stephansun.github.samples.amqp.plain.workqueues; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { // 使用Work Queues (也稱為Task Queues)最主要的想法是分流那些耗時,耗資源的任務,不至于使隊列擁堵。 private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } private final static String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException { String[] strs = new String[] { "First message." }; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 跟helloworld的不同點 boolean durable = true; // 下面這個聲明隊列的隊列名字改了,所以生產者和消費者兩邊的程序都要改成統一的隊列名字。 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); // 有了durable為true,我們可以保證名叫task_queue的隊列即使在RabbitMQ重啟的情況下也不會消失。 String message = getMessage(strs); // 現在我們需要將消息標記成可持久化的。 // 如果你需要更強大的保證消息傳遞,你可以將發布消息的代碼打包到一個事務里。 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("s[" + message + "]"); channel.close(); connection.close(); } }
package stephansun.github.samples.amqp.plain.publishsubscribe; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { // 在前面,我們使用queue,都給定了一個指定的名字。能夠對一個queue命名對于我們來說是很嚴肅的 // 下面我們需要將worker指定到同一個queue。 // echange的類型有: direct, topic, headers and fanout. private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // fanout exchange 將它收的所有消息廣播給它知道的所有隊列。 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(new String[] { "test" }); // 如果routingkey存在的話,消息通過一個routingkey指定的名字路由至隊列 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("sent [" + message + "]"); channel.close(); connection.close(); } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogs.javaJava代碼
package stephansun.github.samples.amqp.plain.publishsubscribe; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class ReceiveLogs { // 就像你看到的, 創建了連接后,我們聲明了一個exchange,這一步是必須的,因為將消息發送到一個并不存在的exchange上是不允許的。 // 如果還沒有queue綁定到exchange上,消息將會丟失。 // 但那對我們來說是ok的。 // 如果沒有消費者在監聽,我們可以安全地丟棄掉消息。 // RabbitMQ中有關消息模型地核心觀點是,生產者永遠不會直接將消息發往隊列。 // 事實上,相當多的生產者甚至根本不知道一個消息是否已經傳遞給了一個隊列。 // 相反,生產者只能將消息發送給一個exchange。 // exchange是一個很簡單的東西。 // 一邊它接收來自生產者的消息,另一邊它將這些消息推送到隊列。 // exchagne必須明確地知道拿它收到的消息來做什么。把消息附在一個特定的隊列上?把消息附在很多隊列上?或者把消息丟棄掉。 // 這些規則在exchange類型里都有定義。 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 創建fanout類型的exchange, 我們叫它logs: // 這種類型的exchange將它收到的所有消息廣播給它知道的所有隊列。 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 臨時隊列(temporary queue) // 首先,無論什么時候連接Rabbit時,我們需要一個fresh的,空的隊列 // First, whenever we connect to Rabbit we need a fresh, empty queue. // 為了做到這一點,我們可以創建一個隨機命名的隊列,或者更好的,就讓服務端給我們選擇一個隨機的隊列名字。 // 其次,一旦我們關閉消費者的連接,這個臨時隊列應該自動銷毀。 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("r[" + message + "]"); } } } 發布訂閱,本代碼演示的是fanout exchange,這種類型的exchange將它收到的所有消息直接發送給所有跟它綁定的隊列,這里說了直接,是因為rouring key對于fanout exchange來說沒有任何意義!不管一個隊列以怎樣的routing key和fanout exhange綁定,只要他們綁定了,消息就會送到隊列。代碼中發送端將消息發到logs名字的fanout exchange,routing key為空字符串,你可以將它改成任何其他值或者null試試看。另外,接收端代碼使用channel聲明了一個臨時隊列,并將這個隊列通過空字符串的routing key綁定到fanout exchange。這個臨時隊列的名字的隨機取的,如:amq.gen-U0srCoW8TsaXjNh73pnVAw==,臨時隊列在后面的請求響應模式中有用到。
package stephansun.github.samples.amqp.plain.routing; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // diff String serverity = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes()); System.out.println("s[" + serverity + "]:[" + message + "]"); channel.close(); connection.close(); } private static String getServerity(String[] strings) { return "info"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogsDirect.javaJava代碼
package stephansun.github.samples.amqp.plain.routing; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); String[] strs = new String[] { "info", "waring", "error" }; for (String str : strs) { channel.queueBind(queueName, EXCHANGE_NAME, str); } System.out.println("CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("r:[" + routingKey + "]:[" + message + "]"); } } } 本代碼演示了另外一種exchange,direct exchange,該exchange根據routing key將消息發往使用該routing key和exchange綁定的一個或者多個隊列里,如果沒找到,則消息丟棄。本代碼中可以啟動3個接收端,分別使用info,warning,error作為routing key,代表3種級別的日志。只要將不同級別的日志發往不同接收端只需將日志級別當作routing key。
package stephansun.github.samples.amqp.plain.topics; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // diff String routingKey = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("s[" + routingKey + "]:[" + message + "]"); channel.close(); connection.close(); } private static String getServerity(String[] strings) { return "kern.critical"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogsTopic.javaJava代碼
package stephansun.github.samples.amqp.plain.topics; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class ReceiveLogsTopic { // FIXME // Some teasers: // Will "*" binding catch a message sent with an empty routing key? // Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key? // How different is "a.*.#" from "a.#"? private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); String[] strs = new String[] { "kern.critical", "A critical kernel error" }; for (String str : strs) { channel.queueBind(queueName, EXCHANGE_NAME, str); } System.out.println("CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("r:[" + routingKey + "]:[" + message + "]"); } } } 本代碼演示了最后一種類型的exchange,topic exchange,topic exchange和direct exchange最大的不同就是它綁定的routing key是一種模式,而不是簡單的一個字符串。為什么要有模式(Patten)這個概念?模式可以理解為對事物描述的一種抽象。以代碼種的日志系統為例,使用direct exchange只能區別info,error,debug等等不同級別的日志,但是實際上不光有不同級別的日志,還有不同來源的日志,如操作系統內核的日志,定時腳本等, 使用模式就可以用<level>.<source>表示,更強大的是,模式允許使用通配符,*代表一個單詞,#代表一個多個單詞。
package stephansun.github.samples.amqp.plain.rpc; import java.io.IOException; import java.util.UUID; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class RPCClient { // FIXME // AMQP協議預定義了14種伴隨著消息的屬性。大多數屬性很少使用到。除了以下這些異常情況: // deliveryMode: // contentType: // replyTo: // correlationId: // FIXME // 為什么我們忽略掉callback隊列里的消息,而不是拋出錯誤? // 這取決于服務端的競爭條件的可能性。 // 雖然不太可能,但這種情況是存在的,即 // RPC服務在剛剛將答案發給我們,然而沒等我們將通知標志后返回時就死了 // 如果發生了這種情況, 重啟的RPC服務將會重新再次處理該請求。 // 這就是為什么在客戶端我們必須優雅地處理重復性的響應,及RPC在理想情況下應該時冪等的。(不太理解這句話的意思) private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); // temporary queue. replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { String response = null; String corrId = UUID.randomUUID().toString(); // in order to receive a response we need to send a 'callback' queue address with the request. // We can use the default queue(which is exclusive in the Java client) BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws IOException { connection.close(); } public static void main(String[] args) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println("fib(30)"); response = fibonacciRpc.call("30"); System.out.println("got[" + response + "]"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.clone(); } catch (Exception ignore) { // ignore } } } } } RPCServer.javaJava代碼
package stephansun.github.samples.amqp.plain.rpc; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class RPCServer { // 我們的代碼仍然相當簡單,沒有試圖解決更復雜(或者更重要)的問題,像: // 客戶端在沒有服務端運行的情況下如何處理? // 一個RPC的客戶端應該有一些超時類型嗎? // 如果服務端出現異常,是否應該將異常返回給客戶端? // 在進行業務處理前阻止不合法的消息進入(比如檢查綁定,類型) // Protecting against invalid incoming messages (eg checking bounds, type) before processing. private static final String RPC_QUEUE_NAME = "rpc_queue"; // FIXME Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible. private static int fib(int n) { if (n == 0) { return 0; } if (n == 1) { return 1; } return fib(n - 1) + fib(n - 2); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // We might want to run more than one server process. // In order to spread the load equally over multiple servers we need to set the prefetchCount setting in channel.basicQos. channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("[x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { // ignore } } } } } 本代碼實現了一個簡單的RPC,英文全稱Remote Procedure Call,中文一般翻譯遠程方法調用。RPC需要使用一個唯一標志代表請求,Java中使用java.util.UUID實現,發送端在發送消息前通過channel生成一個臨時隊列,并監聽該隊列,BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();這句代碼生成的就是發送消息的基本屬性,可以看到corrId就是UUID,replyQueueName就是臨時隊列名,這樣當接收端收到消息后就知道返回的消息應該發回哪個隊列了。
<dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp-rabbit</artifactId> <version>1.0.0.RC1</version> </dependency> </dependencies> 常用的類有:org.springframework.amqp.AmqpAdminJava代碼
org.springframework.amqp.AmqpTemplate org.springframework.amqp.Binding org.springframework.amqp.DirectExchange org.springframework.amqp.FanoutExchange org.springframework.amqp.TopicExchange org.springframework.amqp.Message org.springframework.amqp.MessageListener org.springframework.amqp.MessageProperties
package stephansun.github.samples.amqp.spring.helloworld; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlapplicationContext; public class Send { private final static String QUEUE_NAME = "hello"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/helloworld/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); String message = "Hello World!"; rabbitTempalte.send("", QUEUE_NAME, messageConverter.toMessage(message, null)); } } Recv.javaJava代碼
package stephansun.github.samples.amqp.spring.helloworld; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Recv { private final static String QUEUE_NAME = "hello"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/helloworld/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); Message message = rabbitTempalte.receive(QUEUE_NAME); Object obj = messageConverter.fromMessage(message); System.out.println("received:[" + obj + "]"); } } spring-rabbitmq.xmlXml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="hello" durable="false" exclusive="false" auto-delete="false"/> </beans> 我們著重講解以下xml配置文件,第一行就給我們創建了一個mq的連接工廠,第二行創建了一個RabbitTemplate,這是一個模板類,定義了amqp中絕大多數的發送,接收方法。第三行是一個管理器,該bean在創建的時候,會在Spring Context中掃描所有已經注冊的queue,exchange,binding并將他們初始化好。第四行聲明了一個隊列,所見即所得,可以發現使用xml節省了好多代碼量。
package stephansun.github.samples.amqp.spring.workqueues; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.SimpleMessageConverter; import com.rabbitmq.client.Channel; public class MyWorker implements ChannelAwareMessageListener { private void doWord(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } throw new RuntimeException("test exception"); } @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println("MyWorker"); MessageProperties messageProperties = message.getMessageProperties(); String messageContent = (String) new SimpleMessageConverter().fromMessage(message); System.out.println("r[" + message + "]"); // 寫在前面會怎樣? // channel.basicAck(messageProperties.getDeliveryTag(), true); doWord(messageContent); System.out.println("deliveryTag是遞增的"); System.out.println(messageProperties.getDeliveryTag()); // 寫在后面會怎樣? // channel.basicAck(messageProperties.getDeliveryTag(), false); System.out.println("r[done]"); } } NewTask.javaJava代碼
package stephansun.github.samples.amqp.spring.workqueues; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class NewTask { private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } private final static String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/workqueues/spring-rabbitmq-sender.xml"); RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); String[] strs = new String[] { "First message." }; String messageStr = getMessage(strs); MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties( com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN, null, null); MessageConverter messageConverter = new SimpleMessageConverter(); Message message = messageConverter.toMessage(messageStr, messageProperties); rabbitTemplate.send("", QUEUE_NAME, message); System.out.println("s[" + message + "]"); } } Worker.javaJava代碼
package stephansun.github.samples.amqp.spring.workqueues; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Worker { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/workqueues/spring-rabbitmq-receiver.xml"); } } spring-rabbitmq-receiver.xmlJava代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" channel-transacted="true"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="task_queue" durable="true" exclusive="false" auto-delete="false"/> <bean id="myWorker" class="stephansun.github.samples.amqp.spring.workqueues.MyWorker"/> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="none" prefetch="1"> <rabbit:listener ref="myWorker" queue-names="task_queue"/> </rabbit:listener-container> </beans> spring-rabbit-sender.xmlXml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="task_queue" durable="true" exclusive="false" auto-delete="false"/> </beans> 具體區別可以通過與前面RabbitMQ 原生API寫的代碼做對照看出來。
package stephansun.github.samples.amqp.spring.publishsubscribe; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/publishsubscribe/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); String message = getMessage(new String[] { "test" }); rabbitTempalte.send(EXCHANGE_NAME, "", messageConverter.toMessage(message, null)); System.out.println("sent [" + message + "]"); } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogs.javaJava代碼
package stephansun.github.samples.amqp.spring.publishsubscribe; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Binding.DestinationType; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/publishsubscribe/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class); FanoutExchange fanoutExchange = new FanoutExchange(EXCHANGE_NAME); rabbitAdmin.declareExchange(fanoutExchange); String queueName = rabbitAdmin.declareQueue().getName(); Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, "", null); rabbitAdmin.declareBinding(binding); System.out.println("CTRL+C"); // FIXME 為什么要在這里暫停10秒鐘? try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = rabbitTempalte.receive(queueName); Object obj = messageConverter.fromMessage(message); System.out.println("received:[" + obj + "]"); } } spring-rabbitmq.xmlXml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
package stephansun.github.samples.amqp.spring.routing; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/routing/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); // diff String serverity = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); rabbitTempalte.send(EXCHANGE_NAME, serverity, messageConverter.toMessage(message, null)); System.out.println("s[" + serverity + "]:[" + message + "]"); } private static String getServerity(String[] strings) { return "info"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogsDirect.javaJava代碼
package stephansun.github.samples.amqp.spring.routing; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Binding.DestinationType; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/routing/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class); DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME); rabbitAdmin.declareExchange(directExchange); String queueName = rabbitAdmin.declareQueue().getName(); String[] strs = new String[] { "info", "waring", "error" }; for (String str : strs) { Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, str, null); rabbitAdmin.declareBinding(binding); } System.out.println("CTRL+C"); // FIXME 請你先思考一下,為什么要在這里暫停10秒鐘?然后問我。 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = rabbitTempalte.receive(queueName); Object obj = messageConverter.fromMessage(message); System.out.println("received:[" + obj + "]"); } } spring-rabbitmq.xmlXml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans> 實際上exchange,binding的聲明完全可以放在xml中,只是為了展示封裝的代碼底層到底是如何運行的,才在程序中手工調用方法。
package stephansun.github.samples.amqp.spring.topics; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/topics/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); // diff String serverity = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); rabbitTempalte.send(EXCHANGE_NAME, serverity, messageConverter.toMessage(message, null)); System.out.println("s[" + serverity + "]:[" + message + "]"); } private static String getServerity(String[] strings) { return "kern.critical"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ReceiveLogsTopic.java Java代碼
package stephansun.github.samples.amqp.spring.topics; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Binding.DestinationType; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/topics/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class); TopicExchange directExchange = new TopicExchange(EXCHANGE_NAME); rabbitAdmin.declareExchange(directExchange); String queueName = rabbitAdmin.declareQueue().getName(); String[] strs1 = new String[] { "#" }; String[] strs2 = new String[] { "kern.*" }; String[] strs3 = new String[] { "*.critical" }; String[] strs4 = new String[] { "kern.*", "*.critical" }; String[] strs5 = new String[] { "kern.critical", "A critical kernel error" }; for (String str : strs5) { Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, str, null); rabbitAdmin.declareBinding(binding); } System.out.println("CTRL+C"); // FIXME 為什么要在這里暫停10秒鐘? try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = rabbitTempalte.receive(queueName); Object obj = messageConverter.fromMessage(message); System.out.println("received:[" + obj + "]"); } } spring-rabbitmq.xmlJava代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
package stephansun.github.samples.amqp.spring.rpc; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class RPCClient { private static String requestQueueName = "rpc_queue"; public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/rpc/spring-rabbitmq-client.xml"); RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); String message = "30"; Message reply = rabbitTemplate.sendAndReceive("", requestQueueName, new SimpleMessageConverter().toMessage(message, null)); if (reply == null) { System.out.println("接收超時,返回null"); } else { System.out.println("接收到消息:"); System.out.println(reply); } } } RPCServer.javaJava代碼
package stephansun.github.samples.amqp.spring.rpc; import org.springframework.context.support.ClassPathXmlApplicationContext; public class RPCServer { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/rpc/spring-rabbitmq-server.xml"); } } RPCServerListener.javaJava代碼
package stephansun.github.samples.amqp.spring.rpc; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; public class RPCServerListener implements MessageListener { private RabbitTemplate rabbitTemplate; private static MessageConverter messageConverter = new SimpleMessageConverter(); public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Override public void onMessage(Message requestMessage) { Object obj = messageConverter.fromMessage(requestMessage); String str = (String) obj; int n = Integer.parseInt(str); System.out.println(" [.] fib(" + requestMessage + ")"); String response = "" + fib(n); String replyTo = requestMessage.getMessageProperties().getReplyTo(); rabbitTemplate.send( "", replyTo, messageConverter.toMessage(response, null)); } private static int fib(int n) { if (n == 0) { return 0; } if (n == 1) { return 1; } return fib(n - 1) + fib(n - 2); } } spring-rabbitmq-client.xmlJava代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="1000"/> <rabbit:admin connection-factory="connectionFactory"/> </beans> spring-rabbitmq-server.xmlJava代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="rpc_queue" durable="false" exclusive="false" auto-delete="false"> </rabbit:queue> <bean id="myListener" class="stephansun.github.samples.amqp.spring.rpc.RPCServerListener"> <property name="rabbitTemplate" ref="rabbitTemplate"/> </bean> <rabbit:listener-container connection-factory="connectionFactory" prefetch="1"> <rabbit:listener queue-names="rpc_queue" ref="myListener"/> </rabbit:listener-container> </beans> 本代碼演示了監聽器的用法,RabbitTemplate提供的所有方法都是同步的,所有當使用RabbitTemplate的receive方法時,它馬上連接到隊列,查看是否由消息,有就收下來,并關閉連接,沒有也不拋出異常,只返回一個null值。這就解釋了為什么我上面代碼中多次使用sleep10秒,因為如果先運行接收端,它不能不停循環地收消息,所以在發送端還沒發消息時,它就已經結束了。而監聽器(Listener)不一樣,底層代碼中會使用org.springframework.amqp.rabbit.listener.SimepleMessageListenerContainer中的內部類AsyncMessageProcessingConsumer實現,該類為一個線程類,在線程的run方法中執行了while的一段代碼。RabbitTemplate提供了一個sendAndReceive()方法,它實現了一個簡單的RPC模型。這里還有一個prefetch的含義,該含義同原生API中的Qos一樣。隨后會講到Spring遠程調用框架,在此先把代碼列出來
Main.java
Java代碼
package stephansun.github.samples.amqp.spring.remoting; import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] { "stephansun/github/samples/amqp/spring/remoting/amqp-remoting.xml", "stephansun/github/samples/amqp/spring/remoting/amqp-remoting-sender.xml", "stephansun/github/samples/amqp/spring/remoting/amqp-remoting-receiver.xml" }); MyService sender = (MyService) applicationContext.getBean("sender"); sender.sayHello(); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "stephan"); param.put("age", 26); String str = sender.foo(param); System.out.println("str:" + str); } }
MyService.java
Java代碼
package stephansun.github.samples.amqp.spring.remoting; import java.util.Map; public interface MyService { void sayHello(); String foo(Map<String, Object> param); }
MyServiceImpl.java
Java代碼
package stephansun.github.samples.amqp.spring.remoting; import java.util.Map; public class MyServiceImpl implements MyService { @Override public void sayHello() { System.out.println("hello world!"); } @Override public String foo(Map<String, Object> param) { return param.toString(); } }
amqp-remoting-receiver.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="myService" class="stephansun.github.samples.amqp.spring.remoting.MyServiceImpl"/> <bean id="receiver" class="org.springframework.amqp.remoting.AmqpInvokerServiceExporter"> <property name="serviceInterface" value="stephansun.github.samples.amqp.spring.remoting.MyService"/> <property name="service" ref="myService"/> </bean> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="receiver" queue-names="si.test.queue"/> </rabbit:listener-container> </beans>
amqp-remoting-sender.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="sender" class="org.springframework.amqp.remoting.AmqpInvokerProxyFactoryBean"> <property name="amqpTemplate" ref="amqpTemplate"/> <property name="serviceInterface" value="stephansun.github.samples.amqp.spring.remoting.MyService"/> <property name="exchange" value="si.test.exchange"/> <property name="routingKey" value="si.test.binding"/> </bean> </beans>
amqp-remoting.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="si.test.queue"/> <rabbit:direct-exchange name="si.test.exchange"> <rabbit:bindings> <rabbit:binding queue="si.test.queue" key="si.test.binding"/> </rabbit:bindings> </rabbit:direct-exchange> </beans>
關鍵的幾個類有:
Java代碼
org.springframework.amqp.remoting.AmqpInvokerClientIntecrptor org.springframework.amqp.remoting.AmqpInvokerProxyFactoryBean org.springframework.amqp.remoting.AmqpInvokerServiceExporter
其中AmqpInvokerProxyFactoryBean繼承與AmqpInvokerClientInterceptor
AmqpInvovkerServiceExporter除了繼承了Spring遠程調用框架的RemoteInvocationBasedExporter,還額外實現了ChannelAwareMessageListener接口,這個接口的handle方法處理消息,且實現該接口的類都可以被SimpleMessageListenerContainer管理起來。
下面我們寫一段簡單的代碼初步領略一下Spring遠程調用框架
pom.xml
Xml代碼
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.1.0.RELEASE</version> </dependency> </dependencies>
Main.java
Java代碼
package stephansun.github.samples.spring.remoting; import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] { "stephansun/github/samples/spring/remoting/spring-remoting.xml" }); MyService myService = (MyService) applicationContext.getBean("sender"); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "stephan"); param.put("age", 26); String str = myService.foo(param); System.out.println("str:" + str); } } MyInvokerClientInterceptor.java
Java代碼
package stephansun.github.samples.spring.remoting; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.springframework.beans.factory.InitializingBean; import org.springframework.remoting.support.DefaultRemoteInvocationFactory; import org.springframework.remoting.support.RemoteInvocation; import org.springframework.remoting.support.RemoteInvocationFactory; import org.springframework.remoting.support.RemoteInvocationResult; public class MyInvokerClientInterceptor implements MethodInterceptor, InitializingBean { private RemoteInvocationFactory remoteInvocationFactory = new DefaultRemoteInvocationFactory(); public void setRemoteInvocationFactory(RemoteInvocationFactory remoteInvocationFactory) { this.remoteInvocationFactory = (remoteInvocationFactory != null ? remoteInvocationFactory : new DefaultRemoteInvocationFactory()); } protected RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { return this.remoteInvocationFactory.createRemoteInvocation(methodInvocation); } @Override public void afterPropertiesSet() throws Exception { System.out.println("afterPropertiesSet"); } @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { RemoteInvocation invocation = createRemoteInvocation(methodInvocation); Object[] arguments = invocation.getArguments(); System.out.println("arguments:" + arguments); String methodName = invocation.getMethodName(); System.out.println("methodName:" + methodName); Class[] classes = invocation.getParameterTypes(); System.out.println("classes:" + classes); // do whatever you want to do RemoteInvocationResult result = new RemoteInvocationResult("hello, world!"); return result.getValue(); } }
MyInvokerProxyFactoryBean.java
Java代碼
package stephansun.github.samples.spring.remoting; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.FactoryBean; import org.springframework.util.ClassUtils; public class MyInvokerProxyFactoryBean extends MyInvokerClientInterceptor implements FactoryBean<Object>, BeanClassLoaderAware { private Class serviceInterface; private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); private Object serviceProxy; // FIXME for Spring injection public void setServiceInterface(Class serviceInterface) { this.serviceInterface = serviceInterface; } public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); if (this.serviceInterface == null) { throw new IllegalArgumentException("Property 'serviceInterface' is required"); } this.serviceProxy = new ProxyFactory(this.serviceInterface, this).getProxy(this.beanClassLoader); } @Override public void setBeanClassLoader(ClassLoader classLoader) { this.beanClassLoader = classLoader; } @Override public Object getObject() throws Exception { return this.serviceProxy; } @Override public Class<?> getObjectType() { return this.serviceInterface; } @Override public boolean isSingleton() { return true; } }
MyService.java
Java代碼
package stephansun.github.samples.spring.remoting; import java.util.Map; public interface MyService { void sayHello(); String foo(Map<String, Object> param); }
spring-remoting.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="sender" class="stephansun.github.samples.spring.remoting.MyInvokerProxyFactoryBean"> <property name="serviceInterface" value="stephansun.github.samples.spring.remoting.MyService"/> </bean> </beans>
從輸出的結果可以看出,Spring將接口的參數,調用方法,類名字封裝到RemoteInvocation類中,這個類是序列的,意味著它可以自由地以字節形式在網絡上傳輸,jms,http,amqp都支持字節形式地消息傳輸,所以我們能基于接口遠程方法調用,無論你采用那種網絡傳輸協議。
pom.xml
Xml代碼
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.3.0</version> </dependency> </dependencies>
Receiver.java
Java代碼
package stephansun.github.samples.jms.plain.pointtopoint; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class Receiver { public static void main(String[] args) { // 獲得連接工廠 ConnectionFactory cf = new ActiveMQConnectionFactory( "tcp://localhost:61616"); // javax.jms.Connection Connection conn = null; // javax.jms.Session Session session = null; try { // 創建連接 conn = cf.createConnection(); // 創建會話 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 選擇目標 Destination destination = new ActiveMQQueue("myQueue"); // MessageConsumer consumer = session.createConsumer(destination); conn.start(); // 接收消息 Message message = consumer.receive(); TextMessage textMessage = (TextMessage) message; System.out.println("得到一個消息:" + textMessage.getText()); } catch (JMSException e) { // 處理異常 e.printStackTrace(); } finally { try { // 清理資源 if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { ex.printStackTrace(); } } } }
Sender.java
Java代碼
package stephansun.github.samples.jms.plain.pointtopoint; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class Sender { public static void main(String[] args) { // 獲得連接工廠 ConnectionFactory cf = new ActiveMQConnectionFactory( "tcp://localhost:61616"); // javax.jms.Connection Connection conn = null; // javax.jms.Session Session session = null; try { // 創建連接 conn = cf.createConnection(); // 創建會話 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建隊列 Destination destination = new ActiveMQQueue("myQueue"); // 設置消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); message.setText("Hello World!"); producer.send(message); } catch (JMSException e) { // 處理異常 e.printStackTrace(); } finally { try { // 清理資源 if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { ex.printStackTrace(); } } } }
Receiver1.java
Java代碼
package stephansun.github.samples.jms.plain.pubsub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class Receiver1 { public static void main(String[] args) { // 獲得連接工廠 ConnectionFactory cf = new ActiveMQConnectionFactory( "tcp://localhost:61616"); // javax.jms.Connection Connection conn = null; // javax.jms.Session Session session = null; try { // 創建連接 conn = cf.createConnection(); // 創建會話 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 選擇目標 Destination destination = new ActiveMQTopic("myTopic"); // MessageConsumer consumer = session.createConsumer(destination); conn.start(); // 接收消息 Message message = consumer.receive(); TextMessage textMessage = (TextMessage) message; System.out.println("接收者1 得到一個消息:" + textMessage.getText()); } catch (JMSException e) { // 處理異常 e.printStackTrace(); } finally { try { // 清理資源 if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { ex.printStackTrace(); } } } }
Sender.java
Java代碼
package stephansun.github.samples.jms.plain.pubsub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class Sender { public static void main(String[] args) { // 獲得連接工廠 ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); // javax.jms.Connection Connection conn = null; // javax.jms.Session Session session = null; try { // 創建連接 conn = cf.createConnection(); // 創建會話 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建隊列 Destination destination = new ActiveMQTopic("myTopic"); // 設置消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); message.setText("Hello World!"); producer.send(message); } catch (JMSException e) { // 處理異常 e.printStackTrace(); } finally { try { // 清理資源 if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { ex.printStackTrace(); } } } }
pom.xml
Xml代碼
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.3.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.1.0.RELEASE</version> </dependency> </dependencies> point-to-point
Receiver.java
Java代碼
package stephansun.github.samples.jms.spring.pointtopoint; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; public class Receiver { public static void main(String[] args) throws JMSException { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/jms/spring/pointtopoint/jms-point-to-point.xml"); Queue myQueue = (Queue) applicationContext.getBean("myQueue"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); MapMessage message = (MapMessage) jmsTemplate.receive(myQueue); String name = message.getString("name"); int age = message.getInt("age"); System.out.println("name:" + name); System.out.println("age:" + age); } } Sender.java
Java代碼
package stephansun.github.samples.jms.spring.pointtopoint; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Queue; import javax.jms.Session; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class Sender { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/jms/spring/pointtopoint/jms-point-to-point.xml"); Queue myQueue = (Queue) applicationContext.getBean("myQueue"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); jmsTemplate.send(myQueue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("name", "stephan"); message.setInt("age", 26); return message; } }); } }
jms-point-to-point.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/> <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="myQueue"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
Receiver1.java
Java代碼
package stephansun.github.samples.jms.spring.pubsub; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Topic; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; public class Receiver1 { public static void main(String[] args) throws JMSException { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/jms/spring/pubsub/jms-pub-sub.xml"); Topic myTopic = (Topic) applicationContext.getBean("myTopic"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); MapMessage message = (MapMessage) jmsTemplate.receive(myTopic); String name = message.getString("name"); int age = message.getInt("age"); System.out.println("name:" + name); System.out.println("age:" + age); } }
Sender.java
Java代碼
package stephansun.github.samples.jms.spring.pubsub; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class Sender { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/jms/spring/pubsub/jms-pub-sub.xml"); Topic myTopic = (Topic) applicationContext.getBean("myTopic"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); jmsTemplate.send(myTopic, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("name", "stephan"); message.setInt("age", 26); return message; } }); } }
jms-pub-sub.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/> <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="myTopic"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
pom.xml
Xml代碼
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.3.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.1.0.RELEASE</version> </dependency> </dependencies>
Main.java
Java代碼
package stephansun.github.samples.jms.spring.remoting; import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] { "stephansun/github/samples/jms/spring/remoting/jms-remoting.xml", "stephansun/github/samples/jms/spring/remoting/jms-remoting-sender.xml", "stephansun/github/samples/jms/spring/remoting/jms-remoting-receiver.xml" }); MyService sender = (MyService) applicationContext.getBean("sender"); sender.sayHello(); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "stephan"); param.put("age", 26); String str = sender.foo(param); System.out.println("str:" + str); } }
MyService.java
Java代碼
package stephansun.github.samples.jms.spring.remoting; import java.util.Map; public interface MyService { void sayHello(); String foo(Map<String, Object> param); } MyServiceImpl.java
Java代碼
package stephansun.github.samples.jms.spring.remoting; import java.util.Map; public class MyServiceImpl implements MyService { @Override public void sayHello() { System.out.println("hello world!"); } @Override public String foo(Map<String, Object> param) { return param.toString(); } } jms-remoting-receiver.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="myService" class="stephansun.github.samples.jms.spring.remoting.MyServiceImpl"/> <bean id="receiver" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"> <property name="serviceInterface" value="stephansun.github.samples.jms.spring.remoting.MyService"/> <property name="service" ref="myService"/> </bean> <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageListener" ref="receiver"/> <property name="destination" ref="myQueue"/> </bean> </beans> jms-remoting-sender.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="sender" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"> <property name="connectionFactory" ref="connectionFactory"/> <property name="queue" ref="myQueue"/> <property name="serviceInterface" value="stephansun.github.samples.jms.spring.remoting.MyService"/> <property name="receiveTimeout" value="5000"/> </bean> </beans>
jms-remoting.xml
Xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/> <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="myQueue"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
JMS跟AMQP有很大的區別,JMS有兩種類型的隊列,一個是點對點的,一種是主題訂閱的,發送者直接將消息發送至隊列,接受者從隊列收消息,對于發布訂閱模式,每個消費者都從隊列中得到了相同的消息拷貝。
新聞熱點
疑難解答