Dynamic Routing in Mule

I needed Mule to determine the outbound endpoint based on the content of properties in my custom XML message. Although this might look as common functionality I couldn’t find a complete example for this on the net. This one was close, but didn’t match completely with my situation:
I receive an XML message that is conform my own custom format. Based on the content of several fields in this XML message I determine what the next endpoint must be to process the message. Then my message is forwarded to that endpoint.
The ‘mule-config’ for this looks like:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
      xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:stdio="http://www.mulesource.org/schema/mule/stdio/2.2"
      xmlns:test="http://www.mulesource.org/schema/mule/test/2.2"
      xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
      http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
      http://www.mulesource.org/schema/mule/test/2.2 http://www.mulesource.org/schema/mule/test/2.2/mule-test.xsd
      http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
      http://www.mulesource.org/schema/mule/stdio/2.2 http://www.mulesource.org/schema/mule/stdio/2.2/mule-stdio.xsd
      http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <spring:bean id="routingComponent" class="net.pascalalma.components.RoutingComponent" />

    <custom-transformer name="JaxbXmlToCdmObject" class="net.pascalalma.transformers.XmlToObjectJaxbTransformer">
        <spring:property name="jaxbContextPackage" value="net.pascalalma.message.v01_0"/>
    </custom-transformer>
    <custom-transformer name="JaxbObjectToCdmXml" class="net.pascalalma.transformers.ObjectToXmlJaxbTransformer">
        <spring:property name="jaxbContextPackage" value="net.pascalalma.message.v01_0"/>
    </custom-transformer>

    <jms:activemq-connector name="JMSConnector" specification="1.1" dynamicNotification="true"
                brokerURL="failover:(vm://localhost?broker.persistent=true&amp;broker.useJmx=false)"
                acknowledgementMode="AUTO_ACKNOWLEDGE"
                maxRedelivery="3"
                disableTemporaryReplyToDestinations="true"
                persistentDelivery="true"
                username="user"
                password="password" />

    <vm:endpoint name="test_vm" path="test-path" synchronous="true" />
    <jms:endpoint name="test_queue" queue="test-queue" connector-ref="JMSConnector" synchronous="false" />

    <model name="mule-event-processing" inherit="false">
        <service name="start-event-processor">
            <inbound>
                <vm:inbound-endpoint address="vm://event-processor" synchronous="false" transformer-refs="JaxbXmlToCdmObject" />
            </inbound>

            <component>
                <spring-object bean="routingComponent" />
            </component>

            <outbound>
                <custom-outbound-router class="net.pascalalma.router.DynamicRouter" transformer-refs="JaxbObjectToCdmXml">
                    <spring:property name="evaluator" value="custom" />
                    <spring:property name="customEvaluator" value="cdm-msg-property" />
                    <spring:property name="expression" value="ADDRESS_LIST" />
                </custom-outbound-router>
            </outbound>
        </service>

        <service name="TEST">
            <inbound>
                <vm:inbound-endpoint ref="test_vm" />
            </inbound>
            <echo-component/>
            <outbound>
                <pass-through-router>
                    <jms:outbound-endpoint ref="test_queue" />
                </pass-through-router>
            </outbound>
        </service>
    </model>
</mule>

This is what happens with this configuration: the message is received on the inbound endpoint of the ‘start-event-processor’ service. The endpoint forwards the mesage to the component ‘RoutingComponent’. In this component the next inbound endpoint is determined and added as a property to the XML message. The code for the component looks like:

package net.pascalalma.components;

import java.util.ArrayList;
import java.util.List;
import net.pascalalma.message.v01_0.Message;
import net.pascalalma.message.v01_0.PropertySetType;
import net.pascalalma.message.v01_0.PropertyType;
import org.apache.log4j.Logger;
import org.mule.api.transformer.TransformerException;

/**
 * Routes a Message to the correct Business Process by using message metadata
 *
 * @author pascal
 */
public class RoutingComponent
{
    protected static Logger logger = Logger.getLogger(RoutingComponent.class);

    public Message routeMessage(Message cdmMessage) throws TransformerException {
        logger.debug("routeMessage(Message cdmMessage)");

        // Determin next inbound endpoint
        String nextEndpoint = getBusinessProcess(cdmMessage.getMessageHeader().
                getPropertySet());

        // Set next endpoint
        setAddressList(nextEndpoint, cdmMessage);

        // Return message
        return cdmMessage;
    }

    private void setAddressList(String nextEndpoint, Message msg) {
        List<string> addressList = new ArrayList<string>();
        addressList.add(nextEndpoint);
        PropertyType prop = new PropertyType();
        prop.setKey("ADDRESS_LIST");
        prop.setValue(nextEndpoint);
        msg.getMessageHeader().getPropertySet().getProperties().add(prop);
    }

    private String getBusinessProcess(PropertySetType propertySet) {
        // Access some logic to determine next matching BusinessProcess
        return "test_vm";
    }
}

In the component the method ‘routeMessage’ is called. In this method I determine what the next step is that has to be performed (in this example “test_vm” is hardcoded as next endpoint but this can easily be made more dynamically). Then the name of that endpoint is added as a property to the message header.
Then the message is passed to my custom-outbound-router. It is a subclass of the ‘expression-recipient-list-router’ and I made the method ‘isMatch’ always returning true so no filter has to be used. The code for the router is very simple:

package net.pascalalma.router;

import org.mule.api.MuleMessage;
import org.mule.api.routing.RoutingException;
import org.mule.routing.outbound.ExpressionRecipientList;

/**
 *
 * @author pascal
 */
public class DynamicRouter extends ExpressionRecipientList {

    @Override
    public boolean isMatch(MuleMessage message) throws RoutingException {
        return true;
    }
}

The actual inbound endpoint to which the message is transfered next is determined by making use of my custom evaluator (The creation and use of the custom evaluator is described here. It also explains more about the used xml schema for my message). With this configuration in place it will use the property ‘ADDRESS_LIST’ in the MessageHeader to obtain the endpoint. Since the dynamic-router in this example always return ‘test_vm’ the message will be forwarded to that endpoint and be picked up by the ‘TEST’ service. Here is a test class to test this example:

package net.pascalalma.components;

import net.pascalalma.utils.FileUtils;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.FunctionalTestCase;

/**
 * Checks the dynamic routing
 *
 * @author pascal
 */
public class RoutingComponentTest extends FunctionalTestCase {

    protected final String getConfigResources() {
        return "config/test-dynamic-router-config.xml";
    }

    /**
     * Tests the correct handling of an exception.
     *
     * @throws Exception
     */
    public void testReceivingEvent() throws Exception {
        MuleMessage msg = null;
        MuleClient client = new MuleClient();

        client.dispatch("vm://event-processor",
                FileUtils.getFileAsString("xml/example-msg.xml"), null);

        Thread.sleep(1500);

        msg = client.request("test_queue", 500);

        assertNotNull(msg);
    }
}

This class sends an message to the event-processor. The message will be enriched with a property ‘ADDRESS_LIST’ in the header that has the value ‘vm_test’. So I expect a message on the queue ‘test_queue’. If it is not there the test fails, otherwise the router is working correctly.

Advertisement

About Pascal Alma

Pascal is a senior IT consultant and has been working in IT since 1997. He is monitoring the latest development in new technologies (Mobile, Cloud, Big Data) closely and particularly interested in Java open source tool stacks, cloud related technologies like AWS and mobile development like building iOS apps with Swift. Specialties: Java/JEE/Spring Amazon AWS API/REST Big Data Continuous Delivery Swift/iOS
This entry was posted in Maven, Mule and tagged , . Bookmark the permalink.