Recently, a new plugin for RabbitMQ was created that provides support for delayed messaging. The plugin adds a new exchange type to RabbitMQ that stores messages internally, using Mnesia, until they are scheduled for delivery. This protects the messages in case the server goes down. The beauty of this solution is that it keeps everything inside RabbitMQ and doesn’t require installing any additional software. It also simplifies RabbitMQ configuration compared to solutions based on Dead Letter Exchanges and message TTL.
The plugin is currently in an experimental phase, but hopefully the community will provide valuable feedback and help it become production ready.
Installing the plugin
Before you can use the new exchange type, the rabbitmq_delayed_message_exchange plugin has to be installed. Download the correct version from the Community Plugins page (the plugin currently supports RabbitMQ versions 3.4 and 3.5), copy it to the server’s plugins folder and enable it by running the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Publishing delayed messages using C# Client Library
This doesn’t differ much from publishing a standard message. First you have to declare a new exchange of type x-delayed-message:
IDictionary<string, object> args = new Dictionary<string, object>
{
{"x-delayed-type", "direct"}
};
channel.ExchangeDeclare("DelayedTest", "x-delayed-message", true, false, args);
When declaring the exchange, you specify its routing behaviour using the x-delayed-type argument. This can be any valid exchange type, such as direct, topic or fanout, or any supported custom exchange type, which allows for very flexible routing.
Next, you need to declare a queue and bind it to the exchange:
var queue = channel.QueueDeclare("DelayedMessageQueue", true, false, false, null);
channel.QueueBind(queue, "DelayedTest", "dtest");
And finally, when publishing a message that requires delayed delivery, you set the x-delay header to the number of milliseconds to wait before the message gets routed. It is important to note that the value has to be of an integer type, otherwise it will not work:
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>
{
{"x-delay", 5000}
};
channel.BasicPublish("DelayedTest", "dtest", props, Encoding.Default.GetBytes("This message has been delayed."));
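Because the x-delay value has to be an integer, it can help to build the header from a TimeSpan in one place instead of writing raw numbers. The following is only a sketch: DelayHeaders and its methods are hypothetical names, not part of RabbitMQ.Client or the plugin, and rejecting delays that don't fit in a 32-bit int is a choice made for this example rather than a documented limit.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of RabbitMQ.Client or the plugin.
public static class DelayHeaders
{
    // Converts a delay to whole milliseconds as a 32-bit int, so the header
    // value is boxed as an integer rather than a double.
    public static int ToMilliseconds(TimeSpan delay)
    {
        if (delay < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("delay", "Delay must not be negative.");

        double ms = Math.Floor(delay.TotalMilliseconds);

        // Assumption for this sketch: reject anything that does not fit in an int.
        if (ms > int.MaxValue)
            throw new ArgumentOutOfRangeException("delay", "Delay does not fit in a 32-bit integer.");

        return (int)ms;
    }

    // Builds a headers dictionary containing only the x-delay header.
    public static IDictionary<string, object> For(TimeSpan delay)
    {
        return new Dictionary<string, object>
        {
            {"x-delay", ToMilliseconds(delay)}
        };
    }

    // Turns an absolute UTC delivery time into a relative delay.
    // A delivery time in the past is treated as "no delay".
    public static IDictionary<string, object> Until(DateTime deliverAtUtc, DateTime nowUtc)
    {
        TimeSpan delay = deliverAtUtc - nowUtc;
        return For(delay < TimeSpan.Zero ? TimeSpan.Zero : delay);
    }
}
```

With this in place, the publishing code above could set props.Headers = DelayHeaders.For(TimeSpan.FromSeconds(5)); and publish as before.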
Delayed messages with EasyNetQ
EasyNetQ has supported delayed messages for a long time. Unfortunately, this required installing a separate Windows service and setting up a database to store messages awaiting publishing. Later versions supported a solution based on Dead Letter Exchanges and message TTL, but that created an exchange for each delay period, which was not optimal.
Now EasyNetQ also supports the Delayed Message Exchange plugin (since version 0.47.10.381).
You start by instructing EasyNetQ to use the new DelayedExchangeScheduler:
bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IScheduler, DelayedExchangeScheduler>());
then subscribe to a message:
bus.Subscribe<PartyInvitation>("schedulingTest1", message =>
{
Console.WriteLine("Got scheduled message: {0}", message.Text);
});
and finally publish a message using the FuturePublish method:
var invitation = new PartyInvitation
{
Text = "Please come to my party",
Date = new DateTime(2011, 5, 24)
};
bus.FuturePublish(DateTime.UtcNow.AddMinutes(5), invitation);
The message will get delivered after 5 minutes.
Summary
Finally, RabbitMQ is getting first-class support for delayed message delivery. That opens up a myriad of possibilities, from simple reminders and schedulers, through sagas, to handling transient errors and back-off policies. And hopefully it will tick another box on the list when considering which queue system to choose.
Bear in mind that it is still in an experimental phase. But please give it a go and provide feedback on the official mailing list.
You can read the plugin’s announcement on RabbitMQ’s blog: Scheduling Messages with RabbitMQ.
And the plugin’s source code is on GitHub.