Apache Camel è un framework scritto in Java per l’integrazione sincrona e asincrona tra sistemi eterogenei. E’ basato sul libro Enterprise Integration Patterns di Gregor Hohpe e Bobby Woolf e implementa molti dei pattern di integrazione definiti nel libro.
In questo articolo utilizzo alcuni oggetti fondamentali di Camel come per esempio Route Builder, Processor e File Component per implementare un Consumer di file su una directory. Lo scenario tipico di utilizzo è quello in cui da un lato vi è un Sender che carica uno o più file all’interno di una cartella condivisa e dall’altro vi è un Receiver che effettua il polling della directory per verificare la presenza di file e fare delle operazioni di lettura e scrittura su di essi.
Camel mette a disposizione l’interfaccia PollingConsumer per istanziare oggetti di tipo Consumer direttamente sull’endpoint URI e per impostare opzioni e modalità di ricezione messaggi direttamente sull’istanza:
myCamelContext = new DefaultCamelContext(); myCamelContext.getEndpoint("activemq:my.queue").createPollingConsumer().receive();
Tuttavia in questo esempio non faccio uso dell’interfaccia. Le opzioni di ricezione messaggi vengono definite come parametri direttamente sull’endpoint URI. Ulteriori approfondimenti si possono trovare qui:
Stack
Apache Camel 2.17.3
JDK 1.8
Maven 3.2
Il codice sorgente utilizzato in questo articolo si trova nella directory camel-polling-directory.
ConsumerRouteBuilder.java
Definisce un endpoint di tipo File e fornisce al Consumer l’accesso su file system.
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.RouteDefinition; public class ConsumerRouteBuilder extends RouteBuilder { // A number of final static attributes here ... public void configure() throws Exception { System.out.println("Configuring Camel Endpoint with source dir: " + SOURCE_FOLDER); RouteDefinition def = from("file://" + SOURCE_FOLDER + "?initialDelay=" + CONSUMER_INITIAL_DELAY + "&delay=" + CONSUMER_DELAY + "&readLock="+ CONSUMER_LOCK + "&readLockCheckInterval=" + CONSUMER_LOCK_INTERVAL); def.process(new ConsumerProcessor()); } // Some other useful methods here ... }
CamelExecutor.java
Inizializza CamelContext e lancia un Thread.sleep per simulare un runtime di 30 secondi. In questo intervallo di tempo, il Consumer processerà i file dentro la directory in modalità polling.
import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; public class CamelExecutor { private int waitForFile = 30000; private CamelContext camelCtx; public void init() throws Exception { camelCtx = new DefaultCamelContext(); try { camelCtx.addRoutes(new ConsumerRouteBuilder()); camelCtx.start(); System.out.println(">>>> PLEASE TRY to put a file into the source dir within "+(waitForFile/1000)+" seconds and wait processing... "); Thread.sleep(waitForFile); } catch (Exception e) { System.out.println("Unable to initialize CamelContext: " + e); return; } } }
ConsumerProcessor.java
Processa i file presenti nella directory di origine.
import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; public class ConsumerProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { try { Message message = exchange.getIn(); File fileToProcess = message.getBody(File.class); System.out.println("Detected file name '" + fileToProcess.getAbsolutePath()); // Make something with your file here ... } catch (Exception e) { System.err.println("Exception while trying to import invoce: " + e); } } }