Aggregating messages from different sources with Mule (part I)

While implementing an integration platform with Mule at a client, we ran into a lot of different use cases. The use case in this example was the following:
Messages from two different source systems had to be combined and offered as one message to another application. The messages were generated shortly after each other on a regular schedule, for example once a week.
Both messages were XML messages conforming to a certain schema, and they had to be combined into one XML message. One of the XML messages was supplied via JMS and had to be combined with the other message, which was offered as a file. Two messages should be combined if they have the same combination of code and period (the ‘code’ and ‘period’ elements in the message header).
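The matching rule can be made concrete with a small sketch. It is plain Java using only the JDK XML APIs (not Mule), and it assumes the key lives in the ‘code’ and ‘period’ elements of the header, as in the examples below. The class name CorrelationKeyDemo and the "code-period" key format are illustrative choices. The XPath uses local-name() so the default namespace of the messages does not get in the way.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical demo class: derives the code-period matching key from a message header.
class CorrelationKeyDemo {

    // Returns "<code>-<period>", or null when either element is missing or empty.
    static String correlationKey(String xml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setNamespaceAware(true);
        Document doc = factory.newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));

        // local-name() ignores the namespace, so the default xmlns of the messages does not matter
        XPath xpath = XPathFactory.newInstance().newXPath();
        String code = xpath.evaluate("/*/*[local-name()='header']/*[local-name()='code']", doc).trim();
        String period = xpath.evaluate("/*/*[local-name()='header']/*[local-name()='period']", doc).trim();

        if (code.isEmpty() || period.isEmpty()) {
            return null;
        }
        return code + "-" + period;
    }

    public static void main(String[] args) throws Exception {
        String jmsMessage = "<root xmlns='http://www.pascalalma.net/schemas/myschema'><header>"
                + "<messageid>20102</messageid><code>RS</code><period>1010</period></header></root>";
        String fileMessage = "<root xmlns='http://www.pascalalma.net/schemas/myschema'><header>"
                + "<messageid>10101</messageid><code>RS</code><period>1010</period></header></root>";

        String a = correlationKey(jmsMessage);
        String b = correlationKey(fileMessage);
        System.out.println(a);           // RS-1010
        System.out.println(a.equals(b)); // true: same code-period, so the messages belong together
    }
}
```

Two messages with code RS and period 1010 both yield RS-1010, so they would be combined even though their message IDs differ.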

Here are two example messages. Example 1:

<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <header>
        <messageid>20102</messageid>
        <code>RS</code>
        <period>1010</period>
    </header>
    <body>
        <order>
            <country>AT</country>
            <customerID>345</customerID>
            <amount>150</amount>
        </order>
        <order>
            <country>BE</country>
            <customerID>123</customerID>
            <amount>1000</amount>
        </order>
        <order>
            <country>BE</country>
            <customerID>789</customerID>
            <amount>500</amount>
        </order>
    </body>
</root>

And here is another example:

<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <header>
        <messageid>10101</messageid>
        <code>RS</code>
        <period>1010</period>
    </header>
    <body>
        <order>
            <country>AT</country>
            <customerID>345</customerID>
            <amount>200</amount>
        </order>
        <order>
            <country>BE</country>
            <customerID>123</customerID>
            <amount>100</amount>
        </order>
    </body>
</root>

The result of the aggregation should be:

<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<header>
		<messageid>20102</messageid>
		<code>RS</code>
		<period>1010</period>
	</header>
	<body>
		<order>
			<country>AT</country>
			<customerID>345</customerID>
			<amount>350</amount>
		</order>
		<order>
			<country>BE</country>
			<customerID>123</customerID>
			<amount>1100</amount>
		</order>
		<order>
			<country>BE</country>
			<customerID>789</customerID>
			<amount>500</amount>
		</order>
	</body>
</root>
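The expected result follows a simple rule: orders with the same country and customerID have their amounts added up, orders that appear in only one message are copied as-is, and the header of example 1 is kept. The real work is done by the custom aggregator covered in part II, so the sketch below is only a plain-Java illustration of the summing rule. The Order type and the choice of country plus customerID as the matching key are assumptions read off the examples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo of the summing rule; not part of the Mule configuration.
class OrderMergeDemo {

    // Hypothetical value type for one <order> element.
    static final class Order {
        final String country;
        final String customerId;
        final long amount;

        Order(String country, String customerId, long amount) {
            this.country = country;
            this.customerId = customerId;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return country + "/" + customerId + "=" + amount;
        }
    }

    // Adds up the amounts of orders that share country and customerID.
    // A LinkedHashMap keeps first-seen order: orders of the first message come first.
    static List<Order> merge(List<Order> first, List<Order> second) {
        Map<String, Order> merged = new LinkedHashMap<String, Order>();
        List<Order> all = new ArrayList<Order>(first);
        all.addAll(second);
        for (Order o : all) {
            String key = o.country + "|" + o.customerId;
            Order existing = merged.get(key);
            long total = (existing == null) ? o.amount : existing.amount + o.amount;
            // Re-putting an existing key does not change its position in a LinkedHashMap
            merged.put(key, new Order(o.country, o.customerId, total));
        }
        return new ArrayList<Order>(merged.values());
    }

    public static void main(String[] args) {
        List<Order> example1 = Arrays.asList(
                new Order("AT", "345", 150), new Order("BE", "123", 1000), new Order("BE", "789", 500));
        List<Order> example2 = Arrays.asList(
                new Order("AT", "345", 200), new Order("BE", "123", 100));
        System.out.println(merge(example1, example2)); // [AT/345=350, BE/123=1100, BE/789=500]
    }
}
```

The printed list matches the amounts in the expected result above (350, 1100 and 500).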

Although the business case is quite simple, it took some effort to make this work in Mule CE 2.2.2-SNAPSHOT. This post describes how we did it by showing the important parts of the code we created.

Let's start with the Mule config file. I will explain its specific parts in the rest of this post.

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
      xmlns:xm="http://www.mulesource.org/schema/mule/xml/2.2"
      xmlns:file="http://www.mulesource.org/schema/mule/file/2.2"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
      http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
      http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://www.mulesource.org/schema/mule/xml/2.2 http://www.mulesource.org/schema/mule/xml/2.2/mule-xml.xsd
      http://www.mulesource.org/schema/mule/file/2.2 http://www.mulesource.org/schema/mule/file/2.2/mule-file.xsd">

    <vm:connector name="vmConnector" queueEvents="true" dynamicNotification="true" />
    <file:connector name="inboundFileConnector" fileAge="500" autoDelete="true" pollingFrequency="1000"/>

    <!-- ENDPOINTS -->
    <vm:endpoint name="inTestQueue" path="in-test-queue" connector-ref="vmConnector" />
    <vm:endpoint name="outTestQueue" path="out-test-queue" connector-ref="vmConnector" />

    <xm:namespace-manager includeConfigNamespaces="true">
      <xm:namespace prefix="myschema" uri="http://www.pascalalma.net/schemas/myschema" />
    </xm:namespace-manager>

    <message-properties-transformer name="myCorrelationGroupSizeTransformer">
      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
    </message-properties-transformer>

    <message-properties-transformer name="myJmsTransformer">
      <add-message-property key="RS_SOURCE" value="JMS" />
    </message-properties-transformer>
    <message-properties-transformer name="myFileTransformer">
      <add-message-property key="RS_SOURCE" value="FILE" />
    </message-properties-transformer>
    
    <custom-transformer name="myCorrelationIdTransformer" class="net.pascalalma.transformer.CorrelationTransformer"/>
   
    <model name="aggregationDemo">

        <service name="vmEventProducerService">
            <inbound>
                <vm:inbound-endpoint ref="inTestQueue"/>
            </inbound>

            <outbound>
                <pass-through-router>
                    <vm:outbound-endpoint path="myAggregationServiceQueue" transformer-refs="myCorrelationGroupSizeTransformer myCorrelationIdTransformer myJmsTransformer" />
                </pass-through-router>
            </outbound>
        </service>

        <service name="fileEventProducerService">
            <inbound>
                <file:inbound-endpoint connector-ref="inboundFileConnector" path="./data/in"  />
            </inbound>

            <outbound>
                <pass-through-router>
                    <vm:outbound-endpoint path="myAggregationServiceQueue" transformer-refs="myCorrelationGroupSizeTransformer myCorrelationIdTransformer myFileTransformer" />
                </pass-through-router>
            </outbound>
        </service>

        <service name="myAggregationService">
            <inbound>
                <vm:inbound-endpoint path="myAggregationServiceQueue" />
                <custom-correlation-aggregator-router class="net.pascalalma.routing.inbound.MyAggregator" />
            </inbound>

            <echo-component/>

            <outbound>
                <pass-through-router>
                    <vm:outbound-endpoint ref="outTestQueue" />
                </pass-through-router>
            </outbound>
        </service>

    </model>
</mule>

The Services
The Mule config used in this example consists of three services:

  1. vmEventProducerService
    This service receives one of the two XML messages on a VM queue (in production this will be a JMS queue). Before the message is forwarded to the aggregation queue, it is prepared for aggregation by three transformers (described in the next section).

  2. fileEventProducerService
    This service receives the other XML message as a file on an inbound file endpoint. It is prepared for aggregation by three transformers in the same way (described in the next section).

  3. myAggregationService
    This is where the two messages are combined into one resulting XML message and where the actual work is done. The result is passed on to the outTestQueue endpoint.

The Transformers
The following transformers are used:

  1. myCorrelationIdTransformer
    To tell Mule that two messages received on different endpoints belong together, we make sure both messages get the same correlation ID. We set the correlation ID with the following custom transformer, which builds it from the content of the ‘code’ and ‘period’ elements in the message header:

    package net.pascalalma.transformer;
    
    import org.mule.module.xml.util.XMLUtils; 
    import org.dom4j.Document;
    import org.dom4j.Element;
    import org.dom4j.Node;
    import org.mule.api.MuleMessage;
    import org.mule.api.transformer.TransformerException;
    import org.mule.transformer.AbstractMessageAwareTransformer;
    
    /**
     * Transformer that sets the correlationID of the MuleMessage based on the
     * content of the payload (the 'code' and 'period' elements of the header).
     *
     * @author Pascal Alma
     */
    public class CorrelationTransformer extends AbstractMessageAwareTransformer {
    
        @Override
        public Object transform(MuleMessage message, String outputEncoding) throws TransformerException {
            try {
                // Parse the XML payload into a dom4j document
                Document doc = XMLUtils.toDocument(message.getPayloadAsString());
    
                // element() and local-name() both match on the local name,
                // so the myschema namespace does not get in the way
                Element element = doc.getRootElement().element("header");
                Node codeNode = element.selectSingleNode("./*[local-name()='code']");
                Node periodNode = element.selectSingleNode("./*[local-name()='period']");
    
                if (codeNode != null && periodNode != null) {
                    // e.g. code "RS" and period "1010" give correlation ID "RS-1010"
                    String correlationId = String.format("%s-%s", codeNode.getStringValue(), periodNode.getStringValue());
                    message.setCorrelationId(correlationId);
                }
            } catch (Exception ex) {
                // Any parse or lookup failure (e.g. a missing header) is reported as a TransformerException
                throw new TransformerException(this, ex);
            }
            // Return the message itself: only its correlation ID changed, the payload is untouched
            return message;
        }
    }

  2. myCorrelationGroupSizeTransformer
    This transformer tells our aggregation component (described later) that the group size is 2, so the aggregator knows it has received all messages as soon as a second message with the same correlation ID arrives. Here is the definition of the transformer:

    <message-properties-transformer name="myCorrelationGroupSizeTransformer">
      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
    </message-properties-transformer>

    As you can see, we use Mule's standard message-properties-transformer to set the MULE_CORRELATION_GROUP_SIZE property.

  3. myJmsTransformer and myFileTransformer
    The last two transformers add a property (RS_SOURCE) to the MuleMessage so the aggregator knows whether the message was received from the JMS/VM endpoint or from the file endpoint. This distinction is necessary to make sure we do not combine two JMS messages (for instance two ‘example 1’ messages) but really combine messages from different endpoints.
    In fact, as you will see in the implementation of the aggregator component, if we receive a second message from the same endpoint, we discard the first message and keep waiting for the message from the other endpoint.
    The transformers are quite simple:

    <message-properties-transformer name="myJmsTransformer">
      <add-message-property key="RS_SOURCE" value="JMS" />
    </message-properties-transformer>

    and

    <message-properties-transformer name="myFileTransformer">
      <add-message-property key="RS_SOURCE" value="FILE" />
    </message-properties-transformer>
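Taken together, the correlation ID, the group size of 2 and the RS_SOURCE property define when a group is complete. How MyAggregator implements this on top of Mule's aggregator classes is the subject of part II. As a stand-in, here is a minimal plain-Java sketch of the same policy, assuming at most one pending message per correlation ID. The Msg type and the accept method are hypothetical and not part of the Mule API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the grouping policy; not the Mule aggregator API.
class PairingDemo {

    // Hypothetical stand-in for a MuleMessage carrying its correlation ID and RS_SOURCE property.
    static final class Msg {
        final String correlationId;
        final String source; // "JMS" or "FILE"
        final String payload;

        Msg(String correlationId, String source, String payload) {
            this.correlationId = correlationId;
            this.source = source;
            this.payload = payload;
        }
    }

    // At most one waiting message per correlation ID (group size 2: one per source).
    private final Map<String, Msg> pending = new HashMap<String, Msg>();

    // Returns {jmsMessage, fileMessage} once both sources have delivered, otherwise null.
    Msg[] accept(Msg msg) {
        Msg waiting = pending.get(msg.correlationId);
        if (waiting == null || waiting.source.equals(msg.source)) {
            // First message of the group, or a newer one from the same source:
            // keep it (discarding any older one) and wait for the other source.
            pending.put(msg.correlationId, msg);
            return null;
        }
        pending.remove(msg.correlationId);
        return "JMS".equals(msg.source) ? new Msg[] {msg, waiting} : new Msg[] {waiting, msg};
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PairingDemo aggregator = new PairingDemo();
        aggregator.accept(new Msg("RS-1010", "JMS", "example 1 (old)")); // waits
        aggregator.accept(new Msg("RS-1010", "JMS", "example 1"));       // replaces the old one
        Msg[] group = aggregator.accept(new Msg("RS-1010", "FILE", "example 2"));
        System.out.println(group[0].payload + " + " + group[1].payload); // example 1 + example 2
        System.out.println(aggregator.pendingCount());                   // 0
    }
}
```

Keeping only the newest message per source means a resent JMS message replaces the stale one instead of completing the group with itself.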

With all the transformers explained, only one item is left to explain: the class used by the ‘custom-correlation-aggregator-router’ (net.pascalalma.routing.inbound.MyAggregator). Since there is a lot to show about this component, I explain it in the next post.

About Pascal Alma

Pascal is a senior IT consultant and has been working in IT since 1997. He closely follows the latest developments in new technologies (Mobile, Cloud, Big Data) and is particularly interested in Java open source tool stacks, cloud-related technologies like AWS, and mobile development such as building iOS apps with Swift. Specialties: Java/JEE/Spring, Amazon AWS, API/REST, Big Data, Continuous Delivery, Swift/iOS.
This entry was posted in Mule2.