Parse JSON file with GSON

Gson gson = new Gson();
JsonReader reader = new JsonReader(new FileReader(filename));
List<Review> data = gson.fromJson(reader, REVIEW_TYPE);

Or

Write

String filename = "myfile.txt";

// Serialize the object to its JSON representation.
Vector v = new Vector(10.0f, 20.0f);
Gson gson = new Gson();
String s = gson.toJson(v);

// try-with-resources closes the stream even when write() throws,
// which the original close()-inside-try did not guarantee.
try (FileOutputStream outputStream = openFileOutput(filename, Context.MODE_PRIVATE)) {
  // Encode explicitly so the bytes on disk do not depend on a default charset.
  outputStream.write(s.getBytes("UTF-8"));
} catch (Exception e) {
  e.printStackTrace();
}

Read

FileInputStream fis = context.openFileInput("myfile.txt", Context.MODE_PRIVATE);
 InputStreamReader isr = new InputStreamReader(fis);
 BufferedReader bufferedReader = new BufferedReader(isr);
 StringBuilder sb = new StringBuilder();
 String line;
 while ((line = bufferedReader.readLine()) != null) {
     sb.append(line);
 }

 String json = sb.toString();
 Gson gson = new Gson();
 Vector v = gson.fromJson(json, Vector.class);

References
https://stackoverflow.com/questions/29965764/how-to-parse-json-file-with-gson
https://stackoverflow.com/questions/19459082/read-and-write-data-with-gson

Work with BlockingQueue in Java

Unbounded Queue

// No capacity argument: the deque's capacity defaults to Integer.MAX_VALUE,
// so producers effectively never block on insertion.
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

Bounded Queue

// Capacity fixed at 10 elements: put() blocks and offer() returns false
// while the deque is full.
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

Adding Elements

  • add() – returns true if insertion was successful, otherwise throws an IllegalStateException
  • put() – inserts the specified element into a queue, waiting for a free slot if necessary
  • offer() – returns true if insertion was successful, otherwise false
  • offer(E e, long timeout, TimeUnit unit) – tries to insert element into a queue and waits for an available slot within a specified timeout

Retrieving Elements

  • take() – waits for a head element of a queue and removes it. If the queue is empty, it blocks and waits for an element to become available
  • poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

References
http://www.baeldung.com/java-blocking-queue
https://pupli.net/2017/07/24/remote-procedure-call-rpc-sample-in-rabbitmq/

Remote procedure call (RPC) Sample in RabbitMQ

Client

/**
 * RPC client: sends a request to the "rpc01" queue once per second
 * (100 times) and prints each reply.
 */
public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        int count = 0;

        while (count < 100) {
            String time = getTime();
            System.out.println(time);

            Thread.sleep(1000);
            count++;
        }

    }

    /**
     * Performs one blocking RPC round trip.
     *
     * <p>Publishes the request with a fresh correlation id and a
     * server-named reply queue, then blocks until a reply carrying the
     * same correlation id arrives.
     *
     * @return the reply body decoded as UTF-8
     * @throws IOException          if the broker communication fails
     * @throws InterruptedException if interrupted while waiting for the reply
     * @throws TimeoutException     if the connection cannot be established in time
     */
    private static String getTime() throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            String requestQueueName = "rpc01";
            // Server-named queue that receives the reply for this call.
            String replyQueueName = channel.queueDeclare().getQueue();

            String corrId = UUID.randomUUID().toString();

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();

            String message = "Mahmood";

            // Hand-off point between the consumer callback and this thread.
            BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

            channel.basicPublish("", requestQueueName, properties, message.getBytes("UTF-8"));

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {

                    // Compare from the known non-null side: a reply without a
                    // correlation id must be ignored, not cause an NPE.
                    if (corrId.equals(properties.getCorrelationId())) {
                        // Releases the take() below.
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            };

            channel.basicConsume(replyQueueName, true, consumer);

            // Block until the matching reply is available.
            String result = response.take();

            channel.close();
            return result;
        } finally {
            // Always release the connection, including on exceptions and
            // interrupts. Skip close() when the connection is already shut
            // down so the original failure is not masked.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }

}

Server

public class Main {

    private static String requestQueueName = "rpc01";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(requestQueueName, false, false, false, null);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String responseQueueName = properties.getReplyTo();
                String correlationId = properties.getCorrelationId();

                System.out.println(String.format("CorrelationId: %s, Response Queue: %s", correlationId, responseQueueName));

                String message = new String(body, "UTF-8");
                String response = String.format("Hello %s, Time is : %s", message, new Date().toString());


                AMQP.BasicProperties responseProperties = new AMQP.BasicProperties().builder()
                        .correlationId(correlationId)
                        .build();

                channel.basicPublish("", responseQueueName, responseProperties, response.getBytes("UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(requestQueueName, false, consumer);
    }

}

References
https://www.rabbitmq.com/tutorials/tutorial-six-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/006_RPC

Configure Hibernate With MySQL in Gradle and do CRUD – JPA Persistence Way

Employee.java

import javax.persistence.*;

/**
 * JPA entity mapped to the {@code EMPLOYEE} table.
 */
@Entity
@Table(name = "EMPLOYEE")
public class Employee {

    /** Primary key; the value is generated by the persistence provider. */
    @Id
    @GeneratedValue
    @Column(name = "id")
    private int id;

    /** Stored in column {@code first_name}. */
    @Column(name = "first_name")
    private String firstName;

    /** Stored in column {@code last_name}. */
    @Column(name = "last_name")
    private String lastName;

    /** Stored in column {@code salary}. */
    @Column(name = "salary")
    private int salary;

    /** No-argument constructor; leaves every field at its default value. */
    public Employee() {
    }

    /**
     * Creates an employee that has not been assigned an id yet.
     *
     * @param fname  first name
     * @param lname  last name
     * @param salary salary
     */
    public Employee(String fname, String lname, int salary) {
        this.firstName = fname;
        this.lastName = lname;
        this.salary = salary;
    }

    /** @return the id */
    public int getId() {
        return this.id;
    }

    /** @param id the id to set */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the first name */
    public String getFirstName() {
        return this.firstName;
    }

    /** @param first_name the first name to set */
    public void setFirstName(String first_name) {
        this.firstName = first_name;
    }

    /** @return the last name */
    public String getLastName() {
        return this.lastName;
    }

    /** @param last_name the last name to set */
    public void setLastName(String last_name) {
        this.lastName = last_name;
    }

    /** @return the salary */
    public int getSalary() {
        return this.salary;
    }

    /** @param salary the salary to set */
    public void setSalary(int salary) {
        this.salary = salary;
    }
}

resources/META-INF/persistence.xml

<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
                                 http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"
             version="2.1">

    <!-- The unit name is the value passed to Persistence.createEntityManagerFactory(...) -->
    <persistence-unit name="ir.mhdr.jpaDemo" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
        <class>Employee</class>

        <properties>
            <!-- Configuring JDBC properties -->
            <property name="javax.persistence.jdbc.url" value="jdbc:mysql://localhost/testdb?useUnicode=yes&amp;characterEncoding=UTF-8&amp;useJDBCCompliantTimezoneShift=true&amp;useLegacyDatetimeCode=false&amp;serverTimezone=UTC" />
            <property name="javax.persistence.jdbc.user" value="root" />
            <!-- NOTE(review): plain-text credentials; do not commit real passwords -->
            <property name="javax.persistence.jdbc.password" value="sampad622" />
            <!-- Connector/J 6+ driver class; com.mysql.jdbc.Driver is the deprecated legacy name -->
            <property name="javax.persistence.jdbc.driver" value="com.mysql.cj.jdbc.Driver" />

            <!-- Hibernate properties -->
            <property name="hibernate.show_sql" value="true" />
            <property name="hibernate.format_sql" value="true" />
            <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL57Dialect" />
            <!-- "create" rebuilds the schema on startup, discarding existing data -->
            <property name="hibernate.hbm2ddl.auto" value="create" />

