How To Integrate RabbitMQ using Spring?

17 / Jan / 2017 by Amit Raturi 5 comments

In this post, we will look at two ways of integrating RabbitMQ with Spring. Before getting to the implementation, here is a brief introduction to the prerequisites.

JMS

The Java Message Service (JMS) is a Message Oriented Middleware Java API that supports the formal communication between software components. It allows applications to create, send, receive, and read messages in such a way that communication is loosely coupled, reliable and asynchronous.

AMQP

The Advanced Message Queuing Protocol (AMQP) is an open standard for message-oriented middleware. AMQP provides full functional interoperability between conforming clients and messaging middleware servers (also called “brokers”).

What is RabbitMQ?

RabbitMQ is an open source message broker (a piece of message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP).

Benefits of using RabbitMQ

  1. Robust messaging for applications
  2. Runs on all major operating systems
  3. Supports a huge number of developer platforms
  4. Open source and commercially supported
  5. Easy to use

Installation Steps

  1. Install RabbitMQ Server on your computer by following the guide at https://www.rabbitmq.com/install-debian.html, then enable the RabbitMQ Management Plugin with the command below.

rabbitmq-plugins enable rabbitmq_management

  2. Open localhost:15672 in a browser and log in with the default credentials.

Default user name=guest

Default password=guest

For a complete reference, see https://www.rabbitmq.com/management.html.

Now you’re ready to use RabbitMQ.

RabbitMQ Use Case

Let’s take a scenario in which different tasks are triggered by events that we pass through a queue. We have a Producer (which sends messages) and a Consumer (which receives them).

The Consumer passes each message to the EventHandler class, which decides what to do based on the event type. The following steps explain this in more detail.
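Before bringing in a broker, the flow described above can be modelled in plain Java. The sketch below is only an illustration: a java.util.concurrent.BlockingQueue stands in for the RabbitMQ queue, and the class name InMemoryFlowSketch and its methods publish, consume and handle are hypothetical names that do not appear in the projects linked at the end of this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of the producer -> queue -> consumer -> handler flow.
// A BlockingQueue stands in for the RabbitMQ queue; no broker or RabbitMQ API is involved.
final class InMemoryFlowSketch {

    enum Events { Event1, Event2 }

    // Producer side: wrap the event type and payload in a map and enqueue it.
    static void publish(BlockingQueue<Map<String, Object>> queue, Events event, Object payload) {
        Map<String, Object> message = new HashMap<>();
        message.put("event", event);
        message.put("message", payload);
        queue.add(message);
    }

    // Handler side: decide what to do based on the event type.
    static String handle(Events event, Object payload) {
        switch (event) {
            case Event1:
                return "event 1 handled: " + payload;
            case Event2:
                return "event 2 handled: " + payload;
            default:
                throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    // Consumer side: drain the queue and pass every message to the handler.
    static List<String> consume(BlockingQueue<Map<String, Object>> queue) {
        List<String> results = new ArrayList<>();
        Map<String, Object> message;
        while ((message = queue.poll()) != null) {
            results.add(handle((Events) message.get("event"), message.get("message")));
        }
        return results;
    }

    public static void main(String[] args) {
        BlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();
        publish(queue, Events.Event1, 11111);
        publish(queue, Events.Event2, "Implementing RabbitMQ");
        consume(queue).forEach(System.out::println);
    }
}
```

With a real broker, the producer and the consumer would normally live in different processes, and the queue would sit between them on the RabbitMQ server.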

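Here we will see two different implementations in Spring: first an integration built directly on the RabbitMQ Java client (referred to in this post as the JMS-based integration), and then a Spring-specific integration built on Spring AMQP.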

JMS Base Integration:

Step 1.1: Adding Library Dependency

To get started with the RabbitMQ Java client (amqp-client), it is recommended to use a dependency management system.

The Maven dependency is given below:

[code language=”xml”]
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.0</version>
</dependency>
[/code]

Alternatively, if you’re using Gradle:

[code language=”groovy”]
dependencies {
    compile 'com.rabbitmq:amqp-client:4.0.0'
}
[/code]

Step 1.2: Create Connector Class

Now we need to create an abstract Connector class. The basic RabbitMQ setup (connection, channel and queue declaration) is the same for the Producer and the Consumer, so we put that code in the Connector class and let both Producer and Consumer inherit from it.

[code language=”java”]
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public abstract class Connector {
    // Named "channel" because Producer and QueueConsumer refer to it by that name
    protected Channel channel;
    protected Connection connection;
    protected String queueName;

    public Connector(String queueName) throws IOException, TimeoutException {
        this.queueName = queueName;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Hostname of your RabbitMQ server
        connectionFactory.setHost("localhost");
        // Open a connection to the broker
        connection = connectionFactory.newConnection();
        // Create a new channel, using an internally allocated channel number
        channel = connection.createChannel();
        // Declare the queue (non-durable, non-exclusive, not auto-deleted);
        // it is created only if it does not already exist
        channel.queueDeclare(queueName, false, false, false, null);
    }

    public void close() throws IOException, TimeoutException {
        this.channel.close();
        this.connection.close();
    }
}
[/code]

Step 1.3: Create Producer

A program that sends messages is a Producer.

It has to inherit the Connector class so that it is able to communicate with the queue.

The sendMessage(Serializable object) method serializes the given object and sends it to the queue.

[code language=”java”]
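import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

// Assumption: Spring's SerializationUtils; the post does not show which class is imported
import org.springframework.util.SerializationUtils;

public class Producer extends Connector {
    public Producer(String queueName) throws IOException, TimeoutException {
        super(queueName);
    }

    public void sendMessage(Serializable object) throws IOException {
        // Publish to the default exchange (""), using the queue name as the routing key
        channel.basicPublish("", queueName, null, SerializationUtils.serialize(object));
    }
}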
[/code]
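The message body that travels through the queue is just a byte array, so it may help to see what a serialize/deserialize round trip looks like. The sketch below uses only JDK object streams; the class SerializationSketch and its methods toBytes and fromBytes are hypothetical stand-ins for the SerializationUtils calls used in this post, not a copy of that library's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical helper showing a Java serialization round trip with plain JDK streams.
final class SerializationSketch {

    enum Events { Event1, Event2 }

    // Turn a serializable object into the byte[] that would become the message body.
    static byte[] toBytes(Serializable object) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuild the object from a received message body.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the serialized object is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Object> message = new HashMap<>();
        message.put("event", Events.Event1);
        message.put("message", 11111);

        byte[] body = toBytes(message);
        Object received = fromBytes(body);
        System.out.println("Round trip preserved the message: " + message.equals(received));
    }
}
```

Java serialization only works when both sides have the same classes available, so the producer and the consumer must share the Events enum and any payload types.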

 Step 1.4: Create Consumer

A consumer is a program that mostly waits to receive messages. It also has to inherit the Connector class so that it is able to communicate with the queue.

The consumer receives the serialized data (the message), so we have to deserialize it before using it.

The handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) method is called for each message. Its body argument holds the serialized data, which we deserialize before passing it on.

[code language=”java”]
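import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
// Assumption: Spring's SerializationUtils; the post does not show which class is imported
import org.springframework.util.SerializationUtils;

public class QueueConsumer extends Connector implements Runnable, Consumer {

    public QueueConsumer(String queueName) throws IOException, TimeoutException {
        super(queueName);
    }

    public void run() {
        try {
            // Start consuming messages; "true" means messages are acknowledged automatically
            channel.basicConsume(queueName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Called when the consumer is registered
    public void handleConsumeOk(String info) {
    }

    // Called when a new message is available
    public void handleDelivery(String info, Envelope env, BasicProperties props, byte[] body)
            throws IOException {
        // The producer sends a HashMap with the keys "event" and "message"
        Map<?, ?> map = (Map<?, ?>) SerializationUtils.deserialize(body);
        EventHandler.handler((Events) map.get("event"), map.get("message"));
    }

    // Called when the consumer is cancelled by the broker
    public void handleCancel(String info) {
    }

    public void handleCancelOk(String info) {
    }

    public void handleRecoverOk(String info) {
    }

    public void handleShutdownSignal(String info, ShutdownSignalException exception) {
    }
}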
[/code]

Step 1.5: Create enum Events

These are the events which will be used to decide which task to perform.

[code language=”java”]
enum Events {
Event1,Event2;

}
[/code]

Step 1.6: Create class EventHandler

This class handles the events: whenever the consumer receives a message, it calls the handler method, which performs the task that matches the event type.

[code language=”java”]
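public class EventHandler {
    static void handler(Events event, Object message) {
        switch (event) {
            case Event1:
                System.out.println("publish event 1: " + message);
                break;
            case Event2:
                System.out.println("publish event 2: " + message);
                break;
            default:
                // No task is registered for this event type
                System.out.println("unhandled event " + event + ": " + message);
        }
    }
}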

[/code]
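If the number of events grows, the switch statement can become long. One possible alternative is a lookup table that maps each event to its handler. The sketch below assumes this table-driven style; the class EventDispatchSketch, the HANDLERS map and the dispatch method are hypothetical names, and the handlers return a String instead of printing so that the result is easy to check.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical table-driven variant of the EventHandler switch statement.
final class EventDispatchSketch {

    enum Events { Event1, Event2 }

    // One handler function per event type.
    private static final Map<Events, Function<Object, String>> HANDLERS = new EnumMap<>(Events.class);

    static {
        HANDLERS.put(Events.Event1, message -> "event 1: " + message);
        HANDLERS.put(Events.Event2, message -> "event 2: " + message);
    }

    // Look up the handler for the event and apply it to the message payload.
    static String dispatch(Events event, Object message) {
        Function<Object, String> handler = HANDLERS.get(event);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for event: " + event);
        }
        return handler.apply(message);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Events.Event1, 11111));
        System.out.println(dispatch(Events.Event2, "Implementing RabbitMQ"));
    }
}
```

For only two events, the switch statement in EventHandler is just as readable; the table starts to pay off when handlers need to be registered from different places.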

Step 1.7: Create a constant Class

This class will have the queue name.

[code language=”java”]
public final class Constants {
public static final String queueName = "queueName";

}
[/code]

Step 1.8: Create Service Class EventPublisherService

When the EventPublisherService bean is created, it creates the Consumer and Producer instances and starts the consumer thread.

The publishEvent(Events event, Object messages) method builds a message from the event type and the payload and sends it to the queue.

[code language=”java”]
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

import org.springframework.stereotype.Service;

@Service
public class EventPublisherService {
    private final Producer producer;

    EventPublisherService() throws IOException, TimeoutException {
        // Start the consumer on its own thread so that it can wait for messages
        QueueConsumer consumer = new QueueConsumer(Constants.queueName);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        producer = new Producer(Constants.queueName);
    }

    public void publishEvent(Events event, Object messages) throws IOException, TimeoutException {
        // Wrap the event type and the payload in a serializable map
        HashMap<String, Object> message = new HashMap<String, Object>();
        message.put("event", event);
        message.put("message", messages);
        producer.sendMessage(message);
    }
}
[/code]
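The message above is an untyped map, so the consumer has to know the key names and cast the values. A small serializable class can carry the same two fields with compile-time types. The sketch below is only one possible shape for such a class; EventMessageSketch and EventMessage are hypothetical names and are not part of the projects linked at the end of this post.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical typed alternative to the HashMap with the keys "event" and "message".
final class EventMessageSketch {

    enum Events { Event1, Event2 }

    // Immutable envelope that carries the event type and its payload.
    static final class EventMessage implements Serializable {
        private static final long serialVersionUID = 1L;

        private final Events event;
        private final Serializable payload;

        EventMessage(Events event, Serializable payload) {
            // The event type is mandatory, because the handler decides on it
            this.event = Objects.requireNonNull(event, "event");
            this.payload = payload;
        }

        Events getEvent() {
            return event;
        }

        Serializable getPayload() {
            return payload;
        }
    }

    public static void main(String[] args) {
        EventMessage message = new EventMessage(Events.Event2, "Implementing RabbitMQ");
        System.out.println(message.getEvent() + " -> " + message.getPayload());
    }
}
```

Because the payload is declared as Serializable, passing a value that cannot be serialized is rejected by the compiler instead of failing when the message is sent.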

Step 1.9: Create Controller

We just have to pass the event type and the message to the publishEvent(Events, message) method of EventPublisherService; the service handles the rest.

[code language=”java”]
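import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("/")
public class UserController {
    @Autowired
    EventPublisherService eventPublisherService;

    @RequestMapping("/rabbitMq")
    public ModelAndView welcome() throws IOException, TimeoutException {
        Integer message1 = 11111;
        eventPublisherService.publishEvent(Events.Event1, message1);
        String message2 = "Implementing RabbitMQ";
        eventPublisherService.publishEvent(Events.Event2, message2);
        return new ModelAndView("index", "message", "Implemented");
    }
}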

[/code]

Spring Specific Implementation

There are no Producer and Consumer classes in this implementation. To send a message, we only need the call below.

template.convertAndSend(Constants.queueName,message);

To receive a message, we annotate a method with @RabbitListener(queues = "queueName"). The following steps explain this in more detail:

Step 2.1: Adding Library Dependency

[code language=”xml”]
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>
[/code]

Alternatively, for Gradle:

[code language=”groovy”]
compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.4.5.RELEASE'
[/code]

Step 2.2: Create a configuration class named RabbitMqConfiguration

Here we create all the beans that Spring AMQP needs in order to talk to RabbitMQ.

[code language=”java”]
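import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {
    // Connection to the RabbitMQ server running on localhost
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    // Declares queues, exchanges and bindings on the broker
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    // Used by the service to send messages
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
        return new Queue(Constants.queueName);
    }

    // Creates the listener containers behind @RabbitListener methods
    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerlistenerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        return factory;
    }
}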
[/code]

Step 2.3: Repeat Steps

The Events enum, the EventHandler class, the Constants class and the controller class are the same as those created in Steps 1.5, 1.6, 1.7 and 1.9.

Step 2.4: Create EventPublisherService

EventPublisherService creates a message and then sends it to the queue using template.convertAndSend(Constants.queueName,message);

This service acts as the producer.

[code language=”java”]
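import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class EventPublisherService {
    @Autowired
    AmqpTemplate template;

    public void publishEvent(Events event, Object messages) throws IOException, TimeoutException {
        Map<String, Object> message = new HashMap<String, Object>();
        message.put("event", event);
        message.put("message", messages);
        template.convertAndSend(Constants.queueName, message);
    }
}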
[/code]

Step 2.5: Create Listener Class or consumer named RabbitMqMessageListener

This is the listener class, or consumer. Whenever the producer sends a message to the queue, the method annotated with @RabbitListener(queues = Constants.queueName) receives it.

[code language=”java”]
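import java.util.Map;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@EnableRabbit
@Component
public class RabbitMqMessageListener {
    @RabbitListener(queues = Constants.queueName)
    public void processQueue(Map<String, Object> message) {
        EventHandler.handler((Events) message.get("event"), message.get("message"));
    }
}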
[/code]

Step 2.6: This is an additional step that depends on the use case. The controller from Step 1.9 returns the view index, so you have to create index.jsp in webapp/WEB-INF/views.

Finally, run mvn jetty:run in the root directory of the project to start it.

Hope this will Help. Thank You.

You can find the source code in the following Git repositories:

Basic RabbitMQ JMS integration

https://github.com/amitraturi36/RabbitMqSpringIntegration.git

Spring-specific AMQP integration

https://github.com/amitraturi36/SpringRabbitMQIntegrationExample.git


comments (5)

  1. Rodney Barbati

    I definitely would not encode the queue name in a constant. If you only have one queue, then you really don’t need a broker.

    Also, your EventPublisher class is not realistic. Consumers will rarely be on the same machine as the producer, so the producer should not be starting the consumers.

    What you will possibly need is a producer that can be instantiated quickly by whatever code needs it, provides methods for sending messages into the broker, and that can then be discarded.

    It is doubtful that you can use a single consumer class for all queues as well – some queues will have different guarantee levels associated with them, and these will require different processing of messages.

  2. Djeison

    Hi there,
    Thank you for this very helpful article!
    In my application I have a few @RabbitListeners and when RabbitMQ is down, the app doesn’t start. Would you know how to handle connection failure and make the app start even when RabbitMQ is down?
    Thank you in advance!
