Aggregating messages from different sources with Mule (part 2)

This post continues the use case I explained in the previous post. The part still left to explain is the ‘custom-correlation-aggregator-router’.
Let us start with the source code of this routing component.
The class ‘MyAggregator’

package net.pascalalma.routing.inbound;

...
import net.pascalalma.routing.MyEventCorrelator;
...
/**
 *
 * @author Pascal Alma
 */
public class MyAggregator extends AbstractCorrelationAggregator {

    // Custom EventCorrelator implementation to accommodate special requirements for XML file handling
    @Override
    public void initialise() throws InitialisationException
    {
        eventCorrelator = new MyEventCorrelator(getCorrelatorCallback(), getMessageInfoMapping(), muleContext);
        if (getTimeout() != 0)
        {
            eventCorrelator.setTimeout(getTimeout());
            eventCorrelator.setFailOnTimeout(isFailOnTimeout());
            try
            {
                eventCorrelator.enableTimeoutMonitor();
            }
            catch (WorkException e)
            {
                throw new InitialisationException(e, this);
            }
        }
    }

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback()
    {
        return new DelegateCorrelatorCallback();
    }

    @Override
    public MuleMessage aggregateEvents(EventGroup events) throws AggregationException {

       ...
    }

    private class DelegateCorrelatorCallback extends CollectionCorrelatorCallback
    {
        @Override
        public MuleMessage aggregateEvents(EventGroup events) throws AggregationException
        {
            return MyAggregator.this.aggregateEvents(events);
        }

        @Override
        public EventGroup createEventGroup(MuleEvent event, Object groupId)
        {
            return new MyEventGroup(groupId, event.getMessage().getCorrelationGroupSize());
        }
    }
}

The important methods in this class are the following:

  • initialise()
    This method is overridden so that we can instantiate our own EventCorrelator instead of the default one. The source code of our implementation is shown later in this post.

  • aggregateEvents(EventGroup events)
    This is where the two XML messages are aggregated. It contains a lot of XML-related code that does the real work. The method is called once the two messages from the different sources with the same correlationID have been received.

  • private class DelegateCorrelatorCallback
    This implementation is necessary to instantiate our own EventGroup class instead of the default one. The source of our EventGroup is also shown later in this post.
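
The body of aggregateEvents() is elided in the listing above, so the following is only a rough sketch of what combining two XML payloads could look like, not the code from this project. It uses the JDK's DOM API and no Mule classes; the class name XmlPairMerger, the <aggregate> wrapper element, and the correlationId attribute are all made up for illustration.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper: not part of the original project and not a Mule class.
class XmlPairMerger {

    /** Wraps the root elements of both payloads in a new <aggregate> element. */
    static String merge(String correlationId, String firstXml, String secondXml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document result = builder.newDocument();
            Element root = result.createElement("aggregate"); // assumed wrapper name
            root.setAttribute("correlationId", correlationId);
            result.appendChild(root);

            for (String xml : new String[] {firstXml, secondXml}) {
                Document part = builder.parse(new InputSource(new StringReader(xml)));
                // importNode makes a deep copy that belongs to the result document
                root.appendChild(result.importNode(part.getDocumentElement(), true));
            }

            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(result), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            // The real method would throw an AggregationException here
            throw new IllegalStateException("Could not merge XML payloads", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(merge("42", "<order id=\"1\"/>", "<invoice id=\"1\"/>"));
    }
}
```

In the real router the two payloads would come from the events in the EventGroup, and the merged document would be wrapped in a new MuleMessage.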

    The class ‘MyEventCorrelator’

    package net.pascalalma.routing;
    
    import net.pascalalma.mule.routing.CustomEventCorrelator;
    import org.mule.api.MuleContext;
    import org.mule.api.routing.MessageInfoMapping;
    import org.mule.routing.EventCorrelatorCallback;
    
    /**
     *
     * @author Pascal Alma
     */
    public class MyEventCorrelator extends CustomEventCorrelator {
    
        public MyEventCorrelator(EventCorrelatorCallback callback, MessageInfoMapping messageInfoMapping, MuleContext context)
        {
            super(callback, messageInfoMapping, context);
        }
    
        @Override
        protected void addProcessedGroup(Object id)
        {
            // Do not remember already processed groups
        }
    
        @Override
        protected boolean isGroupAlreadyProcessed(Object id)
        {
            return false;
        }
    }
    

    What we changed in this class is the way it handles correlationIDs that have already been processed. Normally, when a group of messages with a certain correlationID has been processed and another message with that correlationID arrives, that message is rejected. In our case, however, we wanted to be able to process such a group again, so we overrode the methods responsible for this behavior: addProcessedGroup() no longer records anything and isGroupAlreadyProcessed() always returns false.
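
To make the effect of these two overrides concrete, here is a small model in plain Java. It is a simplified stand-in and not Mule's EventCorrelator: the classes SimpleCorrelator and ForgetfulCorrelator and the methods accept() and completeGroup() are invented for this sketch, and only the two overridden method names are taken from the listing above.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for a correlator; NOT the Mule implementation.
class SimpleCorrelator {
    private final Set<Object> processedGroups = new HashSet<>();

    /** Returns true if a message with this correlation ID is accepted. */
    boolean accept(Object correlationId) {
        return !isGroupAlreadyProcessed(correlationId);
    }

    /** Called (in this model) when a group has been aggregated. */
    void completeGroup(Object correlationId) {
        addProcessedGroup(correlationId);
    }

    protected void addProcessedGroup(Object id) {
        processedGroups.add(id);
    }

    protected boolean isGroupAlreadyProcessed(Object id) {
        return processedGroups.contains(id);
    }
}

// Mirrors the idea of MyEventCorrelator: never remember processed groups.
class ForgetfulCorrelator extends SimpleCorrelator {
    @Override
    protected void addProcessedGroup(Object id) {
        // Do not remember already processed groups
    }

    @Override
    protected boolean isGroupAlreadyProcessed(Object id) {
        return false;
    }

    public static void main(String[] args) {
        SimpleCorrelator standard = new SimpleCorrelator();
        standard.completeGroup("order-1");
        System.out.println("default accepts resend: " + standard.accept("order-1"));

        SimpleCorrelator forgetful = new ForgetfulCorrelator();
        forgetful.completeGroup("order-1");
        System.out.println("custom accepts resend: " + forgetful.accept("order-1"));
    }
}
```

The trade-off is that nothing guards against accidental duplicates any more, which is acceptable here because reprocessing a group is exactly what we want.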
    As you might have noticed, MyEventCorrelator extends another class that we modified, net.pascalalma.mule.routing.CustomEventCorrelator. We had to create this class because the original one, org.mule.routing.EventCorrelator, did not work properly in our case when a time-out value was set for the correlator.
    To fix this, we extended the original class and copied the source of the method ‘enableTimeoutMonitor()’ into it. In that copy we replaced the lines:

    // TODO which use cases would need a sync reply event returned?
    service.getComponent().invoke(newEvent);
    

    with:

    //Pascal Some fix here
    msg = service.getComponent().invoke(newEvent);
    service.getOutboundRouter().route(msg, new DefaultMuleSession(service, context));
    

    With this change the result of the component is handed to the outbound router, so processing continues after a timeout has occurred, and our tests succeeded.
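
The difference between the two variants can be illustrated without any Mule classes. In the sketch below a Function stands in for service.getComponent().invoke(...) and a Consumer stands in for service.getOutboundRouter().route(...); the class TimeoutFlowSketch and its method names are hypothetical and only model the idea of "invoke and drop the result" versus "invoke and route the result".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the timeout callback; none of these names are Mule API.
class TimeoutFlowSketch {
    private final Function<String, String> component; // stands in for the service component
    private final Consumer<String> outboundRouter;    // stands in for the outbound router

    TimeoutFlowSketch(Function<String, String> component, Consumer<String> outboundRouter) {
        this.component = component;
        this.outboundRouter = outboundRouter;
    }

    /** Original behavior as described: the component runs, its result is dropped. */
    void onTimeoutOriginal(String expiredGroup) {
        component.apply(expiredGroup);
    }

    /** Fixed behavior: the component's result is passed on to the outbound router. */
    void onTimeoutFixed(String expiredGroup) {
        String msg = component.apply(expiredGroup);
        outboundRouter.accept(msg);
    }

    public static void main(String[] args) {
        List<String> routed = new ArrayList<>();
        TimeoutFlowSketch flow = new TimeoutFlowSketch(g -> "aggregated:" + g, routed::add);

        flow.onTimeoutOriginal("group-1");
        System.out.println("routed after original variant: " + routed);

        flow.onTimeoutFixed("group-1");
        System.out.println("routed after fixed variant: " + routed);
    }
}
```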

    The class ‘MyEventGroup’
    The last class I want to show is this one. The source looks like this:

    package net.pascalalma.routing.inbound;
    
    import org.mule.api.MuleEvent;
    import org.mule.routing.inbound.EventGroup;
    
    /**
     *
     * @author Pascal Alma
     */
    public class MyEventGroup extends EventGroup {
    
        public static final String MY_SOURCE = "RS_SOURCE";
    
        public MyEventGroup(Object groupId, int expectedSize)
        {
            super(groupId, expectedSize);
        }
    
        /**
         * Replaces an existing event in the event group if a new event with the same
         * SOURCE is added.
         *
         * @param event
         */
        @Override
        public void addEvent(MuleEvent event)
        {
            MuleEvent[] eventArray = super.toArray();
            for (int i = 0; i < eventArray.length; i++) {
                MuleEvent muleEvent = eventArray[i];
                if (event.getProperty(MY_SOURCE) != null && muleEvent.getProperty(MY_SOURCE) != null
                        && event.getProperty(MY_SOURCE).toString().equalsIgnoreCase(muleEvent.getProperty(MY_SOURCE).toString())) {
                    super.removeEvent(muleEvent);
                }
            }
            super.addEvent(event);
        }
    }
    

    The key point here is that each event added to the group carries a property (RS_SOURCE) indicating which source it originated from. If the group already contains an event from that source, the original event is removed and the new one is added. This way a message can be replaced by resending a newer version before the aggregation has taken place.
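
The replace-by-source rule is easy to try out in isolation. The following sketch models it with a plain list instead of Mule's EventGroup; the classes SourceEvent and ReplacingEventGroup are invented for this illustration, and only the rule itself (case-insensitive match on the source, events without a source are never replaced) is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a MuleEvent carrying the RS_SOURCE property.
class SourceEvent {
    final String source;  // may be null, like a missing RS_SOURCE property
    final String payload;

    SourceEvent(String source, String payload) {
        this.source = source;
        this.payload = payload;
    }
}

// Plain-Java model of MyEventGroup.addEvent(); NOT a Mule class.
class ReplacingEventGroup {
    private final List<SourceEvent> events = new ArrayList<>();

    /** Adds the event, first removing any earlier event from the same source. */
    void addEvent(SourceEvent event) {
        if (event.source != null) {
            events.removeIf(e -> e.source != null && e.source.equalsIgnoreCase(event.source));
        }
        events.add(event);
    }

    int size() {
        return events.size();
    }

    List<String> payloads() {
        List<String> result = new ArrayList<>();
        for (SourceEvent e : events) {
            result.add(e.payload);
        }
        return result;
    }

    public static void main(String[] args) {
        ReplacingEventGroup group = new ReplacingEventGroup();
        group.addEvent(new SourceEvent("A", "a-v1"));
        group.addEvent(new SourceEvent("B", "b-v1"));
        group.addEvent(new SourceEvent("a", "a-v2")); // resend from source A
        System.out.println(group.payloads());
    }
}
```

Because the resent message takes the place of the older one, the group never grows beyond one event per source, so the expected group size is still reached with exactly one message from each source.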

    This concludes the rather complex example of this business case. The chance that you have exactly the same business case is rather small, but I hope you can pick out the pieces that are relevant to you.

About Pascal Alma

Pascal is a senior IT consultant and has been working in IT since 1997. He closely follows the latest developments in new technologies (Mobile, Cloud, Big Data) and is particularly interested in Java open source tool stacks, cloud-related technologies like AWS, and mobile development such as building iOS apps with Swift. Specialties: Java/JEE/Spring, Amazon AWS, API/REST, Big Data, Continuous Delivery, Swift/iOS.