            <!-- Configuring Connection Pool -->
            <!-- NOTE(review): c3p0 settings presumably need the hibernate-c3p0 module on the classpath; confirm against the build -->
            <property name="hibernate.c3p0.min_size" value="5" />
            <property name="hibernate.c3p0.max_size" value="20" />
            <property name="hibernate.c3p0.timeout" value="500" />
            <property name="hibernate.c3p0.max_statements" value="50" />
            <property name="hibernate.c3p0.idle_test_period" value="2000" />
        </properties>
    </persistence-unit>
</persistence>

EmployeeBL.java

import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.query.criteria.internal.CriteriaUpdateImpl;

import javax.persistence.*;
import javax.persistence.criteria.CriteriaUpdate;
import javax.transaction.Transaction;
import java.util.Iterator;
import java.util.List;

/**
 * CRUD operations for {@link Employee} using the JPA {@code EntityManager}
 * API. Each operation runs in its own entity manager and transaction;
 * failures are rolled back and printed rather than propagated.
 */
public class EmployeeBL {

    private EntityManagerFactory factory;

    /** Builds the factory for the "ir.mhdr.jpaDemo" persistence unit. */
    public EmployeeBL() {
        factory = Persistence.createEntityManagerFactory("ir.mhdr.jpaDemo");
    }

    /** Releases the entity manager factory; call once when finished. */
    public void close() {
        factory.close();
    }

    /**
     * Creates an employee in the database.
     *
     * @param fname  first name
     * @param lname  last name
     * @param salary salary
     */
    public void addEmployee(String fname, String lname, int salary) {
        EntityManager entityManager = factory.createEntityManager();
        EntityTransaction tx = null;

        try {
            tx = entityManager.getTransaction();
            tx.begin();

            Employee employee = new Employee(fname, lname, salary);
            entityManager.persist(employee);

            tx.commit();
        } catch (Exception ex) {
            rollbackIfActive(tx);
            ex.printStackTrace();
        } finally {
            entityManager.close();
        }
    }


    /** Reads all employees and prints one line per employee. */
    public void listEmployees() {
        EntityManager entityManager = factory.createEntityManager();
        EntityTransaction tx = null;

        try {
            tx = entityManager.getTransaction();
            tx.begin();

            // Typed query: no raw List/Iterator and no per-element cast.
            List<Employee> employees =
                    entityManager.createQuery("select e from Employee e", Employee.class).getResultList();
            for (Employee employee : employees) {
                System.out.print("First Name: " + employee.getFirstName());
                System.out.print("  Last Name: " + employee.getLastName());
                System.out.println("  Salary: " + employee.getSalary());
            }

            tx.commit();
        } catch (Exception ex) {
            rollbackIfActive(tx);
            ex.printStackTrace();
        } finally {
            entityManager.close();
        }
    }


    /**
     * Updates the salary of an employee.
     *
     * @param EmployeeID id of the employee to update
     * @param salary     new salary
     */
    public void updateEmployee(Integer EmployeeID, int salary) {

        EntityManager entityManager = factory.createEntityManager();
        EntityTransaction tx = null;

        try {
            tx = entityManager.getTransaction();
            tx.begin();

            Employee employee = entityManager.find(Employee.class, EmployeeID);
            if (employee == null) {
                tx.rollback();
                System.err.println("No Employee found with id " + EmployeeID);
                return;
            }

            // The instance is managed, so the change is written at commit;
            // no explicit persist() is required.
            employee.setSalary(salary);

            tx.commit();
        } catch (Exception ex) {
            rollbackIfActive(tx);
            ex.printStackTrace();
        } finally {
            entityManager.close();
        }
    }


    /**
     * Deletes an employee from the records.
     *
     * @param EmployeeID id of the employee to delete
     */
    public void deleteEmployee(Integer EmployeeID) {

        EntityManager entityManager = factory.createEntityManager();
        EntityTransaction tx = null;

        try {

            tx = entityManager.getTransaction();
            tx.begin();

            Employee employee = entityManager.find(Employee.class, EmployeeID);
            if (employee == null) {
                // remove(null) would fail with IllegalArgumentException.
                tx.rollback();
                System.err.println("No Employee found with id " + EmployeeID);
                return;
            }
            entityManager.remove(employee);

            tx.commit();

        } catch (Exception ex) {
            rollbackIfActive(tx);
            ex.printStackTrace();
        } finally {
            entityManager.close();
        }
    }

