I needed Mule to determine the outbound endpoint based on the content of properties in my custom XML message. Although this might look as common functionality I couldn’t find a complete example for this on the net. This one was close, but didn’t match completely with my situation:
I receive an XML message that is conform my own custom format. Based on the content of several fields in this XML message I determine what the next endpoint must be to process the message. Then my message is forwarded to that endpoint.
The ‘mule-config’ for this looks like:
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesource.org/schema/mule/core/2.2" xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:stdio="http://www.mulesource.org/schema/mule/stdio/2.2" xmlns:test="http://www.mulesource.org/schema/mule/test/2.2" xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd http://www.mulesource.org/schema/mule/test/2.2 http://www.mulesource.org/schema/mule/test/2.2/mule-test.xsd http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd http://www.mulesource.org/schema/mule/stdio/2.2 http://www.mulesource.org/schema/mule/stdio/2.2/mule-stdio.xsd http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <spring:bean id="routingComponent" class="net.pascalalma.components.RoutingComponent" /> <custom-transformer name="JaxbXmlToCdmObject" class="net.pascalalma.transformers.XmlToObjectJaxbTransformer"> <spring:property name="jaxbContextPackage" value="net.pascalalma.message.v01_0"/> </custom-transformer> <custom-transformer name="JaxbObjectToCdmXml" class="net.pascalalma.transformers.ObjectToXmlJaxbTransformer"> <spring:property name="jaxbContextPackage" value="net.pascalalma.message.v01_0"/> </custom-transformer> <jms:activemq-connector name="JMSConnector" specification="1.1" dynamicNotification="true" brokerURL="failover:(vm://localhost?broker.persistent=true&broker.useJmx=false)" acknowledgementMode="AUTO_ACKNOWLEDGE" maxRedelivery="3" disableTemporaryReplyToDestinations="true" persistentDelivery="true" username="user" password="password" /> <vm:endpoint name="test_vm" path="test-path" synchronous="true" /> <jms:endpoint name="test_queue" queue="test-queue" connector-ref="JMSConnector" synchronous="false" /> <model name="mule-event-processing" inherit="false"> <service name="start-event-processor"> <inbound> <vm:inbound-endpoint address="vm://event-processor" synchronous="false" transformer-refs="JaxbXmlToCdmObject" /> </inbound> <component> <spring-object bean="routingComponent" /> </component> <outbound> <custom-outbound-router class="net.pascalalma.router.DynamicRouter" transformer-refs="JaxbObjectToCdmXml"> <spring:property name="evaluator" value="custom" /> <spring:property name="customEvaluator" value="cdm-msg-property" /> <spring:property name="expression" value="ADDRESS_LIST" /> </custom-outbound-router> </outbound> </service> <service name="TEST"> <inbound> <vm:inbound-endpoint ref="test_vm" /> </inbound> <echo-component/> <outbound> <pass-through-router> <jms:outbound-endpoint ref="test_queue" /> </pass-through-router> </outbound> </service> </model> </mule>
This is what happens with this configuration: the message is received on the inbound endpoint of the ‘start-event-processor’ service. The endpoint forwards the mesage to the component ‘RoutingComponent’. In this component the next inbound endpoint is determined and added as a property to the XML message. The code for the component looks like:
package net.pascalalma.components; import java.util.ArrayList; import java.util.List; import net.pascalalma.message.v01_0.Message; import net.pascalalma.message.v01_0.PropertySetType; import net.pascalalma.message.v01_0.PropertyType; import org.apache.log4j.Logger; import org.mule.api.transformer.TransformerException; /** * Routes a Message to the correct Business Process by using message metadata * * @author pascal */ public class RoutingComponent { protected static Logger logger = Logger.getLogger(RoutingComponent.class); public Message routeMessage(Message cdmMessage) throws TransformerException { logger.debug("routeMessage(Message cdmMessage)"); // Determin next inbound endpoint String nextEndpoint = getBusinessProcess(cdmMessage.getMessageHeader(). getPropertySet()); // Set next endpoint setAddressList(nextEndpoint, cdmMessage); // Return message return cdmMessage; } private void setAddressList(String nextEndpoint, Message msg) { List<string> addressList = new ArrayList<string>(); addressList.add(nextEndpoint); PropertyType prop = new PropertyType(); prop.setKey("ADDRESS_LIST"); prop.setValue(nextEndpoint); msg.getMessageHeader().getPropertySet().getProperties().add(prop); } private String getBusinessProcess(PropertySetType propertySet) { // Access some logic to determine next matching BusinessProcess return "test_vm"; } }
In the component the method ‘routeMessage’ is called. In this method I determine what the next step is that has to be performed (in this example “test_vm” is hardcoded as next endpoint but this can easily be made more dynamically). Then the name of that endpoint is added as a property to the message header.
Then the message is passed to my custom-outbound-router. It is a subclass of the ‘expression-recipient-list-router’ and I made the method ‘isMatch’ always returning true so no filter has to be used. The code for the router is very simple:
package net.pascalalma.router; import org.mule.api.MuleMessage; import org.mule.api.routing.RoutingException; import org.mule.routing.outbound.ExpressionRecipientList; /** * * @author pascal */ public class DynamicRouter extends ExpressionRecipientList { @Override public boolean isMatch(MuleMessage message) throws RoutingException { return true; } }
The actual inbound endpoint to which the message is transfered next is determined by making use of my custom evaluator (The creation and use of the custom evaluator is described here. It also explains more about the used xml schema for my message). With this configuration in place it will use the property ‘ADDRESS_LIST’ in the MessageHeader to obtain the endpoint. Since the dynamic-router in this example always return ‘test_vm’ the message will be forwarded to that endpoint and be picked up by the ‘TEST’ service. Here is a test class to test this example:
package net.pascalalma.components; import net.pascalalma.utils.FileUtils; import org.mule.api.MuleMessage; import org.mule.module.client.MuleClient; import org.mule.tck.FunctionalTestCase; /** * Checks the dynamic routing * * @author pascal */ public class RoutingComponentTest extends FunctionalTestCase { protected final String getConfigResources() { return "config/test-dynamic-router-config.xml"; } /** * Tests the correct handling of an exception. * * @throws Exception */ public void testReceivingEvent() throws Exception { MuleMessage msg = null; MuleClient client = new MuleClient(); client.dispatch("vm://event-processor", FileUtils.getFileAsString("xml/example-msg.xml"), null); Thread.sleep(1500); msg = client.request("test_queue", 500); assertNotNull(msg); } }
This class sends an message to the event-processor. The message will be enriched with a property ‘ADDRESS_LIST’ in the header that has the value ‘vm_test’. So I expect a message on the queue ‘test_queue’. If it is not there the test fails, otherwise the router is working correctly.