While implementing an integration platform with Mule at a client we run into a lot of different use cases. The use case in this example was the following:
Messages from 2 different source systems had to be combined and offered as one message to another application. The messages were generated shortly after each other with a regular scheme, like once a week.
Both messages were xml messages according to a certain schema and they had to be combined to one xml message. One of the XML message was supplied by JMS and had to be combined with the other message that was offered as a file. Two messages should be combined if they have the same code-period combination.
Here are two example files. Example 1:
<?xml version="1.0" encoding="UTF-8"?> <root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <header> <messageid>20102</messageid> <code>RS</code> <period>1010</period> </header> <body> <order> <country>AT</country> <customerID>345</customerID> <amount>150</amount> </order> <order> <country>BE</country> <customerID>123</customerID> <amount>1000</amount> </order> <order> <country>BE</country> <customerID>789</customerID> <amount>500</amount> </order> </body> </root>
And here is another example:
<?xml version="1.0" encoding="UTF-8"?> <root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <header> <messageid>10101</messageid> <code>RS</code> <period>1010</period> </header> <body> <order> <country>AT</country> <customerID>345</customerID> <amount>200</amount> </order> <order> <country>BE</country> <customerID>123</customerID> <amount>100</amount> </order> </body> </root>
The result of the aggregation should be:
<?xml version="1.0" encoding="UTF-8"?> <root xmlns="http://www.pascalalma.net/schemas/myschema" version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <header> <messageid>20102</messageid> <code>RS</code> <period>1010</period> </header> <body> <order> <country>AT</country> <customerID>345</customerID> <amount>350</amount> </order> <order> <country>BE</country> <customerID>123</customerID> <amount>1100</amount> </order> <order> <country>BE</country> <customerID>789</customerID> <amount>500</amount> </order> </body> </root>
Although the business case is quite simple it took some effort to make this work in Mule CE 2.2.2-SNAPSHOT. This post will describe how we did it by showing the important parts of the code we created to make it work.
Lets start with the Mule config file. I will explain the specific parts in the rest of the blog.
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesource.org/schema/mule/core/2.2" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2" xmlns:xm="http://www.mulesource.org/schema/mule/xml/2.2" xmlns:file="http://www.mulesource.org/schema/mule/file/2.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.mulesource.org/schema/mule/xml/2.2 http://www.mulesource.org/schema/mule/xml/2.2/mule-xml.xsd http://www.mulesource.org/schema/mule/file/2.2 http://www.mulesource.org/schema/mule/file/2.2/mule-file.xsd"> <vm:connector name="vmConnector" queueEvents="true" dynamicNotification="true" /> <file:connector name="inboundFileConnector" fileAge="500" autoDelete="true" pollingFrequency="1000"/> <!-- ENDPOINTS --> <vm:endpoint name="inTestQueue" path="in-test-queue" connector-ref="vmConnector" /> <vm:endpoint name="outTestQueue" path="out-test-queue" connector-ref="vmConnector" /> <xm:namespace-manager includeConfigNamespaces="true"> <xm:namespace prefix="myschema" uri="http://www.pascalalma.net/schemas/myschema" /> </xm:namespace-manager> <message-properties-transformer name="myCorrelationGroupSizeTransformer"> <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" /> </message-properties-transformer> <message-properties-transformer name="myJmsTransformer"> <add-message-property key="RS_SOURCE" value="JMS" /> </message-properties-transformer> <message-properties-transformer name="myFileTransformer"> <add-message-property key="RS_SOURCE" value="FILE" /> </message-properties-transformer> <custom-transformer name="myCorrelationIdTransformer" class="net.pascalalma.transformer.CorrelationTransformer"/> <model name="aggregationDemo"> <service name="vmEventProducerService"> <inbound> <vm:inbound-endpoint ref="inTestQueue"/> </inbound> <outbound> <pass-through-router> <vm:outbound-endpoint path="myAggregationServiceQueue" transformer-refs="myCorrelationGroupSizeTransformer myCorrelationIdTransformer myJmsTransformer" /> </pass-through-router> </outbound> </service> <service name="fileEventProducerService"> <inbound> <file:inbound-endpoint connector-ref="inboundFileConnector" path="./data/in" /> </inbound> <outbound> <pass-through-router> <vm:outbound-endpoint path="myAggregationServiceQueue" transformer-refs="myCorrelationGroupSizeTransformer myCorrelationIdTransformer myFileTransformer" /> </pass-through-router> </outbound> </service> <service name="myAggregationService"> <inbound> <vm:inbound-endpoint path="myAggregationServiceQueue" /> <custom-correlation-aggregator-router class="net.pascalalma.routing.inbound.MyAggregator" /> </inbound> <echo-component/> <outbound> <pass-through-router> <vm:outbound-endpoint ref="outTestQueue" /> </pass-through-router> </outbound> </service> </model> </mule>
The Services
The Mule config that is used as example here consists of three services:
- vmEventProducerService
- fileEventProducerService
- myAggregationService
This service receives one part of the XML message on a VM queue (actually, in production this will be a JMS queue). The message will be transformed by three transformersas preparation to get aggregated (the transformers will be described in the next paragraph).
This service receives the other part of the XML message as a file on an incoming file endpoint. The message will be transformed by three transformers as preparation to get aggregated (the transformers will be described in the next paragraph).
This is where the two parts are combined to one resulting XML file and the actual work is done.
The Transformers
The following transformers are used:
- myCorrelationIdTransformer
- myCorrelationGroupSizeTransformer
- myJmsTransformer and myFileTransformer
To tell Mule it had to combine two messages which were received on different endpoints we made sure the two messages had the same ‘correlation-id’. To set the correlationID we make use of the following custom-made transformer. We used the content of the elements ‘code’ and ‘period’ in the message to create our own correlation-id:
package net.pascalalma.transformer; import org.mule.module.xml.util.XMLUtils; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.Node; import org.mule.api.MuleMessage; import org.mule.api.transformer.TransformerException; import org.mule.transformer.AbstractMessageAwareTransformer; /** * Transformer sets correlationID of MuleMessage based on content of payload * * @author Pascal Alma */ public class CorrelationTransformer extends AbstractMessageAwareTransformer { @Override public Object transform(MuleMessage message, String outputEncoding) throws TransformerException { try { Document doc = XMLUtils.toDocument(message.getPayloadAsString()); Element element = doc.getRootElement().element("header"); Node codeNode = element.selectSingleNode("./*[local-name()='code']"); Node periodNode = element.selectSingleNode("./*[local-name()='period']"); if (codeNode != null && periodNode != null) { String correlationId = String.format("%s-%s", codeNode.getStringValue(), periodNode.getStringValue()); message.setCorrelationId(correlationId); } } catch (Exception ex) { throw new TransformerException(this, ex); } return message; } }
This transformer tells our ‘aggregation-component’ which is described later that the groupsize will be ‘2’, so the ‘aggregation-component’ knows it has received all messages when it received a second massage with the same correlation-ID. Here is the definition of the transformer:
<message-properties-transformer name="myCorrelationGroupSizeTransformer"> <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" /> </message-properties-transformer>
As you can see we are using the default message-properties-transformer of mule to set the correlation-group-size property.
The final transformers we used add a property to the MuleMessage so it is known in the aggregator if the message was received from the JMS/VM endpoint or the File endpoint. This distinction is necessary to make sure we do not combine two JMS messages (for instance two ‘example 1’ messages) but are really combining messages from different endpoints.
Actually, as you will see in the implementation of the ‘aggregator component’, if we receive a second message from the same endpoint we will discard the first message and keep waiting for the message at the other endpoint.
The transformer is quite simple:
<message-properties-transformer name="myJmsTransformer"> <add-message-property key="RS_SOURCE" value="JMS" /> </message-properties-transformer>
and
<message-properties-transformer name="myFileTransformer"> <add-message-property key="RS_SOURCE" value="FILE" /> </message-properties-transformer>
With all the transformers explained there is only one item left to explain and that is the ‘custom-correlation-aggregator-router‘ class. Since there is a lot to show about this component I explain it in the next post.