ActiveMQ is an open source message-oriented middleware (MOM) from the Apache Software Foundation. ActiveMQ is written in Java and allows communication between systems using the JMS (Java Message Service) specification.
The most important advantage of using a MOM is that it loosens the coupling between applications. A loosely coupled architecture makes the communication asynchronous: the calls made by one application have no bearing on the other. To summarize:
– there is no interdependence or timing requirements
– senders know nothing about receivers and receivers know nothing about senders.
– the MOM (ActiveMQ) is responsible for guaranteeing message delivery
ActiveMQ provides a JMS compliant API for sending and receiving messages. The JMS specification defines many concepts and artifacts related to messaging such as the JMS-client paradigm. A JMS client is a pure Java application which uses the JMS API for interacting with the JMS provider (ActiveMQ). JMS defines two important objects:
– the MessageProducer interface, for sending messages to a destination.
– the MessageConsumer interface, for consuming messages from a destination.
Producers and consumers can exchange messages in two main styles, or domains as the JMS specification calls them:
– point-to-point domain
– publish/subscribe domain
I will focus on the point-to-point domain; you can find further information here and here.
The point-to-point (PTP) messaging domain uses destinations known as queues:
– a producer sends messages to the queue.
– each message received on the queue is delivered (synchronously or asynchronously) to a single consumer.
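The point-to-point rule above, where each queued message reaches exactly one consumer, can be sketched without a broker. What follows is only an in-memory stand-in that uses a java.util.concurrent queue in place of a JMS queue; the class name PtpSketch and the message strings are invented for illustration, and it says nothing about how ActiveMQ implements queues internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a point-to-point queue (hypothetical, not the JMS API).
final class PtpSketch {

    // Enqueues `count` messages, lets `consumers` threads compete for them,
    // and returns every message that was received, in no particular order.
    static List<String> run(int count, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The producer finishes before any consumer exists: no timing dependency.
        for (int i = 0; i < count; i++) {
            queue.add("msg-" + i);
        }

        Queue<String> received = new ConcurrentLinkedQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                String msg;
                // poll() removes the head atomically, so a message goes to one consumer only.
                while ((msg = queue.poll()) != null) {
                    received.add(msg);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        List<String> got = run(10, 2);
        System.out.println("received " + got.size() + " messages");
    }
}
```

However many consumer threads compete, the number of received messages equals the number sent and no message shows up twice, which is the property a JMS queue gives to its consumers.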
In this article I will show a PTP asynchronous messaging example using the failover protocol in a Master/Slave ActiveMQ configuration (Shared File System). For this purpose I will configure the following components:
– Apache ActiveMQ installed on two separate machines in a Master/Slave configuration sharing an NFS file system
– a Producer class to send messages to the queues
– a Consumer class to register a listener on the queues
– a Listener class to consume messages from the queues asynchronously
The high-level architecture follows.
We start with the configuration of the two Master/Slave machines. To do this we use the “shared storage” approach, where multiple ActiveMQ brokers can connect to the same message store but only one broker is active at a time. The shared storage is a shared NFS file system (an alternative is to use a shared database). The two machines, amq01 and amq02, mount a shared NFS directory that holds the default KahaDB persistence store. An important note: the shared file system must support distributed file locking, so I use NFSv4 to prevent problems related to file corruption. Here are the configuration steps.
    # NFSv4 Server
    $ yum install nfs-utils nfs-utils-lib
    $ mkdir /mnt/amq
    $ vi /etc/exports
    # ActiveMQ NFS directory
    /mnt/activemq *(sync,rw,fsid=0)
    $ /etc/init.d/nfs restart

    # NFSv4 Client (amq01, amq02)
    $ yum install nfs-utils
    $ /etc/init.d/rpcbind start
    $ nano /etc/fstab
    # Sharing through nfs
    nfs_host:/ /mnt/amq_data nfs4 soft,intr,rsize=8192,wsize=8192,nosuid 0 0
    $ mount /mnt/amq_data
    $ df -h
    Filesystem                 Size  Used Avail Use% Mounted on
    /dev/mapper/vg_01-lv_root  5,5G  1,2G  4,1G  23% /
    tmpfs                     1004M     0 1004M   0% /dev/shm
    /dev/sda1                  485M   30M  430M   7% /boot
    nfs_host:/                  43G   14G   28G  34% /mnt/amq_data
Now we continue with the two ActiveMQ configurations. Open the file activemq.xml in the “conf” directory in the ActiveMQ installation path and set the NFS shared directory for data.
$ vi conf/activemq.xml
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq02" dataDirectory="/mnt/amq_data">
        <!-- ...blablablabla...-->
        <persistenceAdapter>
            <kahaDB directory="/mnt/amq_data/kahadb"/>
        </persistenceAdapter>
Apply these changes on both the amq01 and amq02 machines, then start the amq01 message broker. Next, start the second ActiveMQ instance; if no errors occur, the amq02 output log should show a message saying that it is waiting for the database to be unlocked.
Now the amq01 machine is the master and amq02 is the slave. If amq01 shuts down or fails, the amq02 slave grabs the exclusive lock on the shared file system and becomes the new master. Now we move on to the Producer and Consumer implementation.
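Before moving on, the lock hand-over just described can be sketched with plain java.nio file locks. This is only an illustration of the general "whoever holds the exclusive lock is the master" idea, not ActiveMQ's own locking code; the class name SharedLockSketch and the temporary lock file are assumptions made for the example. Both "brokers" live in the same JVM here, where a second lock attempt raises OverlappingFileLockException instead of returning null, so the sketch treats both outcomes as "lock held by someone else".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: a "broker" is master only while it holds the file lock.
final class SharedLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private FileLock lock;

    SharedLockSketch(Path lockFile) {
        try {
            channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if this instance holds the exclusive lock after the call.
    boolean tryBecomeMaster() {
        if (lock != null && lock.isValid()) {
            return true;
        }
        try {
            lock = channel.tryLock();   // null when another process holds the lock
        } catch (OverlappingFileLockException e) {
            lock = null;                // the lock is held elsewhere in this JVM
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lock != null;
    }

    @Override
    public void close() {
        try {
            if (lock != null && lock.isValid()) {
                lock.release();
            }
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the master / slave / failover sequence and returns the three outcomes.
    static boolean[] demo() {
        try {
            Path lockFile = Files.createTempFile("amq-sketch", ".lock");
            try (SharedLockSketch first = new SharedLockSketch(lockFile);
                 SharedLockSketch second = new SharedLockSketch(lockFile)) {
                boolean firstIsMaster = first.tryBecomeMaster();
                boolean secondIsMaster = second.tryBecomeMaster();
                first.close();  // the master stops and releases the lock
                boolean secondAfterFailover = second.tryBecomeMaster();
                return new boolean[] {firstIsMaster, secondIsMaster, secondAfterFailover};
            } finally {
                Files.deleteIfExists(lockFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("first is master: " + r[0]);
        System.out.println("second is master while first runs: " + r[1]);
        System.out.println("second is master after first stops: " + r[2]);
    }
}
```

A real slave would keep retrying the lock on a timer rather than trying once, which is why it logs that it is waiting for the store to be unlocked.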
I’m a Maven enthusiast, so I created a simple Java project using the “maven-archetype-quickstart” archetype. Both Producer and Consumer use a failover URL to create a connection. With the failover protocol, the ActiveMQ client runs a periodic keepalive check so that it can detect when the broker is no longer reachable (connection or broker lost). When it detects that the broker is no longer available, it selects another broker from the list provided at startup, randomly by default (the randomize=false option used in the examples below makes it follow the list order). The connection string looks like this: “failover:(tcp://broker_host01:61616,tcp://broker_host02:61616)”
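To make the shape of the connection string and the broker selection more concrete, here is a small, self-contained sketch. It does not use the ActiveMQ client at all: failoverUrl only assembles the string from a list of hosts, and pickBroker is a simplified model of "try the listed brokers, in order or shuffled, until one answers". The class name, both method names, and the reachable predicate are invented for illustration and are not part of any ActiveMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the ActiveMQ client library.
final class FailoverSketch {

    // Assembles a failover URL from host names, a port and the randomize option.
    static String failoverUrl(List<String> hosts, int port, boolean randomize) {
        String uris = hosts.stream()
                .map(host -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(","));
        return "failover:(" + uris + ")?randomize=" + randomize;
    }

    // Simplified model of broker selection: list order when randomize is false,
    // shuffled order otherwise. Returns null when no broker is reachable.
    static String pickBroker(List<String> hosts, Predicate<String> reachable,
                             boolean randomize, Random random) {
        List<String> candidates = new ArrayList<>(hosts);
        if (randomize) {
            Collections.shuffle(candidates, random);
        }
        for (String host : candidates) {
            if (reachable.test(host)) {
                return host;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("amq01", "amq02");
        System.out.println(failoverUrl(hosts, 61616, false));
        // Pretend amq01 is down: the next host in the list is selected.
        System.out.println(pickBroker(hosts, host -> !host.equals("amq01"), false, new Random()));
    }
}
```

With the two hosts used in this article, main prints failover:(tcp://amq01:61616,tcp://amq02:61616)?randomize=false followed by amq02.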
In the Producer class I use a “main” method to send messages to the ActiveMQ instance. The sendMessage method randomly picks either the coda1 or the coda2 queue as the destination of the messages it sends. Here is an example.
    public class Producer {

        private static String brokerURL = "failover:(tcp://amq01:61616,tcp://amq02:61616)?randomize=false";
        private String jobs[] = new String[]{"coda1", "coda2"};

        public static void main(String[] args) throws JMSException {
            Producer producer = new Producer();
            producer.sendMessage();
            producer.close();
        }

        public void sendMessage() throws JMSException {
            // Something here..
            Destination destination = session.createQueue("ALF2LR." + job);
            Message message = session.createObjectMessage();
            producer.send(destination, message);
        }
    }
Similarly, the Consumer class creates a JMS MessageConsumer for each queue and sets a MessageListener on it to consume the sent messages asynchronously.
    private static String brokerURL = "failover:(tcp://amq01:61616,tcp://amq02:61616)?randomize=false";
    private String jobs[] = new String[]{"coda1", "coda2"};

    public static void main(String[] args) throws JMSException {
        Consumer consumer = new Consumer();
        // Register one asynchronous listener per queue.
        for (String job : consumer.jobs) {
            Destination destination = consumer.getSession().createQueue("ALF2LR." + job);
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
            messageConsumer.setMessageListener(new Listener(job));
        }
        //Something here...
    }
I also use a JMS MessageListener implementation to print some message info to the standard output.
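    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;

    public class Listener implements MessageListener {

        private String job;

        // The Consumer passes the queue suffix when it registers the listener.
        public Listener(String job) {
            this.job = job;
        }

        public void onMessage(Message message) {
            try {
                // Print the job name and the payload of the received message.
                System.out.println(job + " id:" + ((ObjectMessage) message).getObject());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }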
After running the Maven “clean install” goal, the two JMS clients, Producer and Consumer, can be started independently of each other. I ran three use cases, using two dedicated Maven profiles to start Producer and Consumer separately.
USE CASE 1
UC1.a – The Producer starts
UC1.b – The Producer sends all messages to the queues
UC1.c – The Consumer starts
UC1.d – The Consumer immediately receives and consumes all messages on the queues
USE CASE 2
UC2.a – The Consumer starts
UC2.b – The Consumer listens on the two queues and waits for messages
UC2.c – The Producer starts
UC2.d – The Producer sends all messages to the queues
UC2.e – The Consumer immediately receives and consumes all messages on the queues
USE CASE 3
UC3.a – The Producer starts, creating a connection to the Master broker instance (AMQ01)
UC3.b – The Producer sends all messages to the queues
UC3.c – AMQ01 stops
UC3.d – Within 10 seconds the Slave instance AMQ02 becomes the new Master instance
UC3.e – The Consumer starts, creating a connection to the new Master broker instance (AMQ02)
UC3.f – The Consumer immediately receives and consumes all messages on the queues
Well, the failover protocol works!!!