- JMS方式调用
- Queue方式调用
- Topic方式调用
- ReqRsp方式调用
1、TestMsg.java
package com.neohope.ActiveMQ.test.beans; public class TestMsg implements java.io.Serializable{ private static final long serialVersionUID = 12345678; public TestMsg(int taskId, String taskInfo, int taskLevel) { this.taskId = taskId; this.taskInfo = taskInfo; this.taskLevel = taskLevel; } public int taskId; public String taskInfo; public int taskLevel; }
2、MqProducer.java
package com.neohope.ActiveMQ.test.jms;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Sample JMS producer: sends a batch of {@link TestMsg} tasks followed by an
 * end marker to an ActiveMQ queue, all inside one transaction.
 */
public class MqProducer {
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "NEOHOPE.TestQueue";
    /** Task info of the final message; consumers stop when they see it. */
    private static final String END_MARKER = "-=END=-";
    private static final int TASK_COUNT = 100;
    private static final int TASK_LEVEL = 3;

    /**
     * Connects to the broker, sends {@code TASK_COUNT} task messages plus the
     * end marker as persistent messages, and commits them in one transaction.
     * JMS failures are printed to stderr; the session and connection are
     * always closed.
     */
    private static void produceMsg() {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            // The session is transacted, so the acknowledge-mode argument is ignored;
            // SESSION_TRANSACTED states that explicitly.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for (int i = 0; i < TASK_COUNT; i++) {
                sendTask(session, producer, new TestMsg(i, "task info " + i, TASK_LEVEL));
            }
            sendTask(session, producer, new TestMsg(TASK_COUNT, END_MARKER, TASK_LEVEL));

            // Nothing is delivered to consumers until the transaction commits.
            session.commit();
        } catch (JMSException ex) {
            ex.printStackTrace();
        } finally {
            closeSession(session);
            closeConnection(connection);
        }
    }

    /** Wraps {@code task} in an object message and sends it through {@code producer}. */
    private static void sendTask(Session session, MessageProducer producer, TestMsg task)
            throws JMSException {
        ObjectMessage message = session.createObjectMessage();
        message.setObject(task);
        producer.send(message);
    }

    /** Closes {@code session} if it was created, reporting (not rethrowing) any failure. */
    private static void closeSession(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException ex) {
            System.err.println("failed to close JMS session: " + ex);
        }
    }

    /** Closes {@code connection} if it was created, reporting (not rethrowing) any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ex) {
            System.err.println("failed to close JMS connection: " + ex);
        }
    }

    /**
     * Entry point: produces the sample messages once.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        produceMsg();
    }
}
3、MqConsumer.java
package com.neohope.ActiveMQ.test.jms;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Sample JMS consumer: synchronously receives {@link TestMsg} object messages
 * from an ActiveMQ queue until the end marker arrives, then commits.
 */
public class MqConsumer {
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "NEOHOPE.TestQueue";
    /** Task info of the final message; receiving it ends the loop. */
    private static final String END_MARKER = "-=END=-";
    /**
     * Packages ActiveMQ may deserialize object-message payloads from.
     * SECURITY: the previous value "*" trusted every class on the classpath,
     * which exposes the consumer to deserialization attacks; only the payload
     * bean package and basic library packages are listed here.
     */
    private static final String TRUSTED_PACKAGES =
            "java.lang,java.util,org.apache.activemq,com.neohope.ActiveMQ.test.beans";

    /**
     * Receives messages in a transacted session, printing each task info, and
     * commits once the end marker has been seen. JMS failures are printed to
     * stderr; the session and connection are always closed.
     */
    private static void receiveMsg() {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = connectionFactory.createConnection();
            // The session is transacted, so the acknowledge-mode argument is ignored;
            // SESSION_TRANSACTED states that explicitly.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            connection.start();

            boolean endSeen = false;
            while (!endSeen) {
                Message msg = consumer.receive();
                if (msg == null) {
                    // receive() yields null when the consumer is closed; without this
                    // check the loop would spin forever. Returning without commit lets
                    // the close in the finally block roll the transaction back.
                    System.err.println("consumer closed before the end marker arrived; nothing committed");
                    return;
                }
                if (!(msg instanceof ObjectMessage)) {
                    continue;
                }
                Object payload = ((ObjectMessage) msg).getObject();
                if (!(payload instanceof TestMsg)) {
                    // A foreign payload used to abort the run with ClassCastException.
                    System.err.println("ignoring unexpected payload: " + payload);
                    continue;
                }
                String taskInfo = ((TestMsg) payload).taskInfo;
                System.out.println("msg received: " + taskInfo);
                endSeen = END_MARKER.equals(taskInfo);
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeSession(session);
            closeConnection(connection);
        }
    }

    /** Closes {@code session} if it was created, reporting (not rethrowing) any failure. */
    private static void closeSession(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException ex) {
            System.err.println("failed to close JMS session: " + ex);
        }
    }

    /** Closes {@code connection} if it was created, reporting (not rethrowing) any failure. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ex) {
            System.err.println("failed to close JMS connection: " + ex);
        }
    }

    /**
     * Entry point: configures the trusted deserialization packages, then
     * consumes messages until the end marker is received.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", TRUSTED_PACKAGES);
        receiveMsg();
    }
}
4、MqListener.java
package com.neohope.ActiveMQ.test.jms; import com.neohope.ActiveMQ.test.beans.TestMsg; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MqListener { private static void StartListen() { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; final Object wait = new Object(); try { connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); connection = connectionFactory.createConnection(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("NEOHOPE.TestQueue"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage) msg; try { TestMsg mqmsg = (TestMsg) objMsg.getObject(); String taskInfo = mqmsg.taskInfo; System.out.println("msg received: " + taskInfo); if ("-=END=-".equals(taskInfo)) { synchronized (wait) { wait.notify(); } } } catch (JMSException e) { e.printStackTrace(); } } } }); connection.start(); synchronized (wait){ wait.wait(); } session.commit(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if(session!=null) { session.close(); } } catch (JMSException ex) { } try { if(connection!=null) { connection.close(); } } catch (JMSException ex) { } } } public static void main(String[] args) { System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*"); StartListen(); } }