27.12.11

A Simple Test for a Distributed Lock Using ActiveMQ's Exclusive Consumer Feature

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises a queue declared with the {@code consumer.exclusive=true} option as a
 * simple distributed lock.
 *
 * <p>A single message is sent to the queue and {@link #CONSUMER_COUNT} consumers are
 * attached, each on its own connection. Every consumer's listener closes its own
 * session without acknowledging the message and then releases one semaphore permit.
 * The test passes only if all {@link #CONSUMER_COUNT} permits are released within the
 * timeout, i.e. only if the one unacknowledged message ends up being delivered to
 * every consumer in turn.
 *
 * <p>All connections opened by the test are tracked and closed in {@link #tearDown()}.
 */
public class DistributedLockTest
{
    private static final String BROKER_URL = "vm://dlt?broker.persistent=false&broker.useJmx=false";

    /** Destination name shared by the producer and every consumer. */
    private static final String QUEUE_NAME = "queue?consumer.exclusive=true";

    /** Number of competing consumers, and therefore of expected deliveries. */
    private static final int CONSUMER_COUNT = 100;

    /** One permit is released each time a listener has handled the message. */
    private final Semaphore sem = new Semaphore(0);

    /** Source of the sequential ids printed by the listeners. */
    private final AtomicInteger idCounter = new AtomicInteger(1);

    /**
     * Every connection opened during the test. Thread-safe because it is filled by
     * the thread running the test method and drained by the thread running
     * {@link #tearDown()}, which need not be the same one.
     */
    private final List<Connection> openConnections = new CopyOnWriteArrayList<Connection>();

    /** Last exception thrown inside a listener, or {@code null} if none failed. */
    private volatile Exception listenerFailure;

    /** Nothing to prepare: all fixtures are created inside the test itself. */
    @Before
    public void setUp() throws Exception
    {
    }

    /**
     * Closes every connection the test opened.
     *
     * <p>All connections are attempted even if one fails to close; the first failure
     * is rethrown afterwards.
     *
     * @throws Exception the first exception raised while closing a connection
     */
    @After
    public void tearDown() throws Exception
    {
        Exception firstFailure = null;
        for (Connection connection : openConnections)
        {
            try
            {
                connection.close();
            }
            catch (Exception ex)
            {
                if (firstFailure == null)
                {
                    firstFailure = ex;
                }
            }
        }
        openConnections.clear();

        if (firstFailure != null)
        {
            throw firstFailure;
        }
    }

    /** Minimal "something that can be closed" callback handed to the listeners. */
    static interface Closable
    {
        void close() throws Exception;
    }

    /**
     * One competing consumer: a dedicated connection plus, once {@link #start()} has
     * been called, a session with a consumer on {@link #QUEUE_NAME}.
     */
    class MyConn implements Closable
    {
        Connection conn;
        Session session;

        /**
         * Opens and starts a new connection and registers it for cleanup.
         *
         * @param factory factory used to create the connection
         * @throws Exception if the connection cannot be created or started
         */
        public MyConn(ConnectionFactory factory) throws Exception
        {
            conn = factory.createConnection();
            // Register before start() so the connection is cleaned up even if start() throws.
            openConnections.add(conn);
            conn.start();
        }

        /**
         * Creates the session and attaches a listening consumer to the queue.
         *
         * @throws Exception if the session or consumer cannot be created
         */
        public void start() throws Exception
        {
            // CLIENT_ACKNOWLEDGE and the listener never acknowledges: the message
            // stays unacknowledged when this session is closed.
            session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            session.createConsumer(queue).setMessageListener(getListener(this));
        }

        /**
         * Closes only the session. This is invoked from inside the consumer's own
         * listener; the connection itself is closed later by {@link #tearDown()}.
         *
         * @throws Exception if the session fails to close
         */
        public void close() throws Exception
        {
            session.close();
        }
    }

    /**
     * Sends one message, attaches {@link #CONSUMER_COUNT} consumers and waits until
     * each of them has reported handling the message.
     *
     * @throws Exception on any JMS failure, or an {@link IllegalStateException}
     *         wrapping the cause if a listener failed
     */
    @Test(timeout=2000)
    public void test() throws Exception
    {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection producerConn = factory.createConnection();
        // The producer connection stays open for the whole test; tearDown() closes it.
        openConnections.add(producerConn);
        producerConn.start();

        Session producerSession = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = producerSession.createQueue(QUEUE_NAME);
        producerSession.createProducer(queue).send(producerSession.createMessage());

        for (int k = 0; k < CONSUMER_COUNT; ++k)
        {
            MyConn my = new MyConn(factory);
            my.start();
        }

        sem.acquire(CONSUMER_COUNT);

        // A failing listener releases all permits to unblock us; surface its error
        // here instead of letting the test pass or die on a bare timeout.
        Exception failure = listenerFailure;
        if (failure != null)
        {
            throw new IllegalStateException("A consumer listener failed while handling the message", failure);
        }
    }

    /**
     * Builds a listener that, for each message, prints a begin marker, closes the
     * given {@code closable}, releases one permit and prints an end marker.
     *
     * <p>If closing fails, the exception is recorded and enough permits are released
     * for {@link #test()} to wake up and report it.
     *
     * @param closable resource to close when a message arrives
     * @return a new listener carrying its own sequential id
     */
    private MessageListener getListener(final Closable closable)
    {
        return new MessageListener()
        {
            private final int id = idCounter.getAndIncrement();

            @Override
            public void onMessage(Message message)
            {
                try
                {
                    System.out.println(String.format("ID = %04d - Begin", id));
                    closable.close();
                    sem.release();
                    System.out.println(String.format("ID = %04d - End", id));
                }
                catch (Exception ex)
                {
                    System.err.println(String.format("ID = %04d - Failed: %s", id, ex));
                    listenerFailure = ex;
                    // Without this the test thread would block until the JUnit timeout.
                    sem.release(CONSUMER_COUNT);
                }
            }
        };
    }

}

No comments: