Sending Scheduled/delayed messages with RabbitMQ through java client

28 / Jul / 2015 by Vishal Kumar 1 comments

The requirement to send delayed/scheduled message is to publish any message with a delay time. To achieve this, earlier we had to use dead letter exchange but now we can send scheduled/delayed messages from RabbitMQ with “rabbitmq_delayed_message_exchange” plugin. we can send scheduled/ delayed messages from rabbitMQ by following these steps :-

Step 1- Install rabbitMQ server to 3.5.3 or above on your machine. Follow this guide to install rabbitMQ http://www.rabbitmq.com/download.html

Step 2 - Download “rabbitmq_delayed_message_exchange” plugin from rabbitMQ official website. Follow this link to download “rabbitmq_delayed_message_exchange” http://www.rabbitmq.com/community-plugins.html

Step 3 - Place “rabbitmq_delayed_message_exchange” plugin into plugins directory of rabbitMQ server directory (in linux ‘/usr/lib/rabbitmq/lib/rabbitmq_server-3.5.3/plugins’ path)

Step 4 - Execute “rabbitmq-plugins enable rabbitmq_delayed_message_exchange” command to enable “rabbitmq_delayed_message_exchange” plugin.

Step 5 - Now, after configuring rabbitMQ server we are ready to send scheduled or delayed messages. To send scheduled message we have to declare an exchange with ‘x-delayed-message’ which is indirectly mapped with the any of the exchange types i.e topic,direct,fanout or headers.
Follow this code to publish message to delayed exchange:-

 String readyToPushContent = "Hello this is the delayed message.";

Map<String, Object> args = new HashMap<String, Object>();
 args.put("x-delayed-type", "direct");
 channel.exchangeDeclare("exChangeName", "x-delayed-message", true, false, args);

Map<String, Object> headers = new HashMap<String, Object>()
 headers.put("x-delay", 10000); //delay in miliseconds i.e 10secs

AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
 channel.basicPublish("exChangeName", "", props.build(), readyToPushContent.bytes);

a. This plugin allows flexible routing behavior via “x-delayed-type” which can be passed at the time of  exchange declaration.  i.e  this exchange will provide routing behavior like “direct” exchange, and exchange with type “x-delayed-message” will act as proxy.

b. Here we declare an exchange with “x-delayed-message” type. Then we attach a header with “x-delay” at the time of publish message,  which accepts integer value  as time (in milliseconds).

Step 6 - To Subscribe message from delayed exchange follow this code :-

 QueueingConsumer queueingConsumer = new QueueingConsumer(channel) // creating a consumer from channel

Map<String, Object> args = new HashMap<String, Object>();
 args.put("x-delayed-type", "direct");
 channel.exchangeDeclare("exChangeName", "x-delayed-message", true, false, args); // declare delayed exchange

String queueName = channel.queueDeclare().getQueue(); // declaring dynamic queue in channel

channel.queueBind(queueName, "exChangeName", ""); // binding queue with delayed exchnage

channel.basicConsume(queueName, true, queueingConsumer) // consuming message from queue in consumer object

try {
 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery() // 

 String message = (new String(delivery.getBody()))

 } catch (Exception exception) {
 exception.printStackTrace()

}

Hope, this will help you to send scheduled/delayed messages with rabbitMQ.
Bouquets and brickbats are welcome.

~vishal(d0t)kumar(at)tothenew(d0t)com~

FOUND THIS USEFUL? SHARE IT

comments (1 “Sending Scheduled/delayed messages with RabbitMQ through java client”)

Leave a comment -