In questo articolo riporto un’esempio di come utilizzare Quartz Scheduler sull’oggetto DefaultMessageListenerContainer (DMLC) per controllare lo start/stop di un MessageListener JMS. Per il delivery pianificato direttamente su ActiveMQ invece rimando alla documentazione ufficiale.
Stack
ActiveMQ 5.11.1
JDK 1.8
Maven 3.2
Spring 3.0.5.RELEASE
Quartz Scheduler 1.5.2
Consumer
Per ricevere i messaggi in maniera asincrona creo un Consumer che implementa MessageListener. Il metodo onMessage() si farà carico dello scodamento dei messaggi.
I metodi startConsumption() e stopConsumption() effettueranno invece lo start e stop del DMLC istanziato via Spring.
public class Consumer implements MessageListener { private Object sampleListener; // setter injection via Spring... public void onMessage(Message message) { _log.info("Consuming message..."); //... } public void startConsumption(){ _log.info("Starting the Consumer..."); ((DefaultMessageListenerContainer) getSampleListener()).start(); _log.info("JOB STARTED!"); } public void stopConsumption(){ _log.debug("Stopping the Consumer... "); ((DefaultMessageListenerContainer) getSampleListener()).stop(); _log.info("JOB FINISHED!"); } }
Producer
Per l’invio dei messaggi, utilizzo Spring JmsTemplate nella classe Producer. La proprietà Destinations[ ] potrà tornare utile nel caso di invio su più code.
public class Producer { private static JmsTemplate template; private Destination[] destinations; public void sendMessage(Message message) { try { Destination destination = destinations[0]; template.convertAndSend(destination, message); _log.debug("Published message with JMS Correlation ID: "+message.getJMSCorrelationID()+" on queue: "+destination.toString()); } catch (JMSException e) { _log.error(e); } } }
Spring XML configuration
La gestione della dependecy-injection per le classi Consumer e Producer è delegata interamente a Spring. Fare attenzione alla proprietà autoStartup impostata a false nel bean sampleDMLC. L’avvio del listener container infatti è gestito da Quartz. Per realizzare ed eseguire i job sui metodi startConsumption() e stopConsumption() del mio Consumer, utilizzo gli oggetti MethodInvokingJobDetailFactoryBean e SchedulerFactoryBean di Quartz. In questo esempio, ho definito due cron expression rispettivamente per l’avvio e l’arresto del Consumer.
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> <property name="userName" value="amqsystem"/> <property name="password" value="12345"/> <!-- If you're using the embedded Broker uncomment this --> <!-- <property name="brokerURL" value="vm://localhost?broker.persistent=false" />--> </bean> <bean id="sampleDest" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="QUEUE.SAMPLE" /> </bean> <!-- PRODUCER --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="producer" class="eu.giuseppeurso.activemq.scheduledconsumer.Producer"> <property name="template" ref="jmsTemplate" /> <property name="destinations"> <list> <ref local="sampleDest" /> </list> </property> </bean> <!-- CONSUMER --> <bean id="consumer" class="eu.giuseppeurso.activemq.scheduledconsumer.Consumer"> <property name="sampleListener" ref="sampleDMLC" /> </bean> <!-- Spring DMLC --> <bean id="sampleDMLC" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="sampleDest" /> <property name="messageListener" ref="consumer" /> <property name="autoStartup" value="false"/> </bean> <!-- QUARTZ SCHEDULER (Start Job) --> <bean id="jobConsumerStart" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject" ref="consumer" /> <property name="targetMethod" value="startConsumption" /> </bean> <bean id="cronConsumerStart" class="org.springframework.scheduling.quartz.CronTriggerBean"> <property name="jobDetail" ref="jobConsumerStart" /> <property name="cronExpression" value="10 49 19 * * ?" /> </bean> <!-- QUARTZ SCHEDULER (Stop Job) --> <bean id="jobConsumerStop" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject" ref="consumer" /> <property name="targetMethod" value="stopConsumption" /> </bean> <bean id="cronConsumerStop" class="org.springframework.scheduling.quartz.CronTriggerBean"> <property name="jobDetail" ref="jobConsumerStop" /> <property name="cronExpression" value="15 49 19 * * ?" /> </bean> <!-- TRIGGERs --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="cronConsumerStart" /> <ref bean="cronConsumerStop" /> </list> </property> </bean> </beans>
Demo
Ho creato una classe ScheduledConsumerTest, che simula un caso di test con JUNIT. La classe crea il context definition di Spring grazie a FileSystemXmlApplicationContext e invia subito un messaggio su una coda. Per simulare un’attesa prima dell’esecuzione dei job, mando in sleep il Thread per 20 secondi subito dopo l’invio di un messaggio. Com Maven compilo ed eseguo il test.
$ cd activemq-samples/spring-scheduled-consumer $ mvn clean install