import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class DistributedLockTest
{
private static final String BROKER_URL = "vm://dlt?broker.persistent=false&broker.useJmx=false";
private final Semaphore sem = new Semaphore(0);
@Before
public void setUp() throws Exception
{
}
@After
public void tearDown() throws Exception
{
}
static interface Closable
{
void close() throws Exception;
}
class MyConn implements Closable
{
Connection conn;
Session session;
public MyConn(ConnectionFactory factory) throws Exception
{
conn = factory.createConnection();
conn.start();
}
public void start() throws Exception
{
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue?consumer.exclusive=true");
session.createConsumer(queue).setMessageListener(getListener(this));
}
public void close() throws Exception
{
session.close();
}
}
@Test(timeout=2000)
public void test() throws Exception
{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
Connection conn1 = factory.createConnection();
conn1.start();
Session producerSession = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = producerSession.createQueue("queue?consumer.exclusive=true");
producerSession.createProducer(queue).send(producerSession.createMessage());
//producerSession.close();
for(int k=0; k<100; ++k)
{
MyConn my = new MyConn(factory);
my.start();
}
//TimeUnit.MINUTES.sleep(1);
sem.acquire(100);
}
private AtomicInteger idCounter = new AtomicInteger(1);
private MessageListener getListener(final Closable closable)
{
return new MessageListener()
{
private final int id = idCounter.getAndIncrement();
@Override
public void onMessage(Message message)
{
try
{
System.out.println(String.format("ID = %04d - Begin", id));
//TimeUnit.SECONDS.sleep(1);
closable.close();
sem.release();
System.out.println(String.format("ID = %04d - End", id));
}
catch(Exception ex)
{
}
}
};
}
}
No comments:
Post a Comment