In my career I have used JMS extensively, and recently I carried out a feasibility study on moving to RabbitMQ. RabbitMQ is robust and highly scalable; the one catch is that it is written in Erlang rather than Java, which is what most of us are used to. So how does one connect to RabbitMQ from Java? RabbitMQ provides a Java client library, but it is fairly basic, and as developers we end up writing a lot of code around it for better management.
RabbitMQ sits in the same ecosystem as Spring, so I decided to use the Spring AMQP library to connect and to implement the usual producer and consumer pattern. The Spring AMQP samples use standard Spring 3.0 @Configuration classes to create the producer and the consumers. That means the queue name, the routing key, and the rest of the configuration must be supplied when the context starts. You also cannot reuse one @Configuration class to create several bean objects from it.
My requirement, however, was an unusual one: to create queues and consumers dynamically. That is not easy to do, and I could not find any help for it on the net. After digging into how Spring AMQP works, I was able to do it.
I am assuming you already know which libraries Spring AMQP requires. If not, download the Spring AMQP HelloWorld sample code, open its Maven project in the IDE of your choice, and create the package described below.
First, let's create the producer:
package com.milind.spring.amqp.direct;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.JsonMessageConverter;

/**
 *
 * @author milind
 */
public class ProducerConfiguration {

    private String queueName;
    private String routingKey;
    private RabbitTemplate rabbitTemplate;

    // Note: this constructor does not build the RabbitTemplate,
    // so send() only works on instances created with the two-argument constructor.
    public ProducerConfiguration() {
    }

    public ProducerConfiguration(String queueName, String routingKey) {
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.rabbitTemplate = rabbitTemplate();
        // Declare the queue on the broker at construction time.
        RabbitAdmin admin = new RabbitAdmin(this.rabbitTemplate.getConnectionFactory());
        admin.declareQueue(new Queue(this.queueName));
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public String getQueueName() {
        return queueName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // The routing key is set to the name of the queue by the broker for the default exchange.
        template.setRoutingKey(this.routingKey);
        // Where we will synchronously receive messages from
        template.setQueue(this.queueName);
        template.setMessageConverter(new JsonMessageConverter());
        return template;
    }

    public ConnectionFactory connectionFactory() {
        SingleConnectionFactory connectionFactory = new SingleConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    // Converts the string with the configured converter and publishes it with the configured routing key.
    public void send(String s) {
        this.rabbitTemplate.convertAndSend(s);
    }
}
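The comment in rabbitTemplate() relies on how the default exchange behaves: every queue is bound to it under its own name, so a message published with routing key "q1" lands in queue "q1". The following is only a toy in-memory model of that rule, not RabbitMQ client code; the class and method names (DefaultExchangeSketch, declareQueue, publish, receive) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of default-exchange routing (hypothetical, no broker involved). */
public class DefaultExchangeSketch {

    // Queue name -> pending messages. On the default exchange the binding key is the queue name.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Creates the queue if it does not exist yet; declaring it again changes nothing. */
    public void declareQueue(String name) {
        queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    /** Delivers to the queue whose name equals the routing key; returns false if there is none. */
    public boolean publish(String routingKey, String message) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // unroutable in this model
        }
        target.add(message);
        return true;
    }

    /** Returns the oldest pending message, or null if the queue is empty or unknown. */
    public String receive(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        exchange.declareQueue("q1");
        exchange.publish("q1", "Str: 0");
        System.out.println(exchange.receive("q1"));
    }
}
```

This is why the test client later passes "q1" as both the queue name and the routing key.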
To create the consumer configuration we need one trick: we create our own message listener container by extending SimpleMessageListenerContainer. This is needed because the doStart() method is protected in SimpleMessageListenerContainer.
package com.milind.spring.amqp.direct;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/**
 *
 * @author milind
 */
public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer {

    public void startConsumers() throws Exception {
        super.doStart();
    }
}
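Stripped of the Spring classes, the trick is ordinary Java: a subclass may call a protected method of its parent and offer it again under a public name. Here is a minimal sketch with stand-in classes; BaseContainer, StartableContainer, and ProtectedHookSketch are hypothetical names, and in the real case the parent lives in another package, which is what stops outside code from calling doStart() directly.

```java
/** Stand-in for a library class whose start hook is protected (hypothetical). */
class BaseContainer {
    private boolean running;

    protected void doStart() {
        running = true;
    }

    public boolean isRunning() {
        return running;
    }
}

/** Subclass that re-exposes the protected hook through a public method. */
class StartableContainer extends BaseContainer {
    public void startConsumers() {
        super.doStart();
    }
}

public class ProtectedHookSketch {
    public static void main(String[] args) {
        StartableContainer container = new StartableContainer();
        container.startConsumers();
        System.out.println("running = " + container.isRunning());
    }
}
```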
Next, we create the consumer handler, which will handle the incoming messages.
package com.milind.spring.amqp.direct;

public class ConsumerHandler {

    public void handleMessage(String text) {
        System.out.println("Received--------------------------: " + text);
    }
}
Finally, we create the consumer configuration:
package com.milind.spring.amqp.direct;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.JsonMessageConverter;

/**
 *
 * @author milind
 */
public class ConsumerConfig {

    private String queueName;
    private String routingKey;
    // Number of concurrent consumers to start on the queue.
    private int onOfConsumer;

    public int getOnOfConsumer() {
        return onOfConsumer;
    }

    public void setOnOfConsumer(int onOfConsumer) {
        this.onOfConsumer = onOfConsumer;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public ConsumerConfig(String queueName, String routingKey, int onOfConsumer) throws Exception {
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.onOfConsumer = onOfConsumer;
        // Build the listener container by hand instead of letting a Spring context do it.
        ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueName(this.queueName);
        container.setConcurrentConsumers(this.onOfConsumer);
        // The adapter delivers each converted message to ConsumerHandler.handleMessage(...).
        container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new JsonMessageConverter()));
        container.startConsumers();
    }

    public ConnectionFactory connectionFactory() {
        SingleConnectionFactory connectionFactory = new SingleConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
}
Client/Testing:
package com.milind.spring.amqp.direct;

import java.util.concurrent.TimeUnit;

/**
 *
 * @author milind
 */
public class Test {

    public static void main(String[] args) throws Exception {
        // Declares queue "q1" and sets up a producer that publishes to it.
        ProducerConfiguration producer = new ProducerConfiguration("q1", "q1");
        // Starts five concurrent consumers on "q1".
        ConsumerConfig consumer = new ConsumerConfig("q1", "q1", 5);
        int count = 0;
        // Publish one message every two seconds until the process is stopped.
        while (true) {
            producer.send("Str: " + count);
            TimeUnit.SECONDS.sleep(2);
            count++;
        }
    }
}
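Since ProducerConfiguration and ConsumerConfig are plain objects, nothing stops you from creating one per queue name at runtime. One way to avoid setting up the same queue twice is a small cache keyed by queue name. The sketch below is an assumption of mine rather than part of the original setup: QueueRegistry and forQueue are hypothetical names, and the factory is left generic so the example runs without a broker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Caches one handle per queue name and builds it on first request (hypothetical helper). */
public class QueueRegistry<T> {

    private final Map<String, T> handles = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    public QueueRegistry(Function<String, T> factory) {
        this.factory = factory;
    }

    /** Returns the existing handle for queueName, or creates and stores a new one. */
    public T forQueue(String queueName) {
        return handles.computeIfAbsent(queueName, factory);
    }

    public int size() {
        return handles.size();
    }

    public static void main(String[] args) {
        // With the classes from this post the factory could be
        // name -> new ProducerConfiguration(name, name); a String stands in for it here.
        QueueRegistry<String> registry = new QueueRegistry<>(name -> "producer-for-" + name);
        registry.forQueue("q1");
        registry.forQueue("q1"); // reuses the cached handle
        registry.forQueue("q2");
        System.out.println(registry.size());
    }
}
```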
Make sure the RabbitMQ server is running before you start the test class. Once the test is running, you can see that the queue has been created in RabbitMQ:
sudo rabbitmqctl list_queues
Listing queues ...
q1 0
...done.
You can also check how many consumers were created:
sudo rabbitmqctl list_consumers
Listing consumers ...
q1 <'rabbit@milind-laptop'.1065.0> amq.ctag-bJlPSH2rVzOV+BDUSWjonw== false
q1 <'rabbit@milind-laptop'.1060.0> amq.ctag-Z2Gq2S0a8i1m0RlvftUr5Q== false
q1 <'rabbit@milind-laptop'.1080.0> amq.ctag-0cZkWybV6LFDgWoBslI3qQ== false
q1 <'rabbit@milind-laptop'.1075.0> amq.ctag-riIoQhH7QcnGpZOeqY4hqA== false
q1 <'rabbit@milind-laptop'.1070.0> amq.ctag-zpOjTp7URsmwRDY7NeDB4w== false
...done.
Five consumers were created, as desired.
Finally, the consumers receive the messages, as shown below:
Received--------------------------: Str: 0
Received--------------------------: Str: 1
Received--------------------------: Str: 2
Received--------------------------: Str: 3
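Each message shows up only once even though five consumers are attached to q1, because consumers on the same queue compete for messages: the broker hands each message to just one of them. The sketch below imitates that behaviour in memory with a BlockingQueue and a thread pool. It is only an analogy with made-up names (CompetingConsumersSketch, drain) and does not talk to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory analogy of several consumers sharing one queue (hypothetical, no broker involved). */
public class CompetingConsumersSketch {

    /** Runs the given number of workers against one shared queue and returns what they received. */
    public static List<String> drain(List<String> messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> received = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.submit(() -> {
                String msg;
                // poll() returns null once the queue is empty, which ends this worker.
                while ((msg = queue.poll()) != null) {
                    received.add(msg);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            messages.add("Str: " + i);
        }
        // Arrival order may vary between runs, but each message is received exactly once.
        for (String msg : drain(messages, 5)) {
            System.out.println("Received: " + msg);
        }
    }
}
```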
Nice Post Milind!!! It really helped me. As even my application requirements are similar to what you have stated.
-Mitesh
very nice article ..was very useful for my project
Thank you very much!!!
how to specify the exchange in this case ?
very helpful till now