29.3.13

A simple program to test two different JMS channels (a queue and a topic):

package demo;

import static org.junit.Assert.*;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Measures how long a message on a topic takes to travel from a producer to a
 * consumer while a queue on the same broker is being filled by a fast producer
 * and drained by a deliberately slow listener.
 *
 * <p>Three background tasks are started:
 * <ol>
 *   <li>a consumer that, after a start delay, listens on queue {@code Q}
 *       (sleeping 10 ms per message), takes one message from topic {@code T}
 *       and reports it together with the queue message count on queue {@code R};</li>
 *   <li>a producer that sends a 1 KiB object message to {@code Q} about every millisecond;</li>
 *   <li>a producer that publishes the current time to {@code T} about every 10 ms.</li>
 * </ol>
 * The test thread waits for the report on {@code R} and prints the measured latency.
 *
 * <p>Each JMS {@link Session} is used by one thread only once the tasks are running,
 * because a session is a single-threaded context.
 */
public class ActiveMQWaitingListTest {

    /** URI the embedded broker is created from. */
    private static final String BROKER_BIND_URI = "broker:tcp://0.0.0.0:61618";

    /** URL used by the test thread and the two producers. */
    private static final String LOCAL_BROKER_URL = "tcp://127.0.0.1:61618";

    /**
     * URL used by the background consumer. The default is the address the original
     * test used; it is machine specific, so it can be overridden with
     * {@code -Ddemo.consumerBrokerUrl=...}.
     * NOTE(review): presumably 10.0.0.17 is the LAN address of the host running the
     * embedded broker -- confirm before relying on the default.
     */
    private static final String CONSUMER_BROKER_URL =
            System.getProperty("demo.consumerBrokerUrl", "tcp://10.0.0.17:61618");

    /** How long the background consumer waits before it connects. */
    private static final long CONSUMER_START_DELAY_MINUTES = 5;

    /** Upper bound for waiting on the report: the start delay plus one minute of slack. */
    private static final long REPLY_TIMEOUT_MILLIS =
            TimeUnit.MINUTES.toMillis(CONSUMER_START_DELAY_MINUTES + 1);

    private ConnectionFactory connFactory;
    private Connection conn;
    private BrokerService broker;

    /**
     * Starts the embedded broker and opens the shared connection.
     *
     * @throws Exception if the broker or the connection cannot be started
     */
    @Before
    public void setUp() throws Exception {
        broker = BrokerFactory.createBroker(BROKER_BIND_URI);
        broker.start();

        connFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_URL);
        conn = connFactory.createConnection();
        conn.start();
    }

    /**
     * Closes the shared connection and stops the broker. The broker is stopped even
     * when closing the connection fails, and either may be {@code null} if
     * {@link #setUp()} did not complete.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (conn != null) {
                conn.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /**
     * Runs the scenario described on the class and prints the topic latency and the
     * number of queue messages the slow listener had handled by then.
     *
     * @throws Exception if any JMS operation on the test thread fails
     */
    @Test
    public void test() throws Exception {
        final Session queueSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Session topicSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final Queue queue = queueSession.createQueue("Q");
        final Topic topic = queueSession.createTopic("T");
        final Queue reply = queueSession.createQueue("R");

        final MessageProducer queueProducer = queueSession.createProducer(queue);
        final MessageProducer topicProducer = topicSession.createProducer(topic);

        final AtomicBoolean stop = new AtomicBoolean();
        final ExecutorService service = Executors.newCachedThreadPool();

        try {
            service.execute(new BackgroundTask() {
                @Override
                void doRun() throws Exception {
                    consumeAndReply(queue, topic, reply);
                }
            });

            // Floods the queue; from here on queueSession belongs to this task only.
            service.execute(new BackgroundTask() {
                @Override
                void doRun() throws Exception {
                    while (!stop.get()) {
                        TimeUnit.MILLISECONDS.sleep(1);
                        queueProducer.send(queueSession.createObjectMessage(new byte[1024]));
                    }
                }
            });

            // Publishes timestamps; the message is created by topicSession (the
            // producer's own session) so that no session is shared between threads.
            service.execute(new BackgroundTask() {
                @Override
                void doRun() throws Exception {
                    while (!stop.get()) {
                        TimeUnit.MILLISECONDS.sleep(10);
                        topicProducer.send(
                                topicSession.createTextMessage(String.valueOf(System.currentTimeMillis())));
                    }
                }
            });

            service.shutdown();

            Session replySession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer replyConsumer = replySession.createConsumer(reply);

            // Bounded wait: an unbounded receive() would hang the test forever if the
            // background consumer failed.
            Message received = replyConsumer.receive(REPLY_TIMEOUT_MILLIS);
            assertNotNull("no report arrived on queue R within " + REPLY_TIMEOUT_MILLIS + " ms", received);

            // Report format: "<publish time in millis> <queue messages handled>"
            String[] parts = ((TextMessage) received).getText().split(" ");
            long sentAt = Long.parseLong(parts[0]);
            int queueCount = Integer.parseInt(parts[1]);

            System.out.printf("It took %d milliseconds to send and receive the message in another topic, while %d msg recv for queue\n",
                    (System.currentTimeMillis() - sentAt), queueCount);
        } finally {
            // Always release the producers, also when the test fails.
            stop.set(true);
            if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
                service.shutdownNow();
            }
        }
    }

    /**
     * Body of the background consumer: waits for the start delay, attaches a slow
     * listener to {@code queue}, takes one message from {@code topic} and sends
     * {@code "<topic message text> <queue messages handled so far>"} to {@code reply}.
     * The consumer's own connection is closed on every exit path.
     *
     * @param queue the queue drained by the slow listener
     * @param topic the topic one message is taken from
     * @param reply the queue the report is sent to
     * @throws Exception if the wait is interrupted or a JMS operation fails
     */
    private void consumeAndReply(Queue queue, Topic topic, Queue reply) throws Exception {
        // Let the queue fill up before any consumer attaches.
        TimeUnit.MINUTES.sleep(CONSUMER_START_DELAY_MINUTES);

        ConnectionFactory consumerFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
        Connection consumerConn = consumerFactory.createConnection();
        try {
            consumerConn.start();

            Session listenerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session topicSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            final AtomicInteger counter = new AtomicInteger();
            MessageConsumer queueConsumer = listenerSession.createConsumer(queue);
            queueConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    counter.incrementAndGet();
                    try {
                        // Simulate a slow consumer: 10 ms per message.
                        TimeUnit.MILLISECONDS.sleep(10);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            MessageConsumer topicConsumer = topicSession.createConsumer(topic);
            TextMessage published = (TextMessage) topicConsumer.receive();

            MessageProducer replyProducer = topicSession.createProducer(reply);
            replyProducer.send(
                    topicSession.createTextMessage(published.getText() + " " + counter.get()));
        } finally {
            consumerConn.close();
        }
    }

    /**
     * Runnable whose body may throw: failures are printed instead of killing the
     * worker silently, and an interrupt is restored on the thread.
     */
    private abstract static class BackgroundTask implements Runnable {

        @Override
        public final void run() {
            try {
                doRun();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

        /**
         * The work of this task.
         *
         * @throws Exception on any failure; reported by {@link #run()}
         */
        abstract void doRun() throws Exception;
    }

}
And here is the test result: It took 36 milliseconds to send and receive the message in another topic, while 4 msg recv for queue

No comments: