Configure Spring AMQP to Connect to Multiple Broker

@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    CachingConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    CachingConnectionFactory cf2() {
        return new CachingConnectionFactory("otherHost");
    }

    @Bean
    CachingConnectionFactory cf3() {
        return new CachingConnectionFactory("thirdHost");
    }

    @Bean
    SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
            CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        rcf.setDefaultTargetConnectionFactory(cf1);
        rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
        return rcf;
    }

    @Bean("factory1-admin")
    RabbitAdmin admin1(CachingConnectionFactory cf1) {
        return new RabbitAdmin(cf1);
    }

    @Bean("factory2-admin")
    RabbitAdmin admin2(CachingConnectionFactory cf2) {
        return new RabbitAdmin(cf2);
    }

    @Bean("factory3-admin")
    RabbitAdmin admin3(CachingConnectionFactory cf3) {
        return new RabbitAdmin(cf3);
    }

    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
        MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
        postProcessor.setEndpointRegistry(registry);
        postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
        return postProcessor;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf1);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf2);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf3);
        return factory;
    }

    @Bean
    RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
        return new RabbitTemplate(rcf);
    }

    @Bean
    ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
        return new ConnectionFactoryContextWrapper(rcf);
    }

}

@Component
class Listeners {

    @RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
    public void listen1(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
    public void listen2(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
    public void listen3(String in) {

    }

}

References
https://docs.spring.io/spring-amqp/docs/current/reference/html/#multi-rabbit

Configure Spring AMQP to Receive Messages Asynchronously from Queues

RabbitConfiguration.java

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    // configure listener to receive messages from queues
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setConcurrentConsumers(8);
    container.setMaxConcurrentConsumers(32);
    // listen to queues
    container.setQueueNames("MonitoringV5_Queue1", "MonitoringV5_Queue2");
    container.setMessageListener(new RabbitMessageListener());
    return container;
}
public class RabbitMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        switch (message.getMessageProperties().getConsumerQueue()) {
            case "MonitoringV5_Queue1" -> Queue1(message);
            case "MonitoringV5_Queue2" -> Queue2(message);
        }
    }

    private void Queue1(Message message) {
        String str=new String(message.getBody());
        System.out.println(str);
        Gson gson = new Gson();
        ReadValueDto value= gson.fromJson(str,ReadValueDto.class);
    }

    private void Queue2(Message message) {
        String str=new String(message.getBody());
        System.out.println(str);
        Gson gson = new Gson();
        ReadValueDto value= gson.fromJson(str,ReadValueDto.class);
    }
}

If your callback logic depends on the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener. It looks similar but has an extra parameter.

References
https://docs.spring.io/spring-amqp/docs/current/reference/html/#message-listener

Configure RabbitTemplate to use Spring Retry Policy

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    // configure template to use retry policy
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

References
https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-retry
https://www.baeldung.com/spring-retry

Configure Spring AMQP to Connect to RabbitMQ using Credential

@Configuration
public class RabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // configure to connect with credential
        ConfigFile configFile = new ConfigFile();
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory(configFile.getRabbitmqHost());
        connectionFactory.setUsername(configFile.getRabbitmqUserName());
        connectionFactory.setPassword(configFile.getRabbitmqPassword());
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    public Queue MonitoringV5_Queue1() {
        // declare queue1
        return new Queue("MonitoringV5_Queue1");
    }

    @Bean
    public Queue MonitoringV5_Queue2() {
        // declare queue2
        return new Queue("MonitoringV5_Queue2");
    }
}

References
https://docs.spring.io/spring-amqp/docs/current/reference/html/#cachingconnectionfactory

Install RabbitMQ using Quick Start Script on Ubuntu 20.04

Cloudsmith Quick Start Script

#!/usr/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Cloudsmith: modern Erlang repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Cloudsmith: RabbitMQ repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main

## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

PackageCloud Quick Start Script

#!/usr/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Launchpad PPA that provides modern Erlang releases
curl -1sLf "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xf77f1eda57ebb1cc" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg > /dev/null
## PackageCloud RabbitMQ repository
curl -1sLf "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.packagecloud.rabbitmq.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb [signed-by=/usr/share/keyrings/io.packagecloud.rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src [signed-by=/usr/share/keyrings/io.packagecloud.rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

References
https://www.rabbitmq.com/install-debian.html#apt-quick-start-cloudsmith
https://www.rabbitmq.com/install-debian.html#apt-quick-start-packagecloud

Reset the RabbitMQ broker

In some cases, you just need to wipe everything off from your RabbitMQ server and start over. Despite being extremely stable, in some rare circumstances, you will need to completely reset RabbitMQ after encountering errors such as corrupt data, nodes permanently separated from the cluster, or an unmanageably large broker.

Recreate the virtual host from the command line

rabbitmqctl delete_vhost /
rabbitmqctl add_vhost /
rabbitmqctl set_permissions -p / monitoring ".*" ".*" ".*"

 

References
https://www.cloudamqp.com/blog/how-to-reset-the-rabbitmq-broker.html

Install RabbitMQ on Ubuntu 20.04

Install Erlang

wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
echo "deb https://packages.erlang-solutions.com/ubuntu focal contrib" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
sudo apt install erlang

Install RabbitMQ

sudo apt update && sudo apt install wget -y
sudo apt install apt-transport-https -y
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
echo "deb https://dl.bintray.com/rabbitmq-erlang/debian focal erlang-22.x" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
sudo apt install rabbitmq-server
sudo systemctl enable rabbitmq-server

RabbitMQ Management Dashboard

sudo rabbitmq-plugins enable rabbitmq_management

To be able to login on the network, create an admin user like below:

rabbitmqctl add_user admin StrongPassword
rabbitmqctl set_user_tags admin administrator

References
https://computingforgeeks.com/how-to-install-latest-erlang-on-ubuntu-linux/
https://computingforgeeks.com/how-to-install-latest-rabbitmq-server-on-ubuntu-linux/

Install RabbitMQ Server on Ubuntu 18.04

sudo apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"
nano /etc/apt/sources.list.d/bintray.rabbitmq.list
deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang
deb https://dl.bintray.com/rabbitmq/debian bionic main
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl status rabbitmq-server.service
sudo systemctl start rabbitmq-server.service
sudo systemctl enable rabbitmq-server

In case of error install latest erlang from

https://www.erlang-solutions.com/resources/download.html

References
https://www.fosslinux.com/6339/how-to-install-rabbitmq-server-on-ubuntu-18-04-lts.htm

Install RabbitMQ on Ubuntu 16.04

Adding repository entry

wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb
sudo dpkg -i erlang-solutions_1.0_all.deb

Installing Erlang

sudo apt-get update
sudo apt-get install erlang

Install RabbitMQ

echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc |
     sudo apt-key add -
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

References
https://packages.erlang-solutions.com/erlang/#tabs-debian
https://www.rabbitmq.com/install-debian.html

Remote procedure call (RPC) Sample in RabbitMQ

Client

public class Main {


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {


        int count = 0;

        while (count < 100) {
            String time = getTime();
            System.out.println(time);

            Thread.sleep(1000);
            count++;
        }

    }

    private static String getTime() throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String requestQueueName = "rpc01";
        String replyQueueName = channel.queueDeclare().getQueue();

        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        String message = "Mahmood";

        // use to block until there is a response available
        BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(corrId)) {
                    // save in the response queue
                    // so the queue can release the blockage
                    response.offer(new String(body, "UTF-8"));
                }
            }
        };

        channel.basicConsume(replyQueueName, true, consumer);

        // block until there is a response available
        String result=response.take();

        channel.close();
        connection.close();

        return result;
    }

}

Server

public class Main {

    private static String requestQueueName = "rpc01";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(requestQueueName, false, false, false, null);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String responseQueueName = properties.getReplyTo();
                String correlationId = properties.getCorrelationId();

                System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName));

                String message = new String(body, "UTF-8");
                String response = String.format("Hello %s, Time is : %s", message, new Date().toString());


                AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder()
                        .correlationId(correlationId)
                        .build();

                channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(requestQueueName, false, consumer);
    }

}

References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC