ActiveMQ è un Message Oriented Middleware (MOM) open source messo a disposizione dalla comunità Apache Software Foundation. L’applicativo è scritto interamente in Java e sfrutta le specifiche JMS per garantire la l’interscambio di messaggi tra sistemi eterogenei. Il principale vantaggio nell’utilizzare un broker di messaggi sta soprattutto nel disaccoppiare e modularizzare architetture di sistemi complesse e fortemente integrate tra loro. Strati applicativi disaccopiati rendono la comuicazione asincrona e non vincolano lo stato tra i vari elementi che compongono il sistema.
Nel caso in cui ActiveMQ è frapposto tra due componenti applicative, possiamo riassumere quanto segue:
– non ci sono interdipendenze o vincoli temporali fra le applicazioni;
– il mittente non conosce nulla sul destinatario, il destinatario non conosce nulla del mittente;
– il MOM ActiveMQ è l’unico delegato a garantire la consegna dei messaggi.
Per gestire l’invio e la ricezione dei messaggi, ActiveMQ mette a disposizione delle API che rispondono alle specifiche di messaggistica JMS. Tali specifiche definiscono molti concetti e artefatti legati alla messaggistica, ma non saranno trattati in questo articolo. Qui focalizzerò l’attenzione sul paradigma JMS-Client ovvero su una pura applicazione java che usa le API JMS per interagire col MOM.
Fondamentalmente JMS definisce due entità:
– MessageProducer la componente delegata all’invio dei messaggi
– MessageConsumer la componente delegata a ricevere i messaggi
Producer e Consumer possono gestire messaggi secondo 2 modalità (o domini come vengono definiti dalle specifiche JMS)
– point-to-point domain
– publish/subscribe domain
L’esempio che tratto in questo articolo riguarda un contesto di dominio point-to-point (PTP) asincrono.
In generale, in un dominio PTP le destinazioni dei client JMS (Producer e Consumer) sono delle code di messaggi:
– un Producer manda un messaggio su una coda
– ogni messaggio ricevuto sulla coda viene consegnato (in maniera sincrona o asincrona) a un singolo Consumer
Nell’esempio, ActiveMQ è configurato in modalità Master/Slave (Shared File System). In questo modo è possibile sfruttare il protocollo failover per accedere al broker in alta affidabilità. Vediamo nel dettaglio le componenti che caratterizzano il sistema:
– Apache ActiveMQ installato su due macchine dedicate in modalità Master/Slave (condivisione dati su file system via NFS)
– Producer, classe java che invia messaggi su due code
– Consumer, classe java che registra i messaggi sulle code
– Listener, classe java che resta in ascolto sulle code e consuma i messaggi in modalità asincrona
Ecco una visione ad alto livello dell’architettura
Cominciamo con la configurazione delle due macchine Master/Slave. Sul sito ufficiale sono riportate tre modalità di configurazione. Quella che ho adottato io è di tipo “Shared File System Master Slave“, in cui un solo broker alla volta può accedere ai dati depositati sulla directory condivisa. Le due macchine amq01 e amq02 montano una directory NFS dove è in esecuzione KahaDB installato di default con ActiveMQ. E’ importante puntualizzare il fatto che ho utilizzato la versione NFSv4 per la condivisione di rete poichè include nativamente il supporto al timeout sul lock dei file. Questo evita problematiche legate all’accesso concorrente e alla corruzione di file. Di seguito i passi per la configurazione del server NFS e delle due macchine amq01 e amq02.
# NFSv4 Server $ yum install nfs-utils nfs-utils-lib $ mkdir /mnt/amq $ vi /etc/exports # ActiveMQ NFS directory /mnt/activemq *(sync,rw,fsid=0) $ /etc/init.d/nfs restart # NFSv4 Client (amq01, amq02) $ yum install nfs-utils $ /etc/init.d/rpcbind start $ nano /etc/fstab # Sharing through nfs nfs_host:/ /mnt/amq_data nfs4 soft,intr,rsize=8192,wsize=8192,nosuid 0 0 $ mount /mnt/amq_data $ df -h Filesystem Size Used Avail Use% Montato su /dev/mapper/vg_01-lv_root 5,5G 1,2G 4,1G 23% / tmpfs 1004M 0 1004M 0% /dev/shm /dev/sda1 485M 30M 430M 7% /boot nfs_host:/ 43G 14G 28G 34% /mnt/amq_data
Per la configurazione dei due server ActiveMQ è sufficiente modificare il puntamento della directory dove risiede KahaDB. Aprire e modificare il file conf/activemq.xml che si trova nella directory di installazione di ActiveMQ. Questa modifica va fatta sia sulla macchina amq01 sia su amq02.
$ vi conf/activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq02" dataDirectory="/mnt/amq_data"> <!-- ...blablablabla...--> <persistenceAdapter> <kahaDB directory="/mnt/amq_data/kahadb"/> </persistenceAdapter>
Avviare il message broker su amq01. A questo punto amq01 è l’entità Master di ActiveMQ. Procediamo con l’avvio anche di amq02. Se non ci sono errori di configurazione, il log dovrebbe stampare il messaggio di attesa-sblocco sul file lock di KahaDB. Di default,activeMQ effettua il check sul lock-file ogni 10 secondi. Ecco come appare l’output in cosole.
In questo momento amq01 è Master mentre amq02 è Slave. Ma se proviamo ad arrestare amq01, sarà la macchina amq02 ad agganciare il lock esclusivo di KahaDB sula directory NFS condivisa, diventando a runtime la nuova entità Master.
Procediamo ora con l’implementazione di Producer e Consumer. A questo scopo è possibile utilizzare l’archetype “maven-archetype-quickstart” di Maven per creare uno skeleton di progetto java su cui implementare i due client JMS. Entrambi utilizzeranno una stringa di connessione in failover per accedere all’istanza Master/Slave di ActiveMQ. In questo modo Producer e Consumer creeranno un keepalive TCP periodico che rileva a runtime quando un broker non è più raggiungibile (connessione persa o situazione di crash del servizio). In modalità random o sequenziale la nuova connessione sarà ripristinata sul nuovo broker specificato sulla stringa di connessione che è diventato la nuova entità Master.
Il Producer utilizza il metodo sendMessage per creare messaggi casuali e indirizzarli sulle due code dell’istanza ActiveMQ. Ecco un’estratto di codice.
private static String brokerURL = "failover:(tcp://amq01:61616,tcp://amq02:61616)?randomize=false"; private String jobs[] = new String[]{"coda1", "coda2"}; public static void main() throws JMSException { Producer producer = new Producer(); producer.sendMessage(); producer.close(); } public void sendMessage() throws JMSException { // Somthing here.. Destination destination = session.createQueue("ALF2LR." + job); Message message = session.createObjectMessage(); producer.send(destination, message); } }
In maniera analoga il Consumer crea un oggetto JMS MessageConsumer and imposta un MessageListener sulle code per consumare in maniera asincrona i messaggi ricevuti.
private static String brokerURL = "failover:(tcp://amq01:61616,tcp:amq02:61616)?randomize=false"; private String jobs[] = new String[]{"coda1", "coda2"}; public static void main() throws JMSException { Consumer consumer = new Consumer(); for (String job : consumer.jobs) { Destination destination = consumer.getSession().createQueue("ALF2LR." + job); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new Listener(job)); } //Something here... }
Nell’ implementazione di MessageListener si può stampare sullo standard output il numero di messaggi processati.
public class Listener implements MessageListener { private String job; public void onMessage(Message message) { try { System.out.println(job + " id:" + ((ObjectMessage)message).getObject()); } catch (Exception e) { e.printStackTrace(); } } }
Dopo aver lanciato “mvn clean install” sul progetto, è possibile avviare in maniera asincrona i due client JMS Producer e Consumer. Per facilitare il comando di esecuzione, ho creato due profili ad-hoc utilizzando il plugin exec-maven-plugin di Maven. Di seguito alcuni casi d’uso.
USE CASE 1
– UC1.a – Producer avviato
– UC1.b – Producer manda messaggi sulle code in maniera random
– UC1.c – Consumer avviato
– UC1.d – Consumer riceve e processa immediatamente i messaggi sulle code
USE CASE 2
– UC2.a – Consumer avviato
– UC2.b – Consumer in ascolto sulle code e pronto a processare i messaggi.
– UC2.c -Producer avviato
– UC2.d -Producer comincia ad inviare i messaggi alle code.
– UC2.e -Consumer riceve e consuma i messaggi
USE CASE 3
– UC3.a -Producer avviato su entità Master di ActiveMQ(AMQ01)
– UC3.b -Producer manda i messaggi sulle code
– UC3.c – Master AMQ01 arrestato
– UC3.d – In un intervallo di 10 secondi l’entità Slave AMQ02 diventa il nuovo Master
– UC3.e – Consumer avviato su nuova entità master Master (AMQ02)
– UC3.f – Consumer riceve e processa immediatamente i messaggi mandati sulle due code