    /**
     * Rolls back only when a transaction is in progress: rollback() on an
     * inactive transaction throws IllegalStateException, which would hide
     * the original failure.
     */
    private static void rollbackIfActive(EntityTransaction tx) {
        if (tx != null && tx.isActive()) {
            tx.rollback();
        }
    }
}

Main.java

/**
 * Demo entry point: inserts a single employee through {@link EmployeeBL}.
 */
public class Main {


    public static void main(String[] args)
    {
        EmployeeBL employeeBL=new EmployeeBL();
        try {
            employeeBL.addEmployee("Mahmood","Ramzani",1000);
        } finally {
            // Release the entity manager factory even if the insert fails unexpectedly.
            employeeBL.close();
        }
    }
}

References
https://github.com/mhdr/HibernateSamples/tree/master/004_Persistent
http://docs.jboss.org/hibernate/orm/5.2/quickstart/html_single/#tutorial_jpa
https://dzone.com/articles/a-curious-java-language-feature-and-how-it-produce
https://www.sitepoint.com/5-reasons-to-use-jpa-hibernate/

Batch Insert in Hibernate

EmployeeBL.java

public class EmployeeBL {
    private SessionFactory factory;

    public EmployeeBL() {
        Configuration configuration = new Configuration();
        this.factory = configuration.configure().buildSessionFactory();
    }

