- JMS方式调用
- Queue方式调用
- Topic方式调用
- ReqRsp方式调用
1、TestMsg.java
package com.neohope.ActiveMQ.test.beans; public class TestMsg implements java.io.Serializable{ private static final long serialVersionUID = 12345678; public TestMsg(int taskId, String taskInfo, int taskLevel) { this.taskId = taskId; this.taskInfo = taskInfo; this.taskLevel = taskLevel; } public int taskId; public String taskInfo; public int taskLevel; }
2、MqTopicPublisher.java
package com.neohope.ActiveMQ.test.topic; import com.neohope.ActiveMQ.test.beans.TestMsg; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Hansen on 2015/12/18. */ public class MqTopicPublisher { private static void ProduceMsg() { TopicConnectionFactory connectionFactory = null; TopicConnection connection = null; TopicSession session = null; try { connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); connection = connectionFactory.createTopicConnection(); connection.start(); session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("NEOHOPE.TestTopic"); TopicPublisher publisher = session.createPublisher(topic); publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for(int i=0; i<100; i++) { ObjectMessage message = session.createObjectMessage(); message.setObject(new TestMsg(i, "topic task info "+i, 3)); publisher.send(message); } ObjectMessage message = session.createObjectMessage(); message.setObject(new TestMsg(100, "-=END=-", 3)); publisher.send(message); session.commit(); } catch (JMSException ex) { ex.printStackTrace(); } finally { try { if(session!=null) { session.close(); } } catch (JMSException ex) { } try { if(connection!=null) { connection.close(); } } catch (JMSException ex) { } } } public static void main(String[] args) { ProduceMsg(); } }
3、MqTopicSubscriber.java
package com.neohope.ActiveMQ.test.topic; import com.neohope.ActiveMQ.test.beans.TestMsg; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Hansen on 2015/12/18. */ public class MqTopicSubscriber { private static void ReveiveMsg() { TopicConnectionFactory connectionFactory = null; TopicConnection connection = null; TopicSession session = null; final Object wait = new Object(); try { connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); connection = connectionFactory.createTopicConnection(); session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("NEOHOPE.TestTopic"); TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage) msg; try { TestMsg mqmsg = (TestMsg) objMsg.getObject(); String taskInfo = mqmsg.taskInfo; System.out.println("msg received: " + taskInfo); if ("-=END=-".equals(taskInfo)) { synchronized (wait) { wait.notify(); } } } catch (JMSException e) { e.printStackTrace(); } } } }); connection.start(); synchronized (wait){ wait.wait(); } session.commit(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if(session!=null) { session.close(); } } catch (JMSException ex) { } try { if(connection!=null) { connection.close(); } } catch (JMSException ex) { } } } public static void main(String[] args) { System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","*"); ReveiveMsg(); } }