import
org.apache.activemq.ActiveMQConnectionFactory;
import
javax.jms.Connection;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.ExceptionListener;
import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.MessageConsumer;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
/**
* Hello world!
*/
public
class
activemq5Failover {
public
static
void
main(String[] args)
throws
Exception {
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
Thread.sleep(
1000
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
Thread.sleep(
1000
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
Thread.sleep(
1000
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldConsumer(),
false
);
thread(
new
HelloWorldProducer(),
false
);
}
public
static
void
thread(Runnable runnable,
boolean
daemon) {
Thread brokerThread =
new
Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public
static
class
HelloWorldProducer
implements
Runnable {
public
void
run() {
try
{
ActiveMQConnectionFactory connectionFactory =
new
ActiveMQConnectionFactory(
"admin"
,
"your_password"
,
"failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false"
);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(
"TEST.FOO"
);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text =
"Hello world! From: "
+ Thread.currentThread().getName() +
" : "
+
this
.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println(
"Sent message: "
+ message.hashCode() +
" : "
+ Thread.currentThread().getName());
producer.send(message);
session.close();
connection.close();
}
catch
(Exception e) {
System.out.println(
"Caught: "
+ e);
e.printStackTrace();
}
}
}
public
static
class
HelloWorldConsumer
implements
Runnable, ExceptionListener {
public
void
run() {
try
{
ActiveMQConnectionFactory connectionFactory =
new
ActiveMQConnectionFactory(
"admin"
,
"your_password"
,
"failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false"
);
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(
this
);
Session session = connection.createSession(
false
, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(
"TEST.FOO"
);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(
1000
);
if
(message
instanceof
TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(
"Received: "
+ text);
}
else
{
System.out.println(
"Received: "
+ message);
}
consumer.close();
session.close();
connection.close();
}
catch
(Exception e) {
System.out.println(
"Caught: "
+ e);
e.printStackTrace();
}
}
public
synchronized
void
onException(JMSException ex) {
System.out.println(
"JMS Exception occured. Shutting down client."
);
}
}
}