This post continues the use case I explained in a previous post. The part left to explain was the ‘custom-correlation-aggregator-router’.
Let us start with the source code of this routing component.
The class ‘MyAggregator’
    package net.pascalalma.routing.inbound;
    ...
    import net.pascalalma.routing.MyEventCorrelator;
    ...
    /**
     *
     * @author Pascal Alma
     */
    public class MyAggregator extends AbstractCorrelationAggregator {

        @Override
        // Custom EventCorrelator implementation to accommodate special requirements for xml file handling
        public void initialise() throws InitialisationException {
            eventCorrelator = new MyEventCorrelator(getCorrelatorCallback(), getMessageInfoMapping(), muleContext);
            if (getTimeout() != 0) {
                eventCorrelator.setTimeout(getTimeout());
                eventCorrelator.setFailOnTimeout(isFailOnTimeout());
                try {
                    eventCorrelator.enableTimeoutMonitor();
                } catch (WorkException e) {
                    throw new InitialisationException(e, this);
                }
            }
        }

        @Override
        protected EventCorrelatorCallback getCorrelatorCallback() {
            return new DelegateCorrelatorCallback();
        }

        @Override
        public MuleMessage aggregateEvents(EventGroup events) throws AggregationException {
            ...
        }

        private class DelegateCorrelatorCallback extends CollectionCorrelatorCallback {

            @Override
            public MuleMessage aggregateEvents(EventGroup events) throws AggregationException {
                return MyAggregator.this.aggregateEvents(events);
            }

            @Override
            public EventGroup createEventGroup(MuleEvent event, Object groupId) {
                return new MyEventGroup(groupId, event.getMessage().getCorrelationGroupSize());
            }
        }
    }
The important parts of this class are the following:
- initialise()
- aggregateEvents(EventGroup events)
- private class DelegateCorrelatorCallback
The initialise() method is overridden so that we can instantiate our own EventCorrelator instead of the default one. The source code of our implementation is shown later in this post.
The aggregateEvents(EventGroup events) method is where the aggregation of the two XML messages is performed. It contains a lot of XML-related code that does the real work. This method is called once the two messages from different sources with the same correlation ID have been received.
The inner class DelegateCorrelatorCallback is necessary to instantiate our own EventGroup class instead of the default one. The source for our EventGroup is shown later in this post.
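The body of aggregateEvents is elided in the listing above, so the following is not the original implementation. It is only a minimal sketch of one way two XML payloads could be combined, using the DOM API that ships with the JDK. The class name XmlMergeSketch, the wrapper element name "aggregated" and the sample payloads are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

// Hypothetical helper, not part of the original post or of Mule.
class XmlMergeSketch {

    // Wraps the root elements of both payloads in a new <aggregated> root element
    // (the element name is an assumption) and returns the result as a string.
    static String merge(String firstXml, String secondXml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document result = builder.newDocument();
            Element root = result.createElement("aggregated");
            result.appendChild(root);

            for (String xml : new String[] {firstXml, secondXml}) {
                Document part = builder.parse(
                        new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
                // Nodes belong to their own document, so import a deep copy first.
                Node imported = result.importNode(part.getDocumentElement(), true);
                root.appendChild(imported);
            }

            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(result), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Could not merge XML payloads", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(merge("<order id=\"1\"/>", "<customer name=\"x\"/>"));
    }
}
```

In a real aggregator the two strings would come from the payloads of the events in the EventGroup, and the merged document would become the payload of the returned message.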
The class ‘MyEventCorrelator’

    package net.pascalalma.routing;

    import net.pascalalma.mule.routing.CustomEventCorrelator;
    import org.mule.api.MuleContext;
    import org.mule.api.routing.MessageInfoMapping;
    import org.mule.routing.EventCorrelatorCallback;

    /**
     *
     * @author Pascal Alma
     */
    public class MyEventCorrelator extends CustomEventCorrelator {

        public MyEventCorrelator(EventCorrelatorCallback callback, MessageInfoMapping messageInfoMapping, MuleContext context) {
            super(callback, messageInfoMapping, context);
        }

        @Override
        protected void addProcessedGroup(Object id) {
            // Do not remember already processed groups
        }

        @Override
        protected boolean isGroupAlreadyProcessed(Object id) {
            return false;
        }
    }
What we changed in this class is the way it handles correlation IDs that have already been processed. Normally, when a group of messages with a certain correlation ID has been processed and another message with that correlation ID is received, this message is rejected. In our case, however, we wanted to be able to process such a group of messages again, so we overrode the methods responsible for this behavior: processed groups are never remembered, and no group is ever reported as already processed.
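The effect of these two overrides can be illustrated without Mule. The sketch below is a toy model, not the real EventCorrelator: the classes DefaultCorrelator and ForgetfulCorrelator and the process method are hypothetical stand-ins, and only the two overridden method names are taken from the listing above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the "already processed" check; none of these classes exist in Mule.
class ReprocessSketch {

    // Stand-in for the default behaviour: processed group IDs are remembered.
    static class DefaultCorrelator {
        private final Set<Object> processed = new HashSet<>();
        final List<String> handled = new ArrayList<>();

        protected void addProcessedGroup(Object id) {
            processed.add(id);
        }

        protected boolean isGroupAlreadyProcessed(Object id) {
            return processed.contains(id);
        }

        // Returns true if the message was accepted, false if it was rejected.
        // In this toy model every group is complete after a single message.
        boolean process(Object groupId, String message) {
            if (isGroupAlreadyProcessed(groupId)) {
                return false;
            }
            handled.add(message);
            addProcessedGroup(groupId);
            return true;
        }
    }

    // Same idea as MyEventCorrelator: never remember, never reject.
    static class ForgetfulCorrelator extends DefaultCorrelator {
        @Override
        protected void addProcessedGroup(Object id) {
            // Do not remember already processed groups
        }

        @Override
        protected boolean isGroupAlreadyProcessed(Object id) {
            return false;
        }
    }

    public static void main(String[] args) {
        DefaultCorrelator strict = new DefaultCorrelator();
        strict.process("id-1", "first");
        System.out.println(strict.process("id-1", "resend"));   // rejected

        DefaultCorrelator lenient = new ForgetfulCorrelator();
        lenient.process("id-1", "first");
        System.out.println(lenient.process("id-1", "resend"));  // accepted
    }
}
```

The trade-off of this approach is that nothing guards against accidental duplicates any more, which was acceptable in our case because reprocessing was exactly what we wanted.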
As you might have noticed, this last class extends another class that we modified: net.pascalalma.mule.routing.CustomEventCorrelator. We had to create this class because the original one, org.mule.routing.EventCorrelator, wasn’t working properly in our case when we set a timeout value for the correlator.
To fix this, we extended the original class and copied the source of the method ‘enableTimeoutMonitor()’ into our subclass. In the copied method we replaced the lines:
    // TODO which use cases would need a sync reply event returned?
    service.getComponent().invoke(newEvent);
with:
    //Pascal Some fix here
    msg = service.getComponent().invoke(newEvent);
    service.getOutboundRouter().route(msg, new DefaultMuleSession(service, context));
This made the process continue after a timeout occurred, and our tests succeeded.
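The difference between the two variants comes down to whether the result of the component invocation is dropped or passed on. The sketch below models only that difference in plain Java. The class TimeoutForwardingSketch and its two methods are hypothetical, and a Function and a Consumer stand in for the Mule component and outbound router.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of the timeout handling; not Mule code.
class TimeoutForwardingSketch {

    // Mirrors the original lines: the component is invoked, but its result is dropped,
    // so nothing downstream ever sees the timed-out group.
    static void invokeOnly(String expiredGroup, Function<String, String> component) {
        component.apply(expiredGroup);
    }

    // Mirrors the replacement lines: the result is handed to an outbound router,
    // so processing continues after the timeout.
    static void invokeAndRoute(String expiredGroup,
                               Function<String, String> component,
                               Consumer<String> outboundRouter) {
        String msg = component.apply(expiredGroup);
        outboundRouter.accept(msg);
    }

    public static void main(String[] args) {
        List<String> routed = new ArrayList<>();
        Function<String, String> component = group -> "aggregated:" + group;

        invokeOnly("group-1", component);
        System.out.println(routed.size());

        invokeAndRoute("group-1", component, routed::add);
        System.out.println(routed);
    }
}
```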
The class ‘MyEventGroup’
The last class I want to show is this one. The source looks like this:
    package net.pascalalma.routing.inbound;

    import org.mule.api.MuleEvent;
    import org.mule.routing.inbound.EventGroup;

    /**
     *
     * @author Pascal Alma
     */
    public class MyEventGroup extends EventGroup {

        public static String MY_SOURCE = "RS_SOURCE";

        public MyEventGroup(Object groupId, int expectedSize) {
            super(groupId, expectedSize);
        }

        /**
         * Replaces an existing event in the eventGroup if a new event with the same SOURCE is
         * added.
         *
         * @param event
         */
        @Override
        public void addEvent(MuleEvent event) {
            MuleEvent[] eventArray = super.toArray();
            for (int i = 0; i < eventArray.length; i++) {
                MuleEvent muleEvent = eventArray[i];
                if (event.getProperty(MY_SOURCE) != null
                        && muleEvent.getProperty(MY_SOURCE) != null
                        && event.getProperty(MY_SOURCE).toString().equalsIgnoreCase(muleEvent.getProperty(MY_SOURCE).toString())) {
                    super.removeEvent(muleEvent);
                }
            }
            super.addEvent(event);
        }
    }
The key point here is that each event added to the group has a property (RS_SOURCE) indicating which source it originated from. If the group already contains an event for that source, the original event is removed from the group and the new one is added. This way we are able to replace a message that was posted earlier by resending a newer version before the aggregation has taken place.
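The replace-by-source rule can be tried out in isolation. The following is a minimal sketch in plain Java, assuming a simplified event that carries only a source and a payload. The classes Event and Group are hypothetical stand-ins for MuleEvent and EventGroup.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of MyEventGroup's replacement rule; not Mule code.
class ReplaceBySourceSketch {

    static final class Event {
        final String source;   // plays the role of the RS_SOURCE property, may be null
        final String payload;

        Event(String source, String payload) {
            this.source = source;
            this.payload = payload;
        }
    }

    static final class Group {
        private final List<Event> events = new ArrayList<>();

        // Removes any event with the same source (ignoring case) before adding the new one.
        // Events without a source are never replaced and never replace anything.
        void addEvent(Event event) {
            if (event.source != null) {
                events.removeIf(e -> e.source != null && e.source.equalsIgnoreCase(event.source));
            }
            events.add(event);
        }

        List<Event> events() {
            return new ArrayList<>(events);
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.addEvent(new Event("A", "version 1"));
        group.addEvent(new Event("B", "version 1"));
        group.addEvent(new Event("a", "version 2"));  // replaces the first event from source A
        for (Event e : group.events()) {
            System.out.println(e.source + ": " + e.payload);
        }
    }
}
```

Note that the group size never grows beyond one event per source, so a resend does not trigger the aggregation early.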
This concludes the rather complex example of this business case. The chance you have the exact same business case is rather small but I hope you can pick the pieces that are relevant for you.