    public void insertRange(List<Employee> employeeList) {
        Session session = factory.openSession();
        session.setProperty("hibernate.jdbc.batch_size", 30);
        session.setProperty("hibernate.cache.use_second_level_cache", false);

        Transaction tx = session.beginTransaction();

        int i = 0;

        for (Employee employee : employeeList) {

            session.save(employee);
            i++;

            if (i % 30 == 0) { // same as the JDBC batch size
                //flush a batch of inserts and release memory:
                session.flush();
                session.clear();
            }
        }

        tx.commit();
        session.close();
    }
}

References
https://docs.jboss.org/hibernate/orm/3.3/reference/en-US/html/batch.html
https://github.com/mhdr/HibernateSamples/tree/master/003_BatchInsert

Configure Hibernate With MySQL in Gradle and do CRUD

build.gradle

group 'ir.mhdr'
version '1.0-SNAPSHOT'

apply plugin: 'java'
// mainClassName is a property of the application plugin; with only the java
// plugin applied, the assignment below fails with "unknown property".
apply plugin: 'application'

sourceCompatibility = 1.8
mainClassName="Main"

repositories {
    mavenCentral()
}

dependencies {

    // https://mvnrepository.com/artifact/org.hibernate/hibernate-core
    compile group: 'org.hibernate', name: 'hibernate-core', version: '5.2.10.Final'

    // https://mvnrepository.com/artifact/org.hibernate/hibernate-gradle-plugin
    // NOTE(review): this is a build-time plugin artifact; it is presumably not needed as a compile dependency - confirm
    compile group: 'org.hibernate', name: 'hibernate-gradle-plugin', version: '5.2.10.Final'

    // https://mvnrepository.com/artifact/mysql/mysql-connector-java
    compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6'

    // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'

    // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
    testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Employee.java

import javax.persistence.*;

/**
 * Annotated entity mapped to the {@code EMPLOYEE} table.
 */
@Entity
@Table(name = "EMPLOYEE")
public class Employee {

    /** Primary key; the value is generated by the persistence provider. */
    @Id
    @GeneratedValue
    @Column(name = "id")
    private int id;

    /** Stored in column {@code first_name}. */
    @Column(name = "first_name")
    private String firstName;

    /** Stored in column {@code last_name}. */
    @Column(name = "last_name")
    private String lastName;

    /** Stored in column {@code salary}. */
    @Column(name = "salary")
    private int salary;

    /** No-argument constructor; leaves every field at its default value. */
    public Employee() {
    }

    /**
     * Creates an employee that has not been assigned an id yet.
     *
     * @param fname  first name
     * @param lname  last name
     * @param salary salary
     */
    public Employee(String fname, String lname, int salary) {
        this.firstName = fname;
        this.lastName = lname;
        this.salary = salary;
    }

    /** @return the id */
    public int getId() {
        return this.id;
    }

    /** @param id the id to set */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the first name */
    public String getFirstName() {
        return this.firstName;
    }

    /** @param first_name the first name to set */
    public void setFirstName(String first_name) {
        this.firstName = first_name;
    }

    /** @return the last name */
    public String getLastName() {
        return this.lastName;
    }

    /** @param last_name the last name to set */
    public void setLastName(String last_name) {
        this.lastName = last_name;
    }

    /** @return the salary */
    public int getSalary() {
        return this.salary;
    }

    /** @param salary the salary to set */
    public void setSalary(int salary) {
        this.salary = salary;
    }
}

resources/hibernate.cfg.xml

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE hibernate-configuration SYSTEM
        "http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">

<!-- Hibernate configuration: schema handling, SQL logging, MySQL connection settings and mapped entities -->
<hibernate-configuration>
    <session-factory>
        <!-- "create" rebuilds the schema on startup, discarding existing data -->
        <property name="hibernate.hbm2ddl.auto">create</property>
        <!-- Print the generated SQL, pretty-printed -->
        <property name="hibernate.show_sql">true</property>
        <property name="hibernate.format_sql">true</property>

        <property name="hibernate.dialect">
            org.hibernate.dialect.MySQL57Dialect
        </property>
        <property name="hibernate.connection.driver_class">
            com.mysql.cj.jdbc.Driver
        </property>

        <!-- The database name is "testdb"; the URL also selects UTF-8 and the UTC server time zone -->
        <property name="hibernate.connection.url">
            jdbc:mysql://localhost/testdb?useUnicode=yes&amp;characterEncoding=UTF-8&amp;useJDBCCompliantTimezoneShift=true&amp;useLegacyDatetimeCode=false&amp;serverTimezone=UTC
        </property>
        <property name="hibernate.connection.username">
            root
        </property>
        <!-- NOTE(review): plain-text credentials; do not commit real passwords -->
        <property name="hibernate.connection.password">
            sampad622
        </property>

        <!-- Annotated entity classes (mapped by class name, not by XML mapping files) -->
        <mapping class="Employee"/>

    </session-factory>
</hibernate-configuration>

ManageEmployee.java

import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;

import java.util.Iterator;
import java.util.List;

/**
 * CRUD operations for {@link Employee} using the native Hibernate
 * {@code Session} API. Each operation opens its own session and
 * transaction; Hibernate failures are rolled back and printed.
 */
public class ManageEmployee {

    private SessionFactory factory;

    /** Builds the session factory from hibernate.cfg.xml. */
    public ManageEmployee() {
        factory = new Configuration().configure().buildSessionFactory();
    }

    /** Releases the session factory and its resources; call once when finished. */
    public void close() {
        factory.close();
    }

    /**
     * Creates an employee in the database.
     *
     * @param fname  first name
     * @param lname  last name
     * @param salary salary
     * @return the generated id, or {@code null} if the insert failed
     */
    public Integer addEmployee(String fname, String lname, int salary) {

        Session session = factory.openSession();
        Transaction tx = null;
        Integer employeeID = null;

        try {
            tx = session.beginTransaction();
            Employee employee = new Employee(fname, lname, salary);
            employeeID = (Integer) session.save(employee);
            tx.commit();
        } catch (HibernateException e) {
            if (tx != null) {
                tx.rollback();
            }
            e.printStackTrace();
        } finally {
            session.close();
        }

        return employeeID;
    }


    /** Reads all employees and prints one line per employee. */
    public void listEmployees() {
        Session session = factory.openSession();
        Transaction tx = null;

        try {
            tx = session.beginTransaction();
            // Typed query: no raw List/Iterator and no per-element cast.
            List<Employee> employees = session.createQuery("FROM Employee", Employee.class).list();
            for (Employee employee : employees) {
                System.out.print("First Name: " + employee.getFirstName());
                System.out.print("  Last Name: " + employee.getLastName());
                System.out.println("  Salary: " + employee.getSalary());
            }
            tx.commit();
        } catch (HibernateException e) {
            if (tx != null) {
                tx.rollback();
            }
            e.printStackTrace();
        } finally {
            session.close();
        }
    }


    /**
     * Updates the salary of an employee.
     *
     * @param EmployeeID id of the employee to update
     * @param salary     new salary
     */
    public void updateEmployee(Integer EmployeeID, int salary) {
        Session session = factory.openSession();
        Transaction tx = null;
        try {
            tx = session.beginTransaction();
            Employee employee = session.get(Employee.class, EmployeeID);
            if (employee == null) {
                // get() returns null for an unknown id; without this check the
                // setter call below threw an uncaught NullPointerException.
                tx.rollback();
                System.err.println("No Employee found with id " + EmployeeID);
                return;
            }
            // The instance is persistent, so the change is written at commit;
            // no explicit update() is required.
            employee.setSalary(salary);
            tx.commit();
        } catch (HibernateException e) {
            if (tx != null) {
                tx.rollback();
            }
            e.printStackTrace();
        } finally {
            session.close();
        }
    }


    /**
     * Deletes an employee from the records.
     *
     * @param EmployeeID id of the employee to delete
     */
    public void deleteEmployee(Integer EmployeeID) {
        Session session = factory.openSession();
        Transaction tx = null;

        try {

            tx = session.beginTransaction();
            Employee employee = session.get(Employee.class, EmployeeID);
            if (employee == null) {
                // Nothing to delete; delete(null) would fail.
                tx.rollback();
                System.err.println("No Employee found with id " + EmployeeID);
                return;
            }
            session.delete(employee);
            tx.commit();

        } catch (HibernateException e) {
            if (tx != null) {
                tx.rollback();
            }
            e.printStackTrace();
        } finally {
            session.close();
        }
    }
}

Main.java

import org.hibernate.SessionFactory;
import org.hibernate.cfg.Configuration;

/**
 * Demo entry point: walks through create, read, update and delete using
 * {@link ManageEmployee}.
 */
public class Main {

    public static void main(String[] args) {

        ManageEmployee manager = new ManageEmployee();

        // Create three employee records and keep their ids.
        Integer zaraId = manager.addEmployee("Zara", "Ali", 1000);
        Integer daisyId = manager.addEmployee("Daisy", "Das", 5000);
        Integer johnId = manager.addEmployee("John", "Paul", 10000);

        // Print every employee.
        manager.listEmployees();

        // Change the salary of the first employee.
        manager.updateEmployee(zaraId, 5000);

        // Remove the second employee.
        manager.deleteEmployee(daisyId);

        // Print the employees again to show the changes.
        manager.listEmployees();
    }
}

References
https://github.com/mhdr/HibernateSamples/tree/master/002_CRUD

Routing Sample in RabbitMQ


Server -> 2 Consumers

/**
 * Routing demo server: starts two consumers on the direct exchange "ex01",
 * one bound to every severity and one bound to "error" only.
 */
public class Main {

    public static void main(String[] args) {

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Queue1();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // The second consumer is the "error"-only one. The original
                    // started Queue1() twice and never used Queue2().
                    Queue2();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        thread2.start();
    }


    /**
     * Consumes messages for all routing keys: "warning", "info" and "error".
     *
     * @throws IOException      if the broker communication fails
     * @throws TimeoutException if the connection cannot be established in time
     */
    public static void Queue1() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        // Server-named queue for this consumer.
        String queueName = channel.queueDeclare().getQueue();

        // One binding per routing key this consumer wants to receive.
        channel.queueBind(queueName, exchangeName, "warning");
        channel.queueBind(queueName, exchangeName, "info");
        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }

    /**
     * Consumes messages for the routing key "error" only.
     *
     * @throws IOException      if the broker communication fails
     * @throws TimeoutException if the connection cannot be established in time
     */
    public static void Queue2() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "ex01";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        // Server-named queue for this consumer.
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, exchangeName, "error");

