jms即java Message Service,是面向消息中間件的API,用于在兩個應用程序之間、分布式系統中發送消息,進行異步通信。下圖是jms點-點和發布-訂閱兩種形式的原理(百度一大堆資料)。

下面的程序以發布-訂閱為例為例:
1).準備環境:linux + eclipse + jdk1.7 + maven + ActiveMQ-5.12.0
2).安裝ActiveMQ,jms是原生,而ActiveMQ是jms的容器,方便使用 <1>下載ActiveMQ(這里以5.12.0為例) 官網:http://activemq.apache.org/
<2>安裝ActiveMQ 將apache-activemq-5.12.0-bin.tar.gz解壓在 /opt 目錄下:
tar -zxvf apache-activemq-5.12.0-bin.tar.gz<3>啟動ActiveMQ 進入解壓后的apache-activemq-5.12.0/bin目錄,運行啟動腳本:
./activemq start然后在瀏覽器輸入:http://localhost:8161/admin/ 輸入用戶:admin 密碼:admin, 如果出現下圖, 則表示安裝ActiveMQ成功

3).在eclipse新建一個maven項目, 項目結構如下:

4).pom 文件:
<PRoject xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hsp.jms</groupId> <artifactId>JmsDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>JmsDemo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.3.0</version> </dependency> <!-- <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> <version>5.12.0</version> </dependency> --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <!-- include all the dependencies into the jar so it's easier to execute the example --> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build></project>5).Publisher.java:
package com.hsp.jms.JmsDemo;import org.apache.qpid.jms.*;import javax.jms.*;public class Publisher { public static void main(String[] args) throws Exception { //topic前綴 final String TOPIC_PREFIX = "topic://"; //發布者的用戶名 String user = env("ACTIVEMQ_USER", "admin"); //發布者的密碼 String passWord = env("ACTIVEMQ_PASSWORD", "password"); //發布者的主機 String host = env("ACTIVEMQ_HOST", "localhost"); //發布者的端口號 int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672")); //發布者向外提供的鏈接 String connectionURI = "amqp://" + host + ":" + port; //發布者的destinationde名稱 String destinationName = arg(args, 0, "topic://event"); int messages = 10; int size = 256; //創建連接工廠 JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI); //jms連接工廠創建一個連接 Connection connection = factory.createConnection(user, password); //開放連接 connection.start(); //根據連接創建一個會話 session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //發布者的目的的 Destination destination = null; if(destinationName.startsWith(TOPIC_PREFIX)) { destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length())); } else { destination = session.createQueue(destinationName); } //創建發布者 MessageProducer producer = session.createProducer(destination); //設置發布的消息為非持久態 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //發布消息 for(int i = 1; i <= messages; i++) { TextMessage msg = session.createTextMessage("#" + i); msg.setIntProperty("id", i); producer.send(msg); System.out.println(String.format("發送第 %d 條信息: ", i) + msg.getText()); } producer.send(session.createTextMessage("SHUTDOWN")); Thread.sleep(1000 * 3); //關閉連接 connection.close(); //退出程序 System.exit(0); } //獲取環境變量類 private static String env(String key, String defaultValue) { String rc = System.getenv(key); if(rc == null) return defaultValue; return rc; } private static String arg(String[] args, int index, String defaultValue) { if(index < args.length) return args[index]; else return defaultValue; }}6).Listener1.java:
package com.hsp.jms.JmsDemo;import javax.jms.*;import org.apache.qpid.jms.*;public class Listener1 { public static void main(String[] args) throws Exception { //topic前綴 final String TOPIC_PREFIX = "topic://"; //發布者的用戶名 String user = env("ACTIVEMQ_USER", "admin"); //發布者的密碼 String password = env("ACTIVEMQ_PASSWORD", "password"); //發布者的主機 String host = env("ACTIVEMQ_HOST", "localhost"); //發布者的端口號 int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672")); //發布者向外提供的鏈接 String connectionURI = "amqp://" + host + ":" + port; //發布者的destinationde名稱 String destinationName = arg(args, 0, "topic://event"); //創建連接工廠 JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI); //jms連接工廠創建一個連接 Connection connection = factory.createConnection(user, password); //開放連接 connection.start(); //根據連接創建一個會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //發布者的目的的 Destination destination = null; if(destinationName.startsWith(TOPIC_PREFIX)) { destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length())); } else { destination = session.createQueue(destinationName); } MessageConsumer consumer = session.createConsumer(destination); long start = System.currentTimeMillis(); long count = 1; System.out.println("Listener1 等待消息 ..."); while(true) { Message msg = consumer.receive(); if(msg instanceof TextMessage) { String body = ((TextMessage) msg).getText(); if("SHUTDOWN".equals(body)) { long diff = System.currentTimeMillis() - start; System.out.println(String.format("在 %.2f 秒內接收到 %d 條消息", (1.0 * diff / 1000.0), (count - 1))); connection.close(); try { Thread.sleep(10); } catch (Exception e) {} System.exit(1); } else { try { if (count != msg.getIntProperty("id")) { System.out.println("消息不匹配: " + count + "!=" + msg.getIntProperty("id")); } } catch (NumberFormatException ignore) { } if(count == 1) { start = System.currentTimeMillis(); } System.out.println(String.format("接收到第 %d 條消息: ", count) + body); count ++; } } else { System.out.println("錯誤的消息類型: " + msg.getClass()); } } } //獲取環境變量類 private static String env(String key, String defaultValue) { String rc = System.getenv(key); if(rc == null) return defaultValue; return rc; } private static String arg(String[] args, int index, String defaultValue) { if(index < args.length) return args[index]; else return defaultValue; }}7).Listener2.java文件跟Listener1.java差不多
8).先運行Listener1.java和Listener2.java,再運行Publisher.java看看效果:



新聞熱點
疑難解答