import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; public class DistributedLockTest { private static final String BROKER_URL = "vm://dlt?broker.persistent=false&broker.useJmx=false"; private final Semaphore sem = new Semaphore(0); @Before public void setUp() throws Exception { } @After public void tearDown() throws Exception { } static interface Closable { void close() throws Exception; } class MyConn implements Closable { Connection conn; Session session; public MyConn(ConnectionFactory factory) throws Exception { conn = factory.createConnection(); conn.start(); } public void start() throws Exception { session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("queue?consumer.exclusive=true"); session.createConsumer(queue).setMessageListener(getListener(this)); } public void close() throws Exception { session.close(); } } @Test(timeout=2000) public void test() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); Connection conn1 = factory.createConnection(); conn1.start(); Session producerSession = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = producerSession.createQueue("queue?consumer.exclusive=true"); producerSession.createProducer(queue).send(producerSession.createMessage()); //producerSession.close(); for(int k=0; k<100; ++k) { MyConn my = new MyConn(factory); my.start(); } //TimeUnit.MINUTES.sleep(1); sem.acquire(100); } private AtomicInteger idCounter = new AtomicInteger(1); private MessageListener getListener(final Closable closable) { return new MessageListener() { private final int id = idCounter.getAndIncrement(); @Override public void onMessage(Message message) { try { System.out.println(String.format("ID = %04d - Begin", id)); //TimeUnit.SECONDS.sleep(1); closable.close(); sem.release(); System.out.println(String.format("ID = %04d - End", id)); } catch(Exception ex) { } } }; } }
27.12.11
A Simple Test for Distributed Lock with ActiveMQ's Exclusive Consumer Feature
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment