Client
public class Main { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { int count = 0; while (count < 100) { String time = getTime(); System.out.println(time); Thread.sleep(1000); count++; } } private static String getTime() throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String requestQueueName = "rpc01"; String replyQueueName = channel.queueDeclare().getQueue(); String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); String message = "Mahmood"; // use to block until there is a response available BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8")); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { // save in the response queue // so the queue can release the blockage response.offer(new String(body, "UTF-8")); } } }; channel.basicConsume(replyQueueName, true, consumer); // block until there is a response available String result=response.take(); channel.close(); connection.close(); return result; } }
Server
public class Main { private static String requestQueueName = "rpc01"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(requestQueueName, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String responseQueueName = properties.getReplyTo(); String correlationId = properties.getCorrelationId(); System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName)); String message = new String(body, "UTF-8"); String response = String.format("Hello %s, Time is : %s", message, new Date().toString()); AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder() .correlationId(correlationId) .build(); channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(requestQueueName, false, consumer); } }
References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC