Remote procedure call (RPC) Sample in RabbitMQ

Client

public class Main {


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {


        int count = 0;

        while (count < 100) {
            String time = getTime();
            System.out.println(time);

            Thread.sleep(1000);
            count++;
        }

    }

    private static String getTime() throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String requestQueueName = "rpc01";
        String replyQueueName = channel.queueDeclare().getQueue();

        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        String message = "Mahmood";

        // use to block until there is a response available
        BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(corrId)) {
                    // save in the response queue
                    // so the queue can release the blockage
                    response.offer(new String(body, "UTF-8"));
                }
            }
        };

        channel.basicConsume(replyQueueName, true, consumer);

        // block until there is a response available
        String result=response.take();

        channel.close();
        connection.close();

        return result;
    }

}

Server

public class Main {

    private static String requestQueueName = "rpc01";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(requestQueueName, false, false, false, null);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String responseQueueName = properties.getReplyTo();
                String correlationId = properties.getCorrelationId();

                System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName));

                String message = new String(body, "UTF-8");
                String response = String.format("Hello %s, Time is : %s", message, new Date().toString());


                AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder()
                        .correlationId(correlationId)
                        .build();

                channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(requestQueueName, false, consumer);
    }

}

References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC