RabbitMQ Android

  1. Download RabbitMQ at: https://www.rabbitmq.com/download.html
  2. Install
  3. Configure the management plugin
    • Open cmd
    • go to this path: C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.4\sbin
    • Enter rabbitmq-plugins.bat enable rabbitmq_management and press the Enter key
    • rabbitmq-service.bat stop
    • rabbitmq-service.bat install
    • rabbitmq-service.bat start
  4. Start using the management UI: http://localhost:15672
    • User: guest
    • Password: guest
  5. Create User:
    • Add a new/fresh user, say user ‘test’ and password ‘test’
      rabbitmqctl add_user test test
    • Give administrative access to the new user
      rabbitmqctl set_user_tags test administrator
    • Set permission to newly created user
      rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
  6. How to allow the guest user to log in from a remote IP address
    • Open (or create) C:\Users\[User Name]\AppData\Roaming\RabbitMQ\rabbitmq.config
    • Add this line, then restart the RabbitMQ service: [{rabbit, [{loopback_users, []}]}].

MIController

package com.example.icemessaging;

import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;

/**
 * Created by pd on 2/15/2017.
 *
 * Base class for an AMQP (RabbitMQ) client that publishes queued text messages
 * and forwards received messages to a handler.
 *
 * Publishing and subscribing each run on their own background thread. Every
 * worker opens its own connection and channel, retries after a fixed delay when
 * the connection fails, and closes its connection when it stops. Channels are
 * therefore never shared between threads.
 *
 * Subclasses supply the bundle key under which received message text is stored
 * via {@link #getMessageKey()}.
 */

public abstract class MIController {

    /** Pause, in milliseconds, before a worker reconnects after a failure. */
    private static final long RETRY_DELAY_MS = 5000;

    /** Outgoing messages waiting to be published; the head is sent first. */
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<String>();
    private final ConnectionFactory factory = new ConnectionFactory();
    private Thread subscribeThread;
    private Thread publishThread;

    /**
     * Creates a controller whose connection factory is configured from the model.
     *
     * @param miModel source of the server host name
     */
    public MIController(MIModel miModel){
        setupConnectionFactory(miModel);
    }

    /**
     * Configures the connection factory for the MI server.
     *
     * Automatic recovery is disabled because the worker loops reconnect
     * themselves after a failure.
     *
     * @param miModel source of the server host name
     */
    public void setupConnectionFactory(MIModel miModel){
        factory.setAutomaticRecoveryEnabled(false);
        factory.setHost(miModel.getHost_server());
    }

    /**
     * Starts the background thread that publishes queued messages.
     *
     * Each message is published once per routing key returned by
     * {@code miModel.getPublic_routing_key()} and then confirmed by the broker.
     * A message whose publish or confirm fails is put back at the head of the
     * queue and retried after reconnecting.
     *
     * @param miModel source of the exchange name and publish routing keys
     */
    public void publishToAMQP(final MIModel miModel)
    {
        publishThread = new Thread(new Runnable(){
            @Override
            public void run() {
                while (true) {
                    try {
                        publishUntilFailure(miModel);
                    } catch (InterruptedException e) {
                        break; // closeConnection() asked this worker to stop
                    } catch (Exception e) {
                        Log.d("", "Connection broken: " + e.getClass().getName());
                        if (!pauseBeforeRetry()) {
                            break;
                        }
                    }
                }
            }
        });
        publishThread.start();
    }

    /**
     * Starts the background thread that receives messages.
     *
     * A server-named queue is declared and bound to the exchange once per
     * routing key returned by {@code miModel.getSubscribe_routing_keys()}. The
     * text of every delivery is placed in a {@link Bundle} under
     * {@link #getMessageKey()} and sent to {@code handler}.
     *
     * @param handler receives one {@link Message} per delivery
     * @param miModel source of the exchange name and subscribe routing keys
     */
    public void subscribe(final MIHandlerMessage handler,final MIModel miModel)
    {
        subscribeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        consumeUntilFailure(handler, miModel);
                    } catch (InterruptedException e) {
                        break; // closeConnection() asked this worker to stop
                    } catch (Exception e1) {
                        Log.d("", "Connection broken: " + e1.getClass().getName());
                        if (!pauseBeforeRetry()) {
                            break;
                        }
                    }
                }
            }
        });
        subscribeThread.start();
    }

    /**
     * Queues a message for the publish thread.
     *
     * @param message text to publish
     */
    public void publishMessage(String message) {
        try {
            queue.putLast(message);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the publish and subscribe workers.
     *
     * Both threads are interrupted; each one closes its own connection as it
     * exits, so no network I/O happens on the calling thread. Messages still
     * waiting in the queue are not sent.
     */
    public void closeConnection(){
        if (publishThread != null) {
            publishThread.interrupt();
        }
        if (subscribeThread != null) {
            subscribeThread.interrupt();
        }
    }

    /**
     * Returns the bundle key under which received message text is stored.
     */
    public abstract String getMessageKey();

    /** Publishes queued messages on a fresh connection until an error or interrupt occurs. */
    private void publishUntilFailure(MIModel miModel) throws Exception {
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.confirmSelect();

            while (true) {
                String message = queue.takeFirst();
                try {
                    for (String routingKey : miModel.getPublic_routing_key()) {
                        channel.basicPublish(miModel.getExchange_name(), routingKey, null, message.getBytes());
                    }
                    channel.waitForConfirmsOrDie();
                } catch (Exception e) {
                    // Not confirmed: return it to the head so it is retried first.
                    queue.putFirst(message);
                    throw e;
                }
            }
        } finally {
            closeQuietly(connection);
        }
    }

    /** Consumes deliveries on a fresh connection until an error or interrupt occurs. */
    private void consumeUntilFailure(MIHandlerMessage handler, MIModel miModel) throws Exception {
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.basicQos(1);
            AMQP.Queue.DeclareOk declared = channel.queueDeclare();

            for (String routingKey : miModel.getSubscribe_routing_keys()) {
                channel.queueBind(declared.getQueue(), miModel.getExchange_name(), routingKey);
            }

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(declared.getQueue(), true, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                Message msg = handler.obtainMessage();
                Bundle bundle = new Bundle();
                bundle.putString(getMessageKey(), message);
                msg.setData(bundle);
                handler.sendMessage(msg);
            }
        } finally {
            closeQuietly(connection);
        }
    }

    /** Sleeps for the retry delay; returns false if the thread was interrupted meanwhile. */
    private static boolean pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_DELAY_MS);
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    /** Closes the connection, logging rather than propagating any failure. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (Exception e) {
            // Best effort: the connection may already be broken or closed.
            Log.d("", "Connection close failed: " + e.getClass().getName());
        }
    }
}

MIModel

package com.example.icemessaging;

import java.util.List;

/**
 * Created by pd on 2/15/2017.
 *
 * Connection settings for the messaging client: server host, exchange name,
 * and the routing keys used for publishing and subscribing. Every getter
 * returns the matching {@code Config} constant while its field is unset.
 */

public class MIModel {
    private String host_server;
    private String exchange_name;
    private List<String> public_routing_key;
    private List<String> subscribe_routing_keys;

    /** Returns the exchange name, or {@code Config.EXCHANGE_NAME} when none was set. */
    public String getExchange_name() {
        return this.exchange_name != null ? this.exchange_name : Config.EXCHANGE_NAME;
    }

    /** Sets the exchange name; {@code null} restores the default. */
    public void setExchange_name(String exchange_name) {
        this.exchange_name = exchange_name;
    }

    /** Returns the publish routing keys, or {@code Config.PUBLIC_ROUTING_KEY} when none were set. */
    public List<String> getPublic_routing_key() {
        return this.public_routing_key != null ? this.public_routing_key : Config.PUBLIC_ROUTING_KEY;
    }

    /** Sets the publish routing keys; {@code null} restores the default. */
    public void setPublic_routing_key(List<String> public_routing_key) {
        this.public_routing_key = public_routing_key;
    }

    /** Returns the subscribe routing keys, or {@code Config.SUBSCRIBE_ROUTING_KEY} when none were set. */
    public List<String> getSubscribe_routing_keys() {
        return this.subscribe_routing_keys != null
                ? this.subscribe_routing_keys
                : Config.SUBSCRIBE_ROUTING_KEY;
    }

    /** Sets the subscribe routing keys; {@code null} restores the default. */
    public void setSubscribe_routing_keys(List<String> subscribe_routing_keys) {
        this.subscribe_routing_keys = subscribe_routing_keys;
    }

    /** Returns the server host, or {@code Config.MI_SERVER} when none was set. */
    public String getHost_server() {
        return this.host_server != null ? this.host_server : Config.MI_SERVER;
    }

    /** Sets the server host; {@code null} restores the default. */
    public void setHost_server(String host_server) {
        this.host_server = host_server;
    }
}

 

Advertisements