Server -> 2 Consumers
public class Main { public static void main(String[] args) { Thread thread1 = new Thread(new Runnable() { @Override public void run() { try { Queue1(); } catch (Exception ex) { ex.printStackTrace(); } } }); thread1.start(); Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { Queue1(); } catch (Exception ex) { ex.printStackTrace(); } } }); thread2.start(); } /** * get all routine keys */ public static void Queue1() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "ex01"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, "warning"); channel.queueBind(queueName, exchangeName, "info"); channel.queueBind(queueName, exchangeName, "error"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } /** * get routine key : error */ public static void Queue2() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "ex01"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, "error"); System.out.println(" [***] Waiting for messages. 
To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [xxx] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
Client -> Producer
/**
 * Producer side of the direct-exchange demo.
 *
 * <p>Publishes 1000 messages, one per second, to the direct exchange {@code ex01}. Each
 * message body is the current date as text and is published with a randomly chosen routing
 * key ({@code warning}, {@code info} or {@code error}).
 */
public class Main {

    /** Name of the direct exchange messages are published to. */
    private static final String EXCHANGE_NAME = "ex01";

    /** Number of messages published before the producer shuts down. */
    private static final int MESSAGE_COUNT = 1000;

    /** Pause between two consecutive publishes, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 1000;

    /** Routing keys a message can be published with. */
    private static final String[] SEVERITIES = {"warning", "info", "error"};

    /** Shared generator; avoids allocating a new {@link Random} for every message. */
    private static final Random RANDOM = new Random();

    /**
     * Connects to the broker on localhost, declares the exchange and publishes the messages.
     *
     * <p>The channel and connection are closed after the last message. If publishing or
     * sleeping fails part-way, the connection is aborted so it is not left open.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening or closing the connection or channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args)
            throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        boolean closedCleanly = false;
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            for (int count = 0; count < MESSAGE_COUNT; count++) {
                String message = new Date().toString();
                String routingKey = getNextSeverity();
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                Thread.sleep(PUBLISH_INTERVAL_MILLIS);
            }

            channel.close();
            connection.close();
            closedCleanly = true;
        } finally {
            if (!closedCleanly) {
                // abort() discards close-time errors, so the original failure is not masked.
                connection.abort();
            }
        }
    }

    /**
     * Picks one of the routing keys {@code warning}, {@code info} or {@code error} at random,
     * each with equal probability.
     *
     * @return the chosen routing key
     */
    public static String getNextSeverity() {
        return SEVERITIES[RANDOM.nextInt(SEVERITIES.length)];
    }
}