Install RabbitMQ on Ubuntu 16.04

Adding repository entry

wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb
sudo dpkg -i erlang-solutions_1.0_all.deb

Installing Erlang

sudo apt-get update
sudo apt-get install erlang

Install RabbitMQ

echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc |
     sudo apt-key add -
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

References
https://packages.erlang-solutions.com/erlang/#tabs-debian
https://www.rabbitmq.com/install-debian.html

Remote procedure call (RPC) Sample in RabbitMQ

Client

public class Main {


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {


        int count = 0;

        while (count < 100) {
            String time = getTime();
            System.out.println(time);

            Thread.sleep(1000);
            count++;
        }

    }

    private static String getTime() throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String requestQueueName = "rpc01";
        String replyQueueName = channel.queueDeclare().getQueue();

        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        String message = "Mahmood";

        // use to block until there is a response available
        BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(corrId)) {
                    // save in the response queue
                    // so the queue can release the blockage
                    response.offer(new String(body, "UTF-8"));
                }
            }
        };

        channel.basicConsume(replyQueueName, true, consumer);

        // block until there is a response available
        String result=response.take();

        channel.close();
        connection.close();

        return result;
    }

}

Server

public class Main {

    private static String requestQueueName = "rpc01";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(requestQueueName, false, false, false, null);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String responseQueueName = properties.getReplyTo();
                String correlationId = properties.getCorrelationId();

                System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName));

                String message = new String(body, "UTF-8");
                String response = String.format("Hello %s, Time is : %s", message, new Date().toString());


                AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder()
                        .correlationId(correlationId)
                        .build();

                channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(requestQueueName, false, consumer);
    }

}

References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC

Routing Sample in RabbitMQ


Server -> 2 Consumers

public class Main {

    public static void main(String[] args) {

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Queue1();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Queue1();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread2.start();
    }


    /**
     * get all routine keys
     */
    public static void Queue1() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, exchangeName, "warning");
        channel.queueBind(queueName, exchangeName, "info");
        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }

    /**
     * get routine key : error
     */
    public static void Queue2() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [***] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [xxx] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

Client -> Producer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        int count=0;

        while (count<1000)
        {

            String message=new Date().toString();
            String routingKey=getNextSeverity();
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));

            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

            Thread.sleep(1000);
            count ++;
        }

        channel.close();
        connection.close();

    }

    public static String getNextSeverity() {

        int rand = new Random().nextInt(3);

        if (rand == 0) {
            return "warning";
        } else if (rand == 1) {
            return "info";
        } else if (rand == 2) {
            return "error";
        }

        return "error";
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-four-java.html

Publish Subscribe Sample in RabbitMQ

Client -> Producer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "logs";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);

        int count = 0;

        while (count < 100) {

            String message = new Date().toString();

            String output = String.format(" [#] Sending Log : %s", message);
            System.out.println(output);

            channel.basicPublish(exchangeName, "", null, message.getBytes());

            Thread.sleep(1000);
            count++;
        }

        channel.close();
        connection.close();
    }
}

Server -> Consumer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    showLog();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    countLong();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread2.start();
    }

    public static void showLog() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "logs";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                String output = String.format(" [*] : %s", message);
                System.out.println(output);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }

    public static void countLong() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "logs";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

        System.out.println(" [+] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                Statics.countLog++;

                String output = String.format(" [+] : %d", Statics.countLog);
                System.out.println(output);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
https://github.com/mhdr/RabbitMQSamples/tree/master/004_PublishSubscribe

Fair dispatch in RabbitMQ

Server : Consumer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        String queueName = "queue2";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message=new String(body,"UTF-8");

                try {
                    doWork(message);
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
                finally {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        channel.basicConsume(queueName,false,consumer);
    }

    public static void doWork(String message) throws InterruptedException {
        System.out.println(" [x] Received '" + message + "'");

        int sleepTime=new Random().nextInt(2000);

        Thread.sleep(sleepTime);
    }
}

References
https://pupli.net/2017/07/16/work-queues-sample-in-rabbitmq/
https://www.rabbitmq.com/tutorials/tutorial-two-java.html

Message durability in RabbitMQ

Server : Consumer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        String queueName = "queue2";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message=new String(body,"UTF-8");

                try {
                    doWork(message);
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
                finally {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        channel.basicConsume(queueName,false,consumer);
    }

    public static void doWork(String message) throws InterruptedException {
        System.out.println(" [x] Received '" + message + "'");

        int sleepTime=new Random().nextInt(2000);

        Thread.sleep(sleepTime);
    }
}

Client : Producer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        String queueName = "queue2";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, true, false, false, null);

        int count = 0;

        while (count < 100) {

            Date date = new Date();

            String message = String.format("%d : %s", count, date.toString());

            channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(String.format(" [x] Sent %s", message));

            Thread.sleep(1000);
            count++;
        }
    }
}

References
https://pupli.net/2017/07/16/work-queues-sample-in-rabbitmq/
https://www.rabbitmq.com/tutorials/tutorial-two-java.html

Work Queues Sample in RabbitMQ

Server : Consumer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        String queueName = "queue2";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message=new String(body,"UTF-8");

                try {
                    doWork(message);
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
                finally {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        channel.basicConsume(queueName,false,consumer);
    }

    public static void doWork(String message) throws InterruptedException {
        System.out.println(" [x] Received '" + message + "'");

        int sleepTime=new Random().nextInt(2000);

        Thread.sleep(sleepTime);
    }
}

Client : Producer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        String queueName = "queue2";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName, true, false, false, null);

        int count = 0;

        while (count < 100) {

            Date date = new Date();

            String message = String.format("%d : %s", count, date.toString());

            channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(String.format(" [x] Sent %s", message));

            Thread.sleep(1000);
            count++;
        }
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-two-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/002_WorkQueues

Hello World Sample in RabbitMQ

Server

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        String queueName="Hello";

        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queueName,false,false,false,null);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");


        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message=new String(body,"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        channel.basicConsume(queueName,true,consumer);
    }
}

Client

public class Main {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        String queueName="Hello";

        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection= factory.newConnection();
        Channel channel= connection.createChannel();

        channel.queueDeclare(queueName,false,false,false,null);


        int count=0;

        while (count<100){
            Date date=new Date();
            String message=String.format("Hello World : %s", date.toString());

            channel.basicPublish("",queueName,null,message.getBytes("UTF-8"));

            System.out.println(" [x] Sent '" + message + "'");
            count++;

            Thread.sleep(1000);
        }

        channel.close();
        connection.close();
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/001_HelloWorld