Routing Sample in RabbitMQ


Server -> 2 Consumers

public class Main {

    public static void main(String[] args) {

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Queue1();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Queue1();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread2.start();
    }


    /**
     * get all routine keys
     */
    public static void Queue1() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, exchangeName, "warning");
        channel.queueBind(queueName, exchangeName, "info");
        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }

    /**
     * get routine key : error
     */
    public static void Queue2() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [***] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [xxx] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

Client -> Producer

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        int count=0;

        while (count<1000)
        {

            String message=new Date().toString();
            String routingKey=getNextSeverity();
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));

            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

            Thread.sleep(1000);
            count ++;
        }

        channel.close();
        connection.close();

    }

    public static String getNextSeverity() {

        int rand = new Random().nextInt(3);

        if (rand == 0) {
            return "warning";
        } else if (rand == 1) {
            return "info";
        } else if (rand == 2) {
            return "error";
        }

        return "error";
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-four-java.html