package demo;
import static org.junit.Assert.*;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ActiveMQWaitingListTest {
private ConnectionFactory connFactory;
private Connection conn;
private BrokerService broker;
@Before
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("broker:tcp://0.0.0.0:61618");
broker.start();
connFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61618");
conn = connFactory.createConnection();
conn.start();
}
@After
public void tearDown() throws Exception {
conn.close();
broker.stop();
}
@Test
public void test() throws Exception {
final Session s1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session s2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = s1.createQueue("Q");
final Topic topic = s1.createTopic("T");
final Queue reply = s1.createQueue("R");
final MessageProducer p1 = s1.createProducer(queue);
final MessageProducer p2 = s2.createProducer(topic);
final AtomicBoolean flag = new AtomicBoolean();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Runnable() {
@Override
public void run() {
try {
doRun();
} catch(Exception ex) {
ex.printStackTrace();
}
}
public void doRun() throws Exception {
//wait for 10 minutes to begin
TimeUnit.MINUTES.sleep(5);
ConnectionFactory f2 = new ActiveMQConnectionFactory("tcp://10.0.0.17:61618");
//block the outer conn variable
Connection conn = f2.createConnection();
conn.start();
Session sx1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sx2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cx1 = sx1.createConsumer(queue);
final AtomicInteger counter = new AtomicInteger();
cx1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//delay for 10 millisecond
try {
counter.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(10);
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
MessageConsumer cx2 = sx2.createConsumer(topic);
TextMessage msg = (TextMessage)cx2.receive();
MessageProducer p = sx2.createProducer(reply);
p.send(sx2.createTextMessage(msg.getText() + " " + counter.get()));
conn.close();
}
});
service.execute(new Runnable() {
@Override
public void run() {
try {
doRun();
} catch(Exception ex) {
ex.printStackTrace();
}
}
private void doRun() throws InterruptedException,
JMSException {
while(!flag.get()) {
TimeUnit.MILLISECONDS.sleep(1);
p1.send(s1.createObjectMessage(new byte[1024]));
}
}
});
service.execute(new Runnable() {
@Override
public void run() {
try {
doRun();
} catch(Exception ex) {
ex.printStackTrace();
}
}
private void doRun() throws InterruptedException,
JMSException {
while(!flag.get()) {
TimeUnit.MILLISECONDS.sleep(10);
p2.send(s1.createTextMessage(String.valueOf(System.currentTimeMillis())));
}
}
});
service.shutdown();
Session s3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c = s3.createConsumer(reply);
TextMessage msg = (TextMessage)c.receive();
String text = msg.getText();
String[] sep = text.split(" ");
long value = Long.parseLong(sep[0]);
int recv = Integer.parseInt(sep[1]);
System.out.printf("It took %d milliseconds to send and receive the message in another topic, while %d msg recv for queue\n",
(System.currentTimeMillis() - value), recv);
flag.set(true);
service.awaitTermination(1, TimeUnit.HOURS);
}
}
And here is the test result:
Here is the test result:
It took 36 milliseconds to send and receive the message in another topic, while 4 msg recv for queue
No comments:
Post a Comment