This post was written as an answer to one of the questions on EasyNetQ user group.
The main principle of [EasyNetQ][1] bus is simplicity. It greatly abstracts nuances of communicating with RabbitMQ giving developers time to concentrate on writing the core application. The framework is great and makes processing messages really simple. But there are few scenarios when this simplicity becomes a small pain. One of those times is when you want to gracefully stop the application allowing it to finish processing current message.
This post is rather lengthy but I wanted to provide good explanation.
Acknowledging messages
When it comes to acknowledging messages, the [EasyNetQ][1] takes fully care of that. The only case when message will be NACKed is when the subscriber was disposed during processing a message. In any other case including errors1 the message is ACKed. There is no manual ACK or NACK command available on the bus.
Unsubscribing from the bus
To unsubscribe from the bus you need to call Dispose()
method on the result returned from the Subscribe()
call. This will stop processing of the message and prevent new messages from delivering.
var consumer = bus.SubscribeAsync("stockChecker", m => Task.Factory.StartNew(() => StockCheckerHandler(m)));
consumer.Dispose();
Allowing to finish processing message
If you want to allow your consumer code finish processing a message, before calling Dispose() you need to wait until the handling is completed and the bus ACKs the message.
Waiting for bus to ACK message
Unfortunately there is no event which will be raised when the ACK happened. You can assume the message was ACKed when there is new message delivered for processing. So as long as there are messages waiting in the queue we can detect ACK. But what if there are no more messages in the queue? Well, you need to wait some arbitrary time to allow bus finishing the operation. In most cases it is very fast and a second or few should be enough. In the worst case scenario the message returns back to the queue and will be processed again.
Putting it all together
I am using CancellationToken
to signal application stop request and ManualResetEvent
to signal when message is being processed. The same ManualResetEvent
is also used to signal delivery of new message for processing which tells us that message was ACKed and we may Dispose
consumer. When waiting on ManualResetEvent
to signal, you can specify a timeout which is used here to force application stop if processing doesn’t finish in timely fashion.
There are four cases to consider.
APPLICATION STOPPED WHEN NOTHING IS PROCESSED
Simplest case. When application stop requested and the consumer doesn’t process any message and we can safely Dispose
it.
CONSUMER BUSY AND THERE ARE MORE MESSAGES IN THE QUEUE
As mentioned earlier on, new message delivered to consumer is a signal that current message was ACKed (line 73
) and we can proceed with shutting down. To prevent loosing new message the consumer thread sleeps infinitely (line 74
). That makes disposed bus to NACK the message moving it back to the top of the queue.
Main application thread signals the stop was requested using CancellationToken
(line 43
) and starts waiting for consumer to finish handling current message (line 49
). Providing time out value to WaitOne
method allows for forcing handler to stop in case it takes to long for it to finish and show message on the screen. If handling the message finishes before time out (line 86
) then main thread receives signal (line 54
) and starts waiting to see whether there is new message for processing (line 55
).
Bus delivers new message to the consumer which detects that application stop was requested (line 71
), signals that back to main thread (line 72
) and sleeps the thread infinitely.
Main thread receive the signal (line 57
) meaning message was ACKed and consumer can be disposed (line 59
). This will trigger NACK of the new message delivered by the bus. Finally, the bus is disposed and application stops.
CONSUMER BUSY BUT NO MORE MESSAGES IN THE QUEUE
This is the trickiest path. Because there is no notification when bus ACKed the message you can only sleep main thread giving enough time for the bus to finish operation. Using WaitOne
with time out (line 57
) of few seconds should guarantee successful ACK in most cases.
CONSUMER DOESN’T FINISH PROCESSING MESSAGE IN TIMELY FASHION
To make sure the application doesn’t hung on “exit” because long running consumer, when waiting on the signal that current message was processed you can use time out value (line 49
). In case the wait timed out the application shows a message and disposes consumer. The message will go back to the queue (bus will NACK it) and will be ready for processing again.
Generally, message handlers should be designed in a way that processing doesn’t take to long. In cases long processing is required it is a good practice to periodically check CancellationToken
for signal that application is shutting down, and if that happens to save current state of processing. When the messages is picked up again you can continue processing from the last known stage.
Reliability
As mentioned earlier, in small number of cases the consumer can be disposed before bus ACKes the message. This will happen only when there is no new message for processing and system gets busy, slowing down code execution in [EasyNetQ][1]. If that happens the message goes back to the queue and will be processed again. Assuming all consumer code is idempotent this will only result in loosing some processing time.
Sample code
Below is a gist with sample implementation. To run it just call Run()
method:
new DisposingConsumer().Run();
After application ends you should see 4 messages left on the System.String:mscorlib_billing
queue.
using EasyNetQ;
using EasyNetQ.Loggers;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace DeliveryMethods
{
public class DisposingConsumer
{
private readonly IBus _bus;
private readonly CancellationTokenSource _cancellationTokenSource;
public DisposingConsumer()
{
_bus = RabbitHutch.CreateBus("host=localhost;prefetchcount=1", x => x.Register<IEasyNetQLogger, NullLogger>());
_cancellationTokenSource = new CancellationTokenSource();
}
private CancellationToken CancellationToken { get { return _cancellationTokenSource.Token; } }
public void Run()
{
var stockChecker = SubscribeStockCheckerConsumer();
var billingConsumerResetEvent = new ManualResetEvent(true);
var billingConsumer = SubscribeBillingConsumer(billingConsumerResetEvent);
int oId = 1;
while (oId < 6)
{
_bus.PublishAsync(string.Format("New order {0}. Timestamp: {1}", oId++, DateTime.Now.TimeOfDay));
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
}
StopApplication(stockChecker, billingConsumerResetEvent, billingConsumer);
_bus.Dispose();
}
private void StopApplication(IDisposable stockChecker, ManualResetEvent billingConsumerResetEvent, IDisposable billingConsumer)
{
_cancellationTokenSource.Cancel();
Console.WriteLine("disposing stock checker consumer");
stockChecker.Dispose();
Console.WriteLine("disposing billing consumer");
if (!billingConsumerResetEvent.WaitOne(TimeSpan.FromSeconds(15)))
{
Console.WriteLine("Billing consumer hasn't finished on time and was forcefully disposed. The message was probably NACKed.");
}
else
{
billingConsumerResetEvent.Reset();
Console.WriteLine("Waiting for message to be ACKed.");
billingConsumerResetEvent.WaitOne(TimeSpan.FromSeconds(5));
}
billingConsumer.Dispose();
Console.WriteLine("All consumers stopped.");
}
private IDisposable SubscribeBillingConsumer(ManualResetEvent billingConsumerResetEvent)
{
return _bus.SubscribeAsync<string>("billing", m => Task.Factory.StartNew(() => BillingHandler(billingConsumerResetEvent, m)));
}
private void BillingHandler(ManualResetEvent billingConsumerResetEvent, string m)
{
if (CancellationToken.IsCancellationRequested)
{
billingConsumerResetEvent.Set();
Thread.Sleep(System.Threading.Timeout.Infinite);
}
billingConsumerResetEvent.Reset();
try
{
Console.WriteLine("Billing [2]: {0}", m);
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
Console.WriteLine("Billing [2] - peocessing finished");
}
finally
{
billingConsumerResetEvent.Set();
}
}
private IDisposable SubscribeStockCheckerConsumer()
{
var stockChecker = _bus.SubscribeAsync<string>("stockChecker", m => Task.Factory.StartNew(() => StockCheckerHandler(m)));
return stockChecker;
}
private static void StockCheckerHandler(string m)
{
Console.WriteLine("Stock [1]: {0}", m);
}
}
}