The IBus interface from the [EasyNetQ][1] framework has a SubscribeAsync() method which makes it easy to spread message handling across different threads. It uses standard TPL Tasks for delegating handler execution. Depending on the application, you may create Tasks with the LongRunning flag, which provides a hint to the scheduler that an additional thread may be required[^1] and that it should avoid using the ThreadPool[^2]. The example below shows how to register a parallel consumer (MyMessage stands in for your own message type):

var bus = RabbitHutch.CreateBus("host=localhost");

bus.SubscribeAsync<MyMessage>("sub1", message => Task.Factory.StartNew(() =>
{
    // long running message handler
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));

What if there are thousands of messages in the queue?

Of course, we do not want a situation where the application is flooded with messages and consumes all available resources. We want to stay in control and be able to specify how many concurrent tasks may run. Fortunately, this is very easy to do.

Prefetch count

[RabbitMQ][2] allows clients to specify a prefetch count. It determines how many unacknowledged messages can be sent to the client and cached by the RabbitMQ client library at a time. We can use this value to control the number of concurrent tasks. The prefetch count can be set in [EasyNetQ][1] through the bus connection string. The default value is 50 and the maximum is 65535 (UInt16.MaxValue).

The example below shows how to limit the number of concurrent tasks to 15 by setting prefetchcount:

var bus = RabbitHutch.CreateBus("host=localhost;prefetchcount=15");

bus.SubscribeAsync<MyMessage>("sub1", message => Task.Factory.StartNew(() =>
{
    // long running message handler
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));

Notice the prefetchcount setting in the connection string and the LongRunning hint passed to StartNew.

Although this solution works fine for long running handlers, where the time needed to deliver a message is negligible compared to the handling time, it is a poor fit when handling time is short and prefetching more messages could improve the overall throughput of the system.
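To make the trade-off concrete, here is a purely illustrative variant (the prefetchcount value and handler timing below are made up): raising the prefetch count so that short messages are delivered faster also raises the number of tasks that may run at once, because the two are tied together.

var bus = RabbitHutch.CreateBus("host=localhost;prefetchcount=500");

bus.SubscribeAsync<MyMessage>("sub1", message => Task.Factory.StartNew(() =>
{
    // short running message handler - yet up to 500 tasks may now run at once,
    // because concurrency is coupled to the prefetch count
    System.Threading.Thread.Sleep(TimeSpan.FromMilliseconds(50));
}));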

TaskScheduler

Fortunately, TPL provides a natural way to control how work is dispatched to threads. The [task scheduler][3] class is responsible for queuing tasks onto threads. By using a specially crafted [task scheduler][3] we can control the number of concurrent threads used for handling messages. Microsoft even has a How To article showing a LimitedConcurrencyLevelTaskScheduler which does exactly that. Using the [task scheduler][3] provided in that article, we can easily set the concurrency level to 10 and the prefetch count to 15:

var bus = RabbitHutch.CreateBus("prefetchcount=15;host=localhost");

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(10);
TaskFactory factory = new TaskFactory(lcts);

bus.SubscribeAsync<MyMessage>("sub1", message => factory.StartNew(() =>
{
    // long running message handler, executed on the limited-concurrency scheduler
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}, TaskCreationOptions.LongRunning));
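
For completeness, below is a simplified sketch of a limited-concurrency scheduler in the spirit of the LimitedConcurrencyLevelTaskScheduler from Microsoft's How To article; it is a reduced illustration, not the full implementation from the article.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simplified limited-concurrency scheduler: tasks are queued to an internal list
// and at most maxDegreeOfParallelism worker callbacks drain it on the ThreadPool.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
    private readonly int _maxDegreeOfParallelism;
    private int _delegatesQueuedOrRunning;

    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    public override int MaximumConcurrencyLevel
    {
        get { return _maxDegreeOfParallelism; }
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                ThreadPool.QueueUserWorkItem(_ => ProcessPendingTasks());
            }
        }
    }

    private void ProcessPendingTasks()
    {
        while (true)
        {
            Task item;
            lock (_tasks)
            {
                if (_tasks.Count == 0)
                {
                    --_delegatesQueuedOrRunning;
                    return;
                }
                item = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            TryExecuteTask(item);
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Keep the sketch simple: never execute inline, always go through the queue.
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_tasks)
            return _tasks.ToArray();
    }
}

The version in Microsoft's article additionally supports inlining tasks on threads that are already processing items, which this sketch omits for brevity.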