Client -> Producer
/**
 * Log producer: publishes timestamp messages to the "logs" fanout exchange.
 */
public class Main {
    /**
     * Connects to the broker on localhost, declares the "logs" fanout exchange and
     * publishes 100 messages, one per second. Each message is the current date/time
     * as text and is echoed to standard output before it is sent.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // try-with-resources guarantees the connection is released even when publishing
        // fails or the sleep is interrupted; closing a connection also closes its channels.
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();
            String exchangeName = "logs";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);

            for (int count = 0; count < 100; count++) {
                String message = new Date().toString();
                String output = String.format(" [#] Sending Log : %s", message);
                System.out.println(output);
                // Encode explicitly as UTF-8 (the consumer decodes the body as UTF-8)
                // instead of relying on the platform default charset.
                // A fanout exchange ignores the routing key, hence the empty string.
                channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8"));
                Thread.sleep(1000);
            }

            // Normal path: close the channel explicitly before the connection is closed.
            channel.close();
        }
    }
}
Server -> Consumer
/**
 * Log consumer: runs two independent subscribers on the "logs" fanout exchange,
 * one that prints every message and one that counts them.
 */
public class Main {
    private static final String HOST = "localhost";
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Starts one thread per subscriber. Each thread only sets up its subscription;
     * failures during setup are printed to standard error.
     *
     * @param args command-line arguments (unused)
     * @throws IOException declared for compatibility; setup errors are handled in the worker threads
     * @throws TimeoutException declared for compatibility; setup errors are handled in the worker threads
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    showLog();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    countLong();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread2.start();
    }

    /**
     * Subscribes to the "logs" exchange and prints the body of every delivered
     * message, decoded as UTF-8, to standard output.
     *
     * @throws IOException if connecting or setting up the subscription fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void showLog() throws IOException, TimeoutException {
        Channel channel = openChannel();
        subscribe(channel, " [*] Waiting for messages. To exit press CTRL+C", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                String output = String.format(" [*] : %s", message);
                System.out.println(output);
            }
        });
    }

    /**
     * Subscribes to the "logs" exchange and, for every delivered message,
     * increments {@code Statics.countLog} and prints the new total.
     *
     * @throws IOException if connecting or setting up the subscription fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void countLong() throws IOException, TimeoutException {
        Channel channel = openChannel();
        subscribe(channel, " [+] Waiting for messages. To exit press CTRL+C", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): this increment is not atomic; it is only safe while this
                // consumer is the sole writer of Statics.countLog - confirm.
                Statics.countLog++;
                String output = String.format(" [+] : %d", Statics.countLog);
                System.out.println(output);
            }
        });
    }

    /** Opens a new connection to the broker and returns a channel on it, aborting the connection if the channel cannot be created. */
    private static Channel openChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        boolean opened = false;
        try {
            Channel channel = connection.createChannel();
            opened = true;
            return channel;
        } finally {
            if (!opened) {
                // Do not leave a half-initialised connection open; abort() discards
                // close errors so the original failure is the one that propagates.
                connection.abort();
            }
        }
    }

    /** Declares the fanout exchange, binds a fresh server-named queue to it, prints the banner and starts the consumer with automatic acknowledgement. */
    private static void subscribe(Channel channel, String banner, Consumer consumer) throws IOException {
        boolean subscribed = false;
        try {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // The no-argument queueDeclare() creates a server-named queue, so each
            // subscriber gets its own copy of every message from the fanout exchange.
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println(banner);
            channel.basicConsume(queueName, true, consumer);
            subscribed = true;
        } finally {
            if (!subscribed) {
                // Setup failed: release the connection instead of leaking it. On success
                // the connection stays open on purpose so deliveries keep arriving.
                channel.getConnection().abort();
            }
        }
    }
}
References
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
https://github.com/mhdr/RabbitMQSamples/tree/master/004_PublishSubscribe