Message Throttling with Mule ESB

While implementing a Mule ESB integration I ran into the following scenario:
Messages were picked up from a queue, transformed, offered to a web service, and the response was transformed and put on a response queue.
Not an unusual use case in the world of integration. What made this one special was the (lack of) performance of the web service. As soon as we started sending messages to the service via the ESB, the response time for each message increased significantly. Investigation showed that the web service had serious issues when it received concurrent messages, and that is exactly what Mule ESB does by default: it starts off with a pool of threads to process the messages as quickly as possible. Until the issue with the web service was solved, I decided to add throttling functionality to my flow so I could manage the number of concurrent calls to the web service.
To add the throttling there are (at least) two ways. The first is to add a delay before a message is delivered, as described here. Although I haven't tried it, I expect that approach would lead to the situation where 16 concurrent web service calls are delayed 10 seconds and then still fired all at once.
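To illustrate why a fixed delay alone does not limit concurrency, here is a plain-Java sketch (not Mule code; the task count and sleep times are made up for the example): every task waits the same delay, and afterwards they all still hit the "service" at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayDemo {

    /** Runs 'tasks' jobs that each sleep a fixed delay before doing "work",
     *  and returns the peak number of jobs that were active at the same time. */
    static int peakConcurrency(int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(200);                   // the fixed "throttling" delay
                    int now = active.incrementAndGet();  // entering the service call
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(200);                   // the slow web service call
                    active.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrent calls: " + peakConcurrency(4));
    }
}
```

The delay only postpones the burst; it does not spread it out, which is why the fixed-delay approach does not protect the web service.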
So what I was looking for was a way to configure the number of concurrent threads that call the web service. Luckily this has been greatly simplified in Mule 3 compared to Mule 2, as you can read here. To mimic my situation I created the following Mule flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      version="EE-3.4.1"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
    <vm:endpoint name="input" path="input" exchange-pattern="one-way" />
    <vm:endpoint name="ws-call" path="ws-call" exchange-pattern="request-response" />
    <vm:endpoint name="output" path="output" exchange-pattern="one-way" />
    <flow name="testFlow" processingStrategy="asynchronous">
        <inbound-endpoint ref="input" />
        <outbound-endpoint ref="ws-call" />
        <outbound-endpoint ref="output" />
    </flow>
    <flow name="wsFlow" processingStrategy="synchronous">
        <inbound-endpoint ref="ws-call" />
        <append-string-transformer message=" added to the payload" />
        <test:component waitTime="2000"/>
    </flow>
</mule>

As you can see, I have done nothing special related to threads or processing. The test class that runs this flow looks like this:

package net.pascalalma.mule;

import org.junit.Test;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.junit4.FunctionalTestCase;

import java.util.Date;

public class ThrottleTest extends FunctionalTestCase {

    @Test
    public void simplePassThrough() throws Exception
    {
        MuleClient client = new MuleClient(muleContext);
        long start = new Date().getTime();
        for (int i=0;i<30;i++) {
            MuleMessage inMsg = new DefaultMuleMessage("Message "+ i,muleContext);
            client.dispatch("input", inMsg);
        }
        MuleMessage result = client.request("output", 3000);
        while (result != null) {
            long end = new Date().getTime();
            System.out.println("Message took : " + (end - start) / 1000);
            result = client.request("output", 3000);
        }
        long end = new Date().getTime();
        System.out.println("Total Service took : " + (end - start)/1000);
    }
    @Override
    protected String getConfigResources() {
        return "mule-config.xml";
    }
}

This generates the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Total Service took : 10

As you can see, many messages are processed concurrently, in 'batches'.
To control the number of threads you can do (at least) two things. The first is to limit the number of threads allowed for the flow. This can only be done if the processing strategy of the flow is asynchronous. In that case you can define your own processing strategy like this:

...
  <queued-asynchronous-processing-strategy name="allow2Threads" maxThreads="2" poolExhaustedAction="RUN"/>

  <flow name="testFlow" processingStrategy="allow2Threads">
   ...

The rest of the config is the same as before. If we run this we get the following output (for 10 messages):

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 7
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Message took : 13
Total Service took : 16


As we can see, there are now at most 2 concurrent messages.
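The effect of maxThreads="2" can be mimicked in plain Java (again, not Mule code; the task count and sleep time are invented for the example): a fixed pool of two worker threads never lets more than two "service calls" run at once, so the messages complete in batches of two, just like the output above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottleDemo {

    /** Submits 'tasks' jobs to a pool of 'maxThreads' workers and returns
     *  the peak number of jobs that ran at the same time. */
    static int peakConcurrency(int maxThreads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads); // like maxThreads="2"
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    int now = active.incrementAndGet();  // entering the "web service"
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(100);                   // the slow call
                    active.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak: " + peakConcurrency(2, 10));
    }
}
```

The queued work simply waits for a free worker, which is the same back-pressure the queued-asynchronous processing strategy applies to the flow.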
In case we need a synchronous flow (when using transactions, for instance), we cannot set maxThreads on a processing strategy. But we can set a threading profile on the connector, as I show here:

...
  <vm:connector name="myConnector">
    <receiver-threading-profile doThreading="true" maxThreadsActive="2" poolExhaustedAction="RUN" />
  </vm:connector>
  <flow name="testFlow" processingStrategy="synchronous">
...

If we run this (for 10 messages) we get the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 2
Message took : 2
Message took : 4
Message took : 4
Message took : 6
Message took : 6
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Total Service took : 14

We get a similar result as in the previous case, but now the whole flow is processed synchronously.
With this test case you are able to experiment yourself and see what best fits your situation. You can also play with the 'poolExhaustedAction' and other threading attributes.
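For reference, a receiver threading profile accepts more attributes than just maxThreadsActive. A sketch of a more complete profile (the values are illustrative; check the Mule 3 threading documentation for the exact semantics in your version):

```xml
<vm:connector name="myConnector">
    <!-- Values below are examples, not recommendations. -->
    <receiver-threading-profile
        doThreading="true"
        maxThreadsActive="2"
        maxThreadsIdle="2"
        maxBufferSize="10"
        threadWaitTimeout="30000"
        poolExhaustedAction="WAIT" />
</vm:connector>
```

In Mule 3 the poolExhaustedAction can be set to values such as WAIT, RUN, ABORT or DISCARD; with RUN (used in the examples above) the dispatching thread executes the work itself when the pool is full, while WAIT blocks until a pooled thread becomes free.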

About Pascal Alma

Pascal is a senior IT consultant and has been working in IT since 1997. He is monitoring the latest development in new technologies (Mobile, Cloud, Big Data) closely and particularly interested in Java open source tool stacks, cloud related technologies like AWS and mobile development like building iOS apps with Swift. Specialties: Java/JEE/Spring Amazon AWS API/REST Big Data Continuous Delivery Swift/iOS