        System.out.println(" [***] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [xxx] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

Client -> Producer

/**
 * Routing demo producer: publishes the current date once per second
 * (1000 times) to the direct exchange "ex01" under a random severity.
 */
public class Main {

    /** Routing keys the producer chooses from. */
    private static final String[] SEVERITIES = {"warning", "info", "error"};

    /** Shared generator, so a new Random is not allocated for every message. */
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            String exchangeName = "ex01";

            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

            int count = 0;

            while (count < 1000) {

                String message = new Date().toString();
                String routingKey = getNextSeverity();
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));

                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

                Thread.sleep(1000);
                count++;
            }

            channel.close();
        } finally {
            // Release the connection on exceptions and interrupts too. Skip
            // close() when it is already shut down so the original failure
            // is not masked.
            if (connection.isOpen()) {
                connection.close();
            }
        }

    }

    /**
     * Picks one of "warning", "info" or "error" with equal probability.
     *
     * @return the routing key for the next message
     */
    public static String getNextSeverity() {
        return SEVERITIES[RANDOM.nextInt(SEVERITIES.length)];
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-four-java.html

Publish Subscribe Sample in RabbitMQ

Client -> Producer

/**
 * Publish/subscribe demo producer: broadcasts the current date once per
 * second (100 times) to the fanout exchange "logs".
 */
public class Main {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            String exchangeName = "logs";

            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);

            int count = 0;

            while (count < 100) {

                String message = new Date().toString();

                String output = String.format(" [#] Sending Log : %s", message);
                System.out.println(output);

                // Encode explicitly as UTF-8: the consumers decode with UTF-8,
                // and getBytes() without a charset uses the platform default.
                channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8"));

                Thread.sleep(1000);
                count++;
            }

            channel.close();
        } finally {
            // Release the connection on exceptions and interrupts too. Skip
            // close() when it is already shut down so the original failure
            // is not masked.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

Server -> Consumer

/**
 * Publish/subscribe demo server: runs two independent subscribers of the
 * fanout exchange "logs", one printing each message and one counting them.
 */
public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        Thread printerThread = new Thread(() -> {
            try {
                showLog();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });
        printerThread.start();

        Thread counterThread = new Thread(() -> {
            try {
                countLong();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });
        counterThread.start();
    }

    /**
     * Subscribes to "logs" through a server-named queue and prints every
     * message body.
     */
    public static void showLog() throws IOException, TimeoutException {
        Channel channel = openLocalChannel();

        String exchangeName = "logs";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer printer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");

                System.out.println(String.format(" [*] : %s", text));
            }
        };

        channel.basicConsume(queueName, true, printer);
    }

    /**
     * Subscribes to "logs" through a server-named queue and prints the
     * running value of {@code Statics.countLog} after each message.
     */
    public static void countLong() throws IOException, TimeoutException {
        Channel channel = openLocalChannel();

        String exchangeName = "logs";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

        System.out.println(" [+] Waiting for messages. To exit press CTRL+C");

        Consumer counter = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                Statics.countLog++;

                System.out.println(String.format(" [+] : %d", Statics.countLog));
            }
        };

        channel.basicConsume(queueName, true, counter);
    }

    /** Opens a new connection to the broker on localhost and returns a channel on it. */
    private static Channel openLocalChannel() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Connection connection = connectionFactory.newConnection();
        return connection.createChannel();
    }
}

References
https://www.rabbitmq.com/tutorials/tutorial-three-java.html
https://github.com/mhdr/RabbitMQSamples/tree/master/004_PublishSubscribe

Fair dispatch in RabbitMQ

Server : Consumer

/**
 * Fair-dispatch demo worker: consumes tasks from the durable queue "queue2"
 * one at a time and acknowledges each task after it has been processed.
 */
public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        String queueName = "queue2";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Channel channel = connectionFactory.newConnection().createChannel();

        channel.queueDeclare(queueName, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Prefetch of 1: no new task is delivered until the previous one is acked.
        channel.basicQos(1);

        Consumer worker = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String task = new String(body, "UTF-8");

                try {
                    doWork(task);
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    // Acknowledge whether or not the work succeeded.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // autoAck is false: acknowledgements are sent manually above.
        channel.basicConsume(queueName, false, worker);
    }

    /**
     * Prints the message, then sleeps for a random time below two seconds
     * in place of real work.
     *
     * @param message the task payload
     * @throws InterruptedException if interrupted while sleeping
     */
    public static void doWork(String message) throws InterruptedException {
        System.out.println(" [x] Received '" + message + "'");

        Thread.sleep(new Random().nextInt(2000));
    }
}

References
https://pupli.net/2017/07/16/work-queues-sample-in-rabbitmq/
https://www.rabbitmq.com/tutorials/tutorial-two-java.html