package demo; import static org.junit.Assert.*; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.junit.After; import org.junit.Before; import org.junit.Test; public class ActiveMQWaitingListTest { private ConnectionFactory connFactory; private Connection conn; private BrokerService broker; @Before public void setUp() throws Exception { broker = BrokerFactory.createBroker("broker:tcp://0.0.0.0:61618"); broker.start(); connFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61618"); conn = connFactory.createConnection(); conn.start(); } @After public void tearDown() throws Exception { conn.close(); broker.stop(); } @Test public void test() throws Exception { final Session s1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session s2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue queue = s1.createQueue("Q"); final Topic topic = s1.createTopic("T"); final Queue reply = s1.createQueue("R"); final MessageProducer p1 = s1.createProducer(queue); final MessageProducer p2 = s2.createProducer(topic); final AtomicBoolean flag = new AtomicBoolean(); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new Runnable() { @Override public void run() { try { doRun(); } catch(Exception ex) { ex.printStackTrace(); } } public void doRun() throws Exception { //wait for 10 minutes to begin TimeUnit.MINUTES.sleep(5); ConnectionFactory f2 = new ActiveMQConnectionFactory("tcp://10.0.0.17:61618"); //block the outer conn variable Connection conn = f2.createConnection(); conn.start(); Session sx1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sx2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer cx1 = sx1.createConsumer(queue); final AtomicInteger counter = new AtomicInteger(); cx1.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //delay for 10 millisecond try { counter.incrementAndGet(); TimeUnit.MILLISECONDS.sleep(10); } catch (Exception ex) { ex.printStackTrace(); } } }); MessageConsumer cx2 = sx2.createConsumer(topic); TextMessage msg = (TextMessage)cx2.receive(); MessageProducer p = sx2.createProducer(reply); p.send(sx2.createTextMessage(msg.getText() + " " + counter.get())); conn.close(); } }); service.execute(new Runnable() { @Override public void run() { try { doRun(); } catch(Exception ex) { ex.printStackTrace(); } } private void doRun() throws InterruptedException, JMSException { while(!flag.get()) { TimeUnit.MILLISECONDS.sleep(1); p1.send(s1.createObjectMessage(new byte[1024])); } } }); service.execute(new Runnable() { @Override public void run() { try { doRun(); } catch(Exception ex) { ex.printStackTrace(); } } private void doRun() throws InterruptedException, JMSException { while(!flag.get()) { TimeUnit.MILLISECONDS.sleep(10); p2.send(s1.createTextMessage(String.valueOf(System.currentTimeMillis()))); } } }); service.shutdown(); Session s3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer c = s3.createConsumer(reply); TextMessage msg = (TextMessage)c.receive(); String text = msg.getText(); String[] sep = text.split(" "); long value = Long.parseLong(sep[0]); int recv = Integer.parseInt(sep[1]); System.out.printf("It took %d milliseconds to send and receive the message in another topic, while %d msg recv for queue\n", (System.currentTimeMillis() - value), recv); flag.set(true); service.awaitTermination(1, TimeUnit.HOURS); } }And here is the test result: Here is the test result: It took 36 milliseconds to send and receive the message in another topic, while 4 msg recv for queue
29.3.13
A simple program to test two different channels of JMS
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment