/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.mqtt;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.mqtt.MqttClientConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for input operators that read messages from an MQTT broker.
 *
 * <p>On {@link #activate} an MQTT client is configured from the supplied
 * {@link MqttClientConfig}, a blocking connection is opened, the registered
 * topics are subscribed to, and a background thread is started that moves
 * every received {@link Message} into a bounded holding buffer. The operator
 * thread drains that buffer in {@link #emitTuples()}, handing each message to
 * the subclass via {@link #emitTuple(Message)}.
 *
 * <p>{@link #deactivate} stops the background thread and disconnects.
 */
public abstract class AbstractMqttInputOperator
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMqttInputOperator.class);
    /** Default upper bound on the number of messages emitted per {@link #emitTuples()} call. */
    private static final int DEFAULT_BLAST_SIZE = 1000;
    /** Default capacity of the holding buffer (1,048,576 messages). */
    private static final int DEFAULT_BUFFER_SIZE = 0x100000;
    private int tupleBlast = DEFAULT_BLAST_SIZE;
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    /** Topics to subscribe to on connect, mapped to the QoS requested for each. */
    protected Map<String, QoS> topicMap = new HashMap<String, QoS>();
    protected MqttClientConfig mqttClientConfig;
    protected transient MQTT client;
    /** Hand-off queue between the receiver thread (producer) and {@link #emitTuples()} (consumer). */
    protected transient ArrayBlockingQueue<Message> holdingBuffer;
    protected transient BlockingConnection connection;
    /** Background thread that blocks on the connection and fills {@link #holdingBuffer}. */
    protected transient Thread thread;
    /** Cleared by {@link #deactivate()} so the receiver thread knows an exception means "stop". */
    private transient volatile boolean running;

    /**
     * Converts a received MQTT message into output tuple(s).
     *
     * @param var1 the message taken from the holding buffer
     */
    public abstract void emitTuple(Message var1);

    /**
     * @return the client configuration used by {@link #activate}
     */
    public MqttClientConfig getMqttClientConfig() {
        return this.mqttClientConfig;
    }

    /**
     * @param mqttClientConfig the client configuration to use in {@link #activate}
     */
    public void setMqttClientConfig(MqttClientConfig mqttClientConfig) {
        this.mqttClientConfig = mqttClientConfig;
    }

    /**
     * @param tupleBlast maximum number of messages emitted per {@link #emitTuples()} call
     */
    public void setTupleBlast(int tupleBlast) {
        this.tupleBlast = tupleBlast;
    }

    /**
     * Registers a topic to subscribe to when the connection is initialized.
     *
     * @param topic the topic name
     * @param qos the quality of service to request for the topic
     */
    public void addSubscribeTopic(String topic, QoS qos) {
        this.topicMap.put(topic, qos);
    }

    /**
     * Removes a topic from the set subscribed to when the connection is initialized.
     *
     * @param topic the topic name
     */
    public void removeSubscribeTopic(String topic) {
        this.topicMap.remove(topic);
    }

    /**
     * Emits up to {@code tupleBlast} buffered messages, or fewer if the buffer
     * holds fewer at the time of the call.
     */
    public void emitTuples() {
        // Snapshot the size once so a fast producer cannot extend this call indefinitely.
        int limit = Math.min(this.tupleBlast, this.holdingBuffer.size());
        for (int emitted = 0; emitted < limit; emitted++) {
            Message msg = this.holdingBuffer.poll();
            if (msg == null) {
                break;
            }
            this.emitTuple(msg);
        }
    }

    public void beginWindow(long l) {
    }

    public void endWindow() {
    }

    /**
     * Allocates the holding buffer.
     *
     * @param t1 the operator context (unused)
     */
    public void setup(Context.OperatorContext t1) {
        this.holdingBuffer = new ArrayBlockingQueue<Message>(this.bufferSize);
    }

    public void teardown() {
    }

    /**
     * Opens the blocking connection and subscribes to every registered topic.
     *
     * @throws Exception if connecting or subscribing fails
     */
    private void initializeConnection() throws Exception {
        this.connection = this.client.blockingConnection();
        this.connection.connect();
        if (!this.topicMap.isEmpty()) {
            Topic[] topics = new Topic[this.topicMap.size()];
            int i = 0;
            for (Map.Entry<String, QoS> entry : this.topicMap.entrySet()) {
                topics[i++] = new Topic(entry.getKey(), entry.getValue());
            }
            this.connection.subscribe(topics);
        }
    }

    /**
     * Body of the receiver thread: blocks on the connection and moves each
     * message into the holding buffer until {@link #deactivate()} is called.
     */
    private void receiveLoop() {
        while (this.running) {
            try {
                Message msg = this.connection.receive();
                // add() throws IllegalStateException when the buffer is full; that is
                // caught below, so the message is dropped and the failure is logged.
                this.holdingBuffer.add(msg);
            }
            catch (InterruptedException ex) {
                // Interrupted by deactivate(): keep the flag set and stop receiving.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception ex) {
                if (!this.running) {
                    // Shutdown in progress; the failure is a consequence of it.
                    return;
                }
                LOG.error("Trouble receiving", (Throwable)ex);
            }
        }
    }

    /**
     * Configures the MQTT client from {@link #mqttClientConfig}, connects,
     * subscribes, and starts the receiver thread.
     *
     * @param context the operator context (unused)
     * @throws RuntimeException wrapping any exception raised during activation
     */
    public void activate(Context.OperatorContext context) {
        try {
            this.client = new MQTT();
            if (this.mqttClientConfig.getClientId() != null) {
                this.client.setClientId(this.mqttClientConfig.getClientId());
            }
            this.client.setCleanSession(this.mqttClientConfig.isCleanSession());
            this.client.setConnectAttemptsMax((long)this.mqttClientConfig.getConnectAttemptsMax());
            this.client.setHost(this.mqttClientConfig.getHost(), this.mqttClientConfig.getPort());
            this.client.setKeepAlive(this.mqttClientConfig.getKeepAliveInterval());
            if (this.mqttClientConfig.getPassword() != null) {
                this.client.setPassword(this.mqttClientConfig.getPassword());
            }
            if (this.mqttClientConfig.getUserName() != null) {
                this.client.setUserName(this.mqttClientConfig.getUserName());
            }
            if (this.mqttClientConfig.getWillMessage() != null) {
                this.client.setWillMessage(this.mqttClientConfig.getWillMessage());
                this.client.setWillQos(this.mqttClientConfig.getWillQos());
                this.client.setWillRetain(this.mqttClientConfig.isWillRetain());
            }
            if (this.mqttClientConfig.getWillTopic() != null) {
                this.client.setWillTopic(this.mqttClientConfig.getWillTopic());
            }
            this.initializeConnection();
            this.running = true;
            this.thread = new Thread(new Runnable(){

                @Override
                public void run() {
                    AbstractMqttInputOperator.this.receiveLoop();
                }
            });
            this.thread.start();
        }
        catch (Exception ex) {
            LOG.error("Caught exception during activation: ", (Throwable)ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Stops the receiver thread, waits for it to finish, and disconnects from
     * the broker. Safe to call when {@link #activate} failed or never ran.
     */
    public void deactivate() {
        boolean interrupted = false;
        this.running = false;
        try {
            if (this.thread != null) {
                this.thread.interrupt();
                this.thread.join();
            }
        }
        catch (InterruptedException ex) {
            LOG.error("interrupted");
            interrupted = true;
        }
        finally {
            try {
                if (this.connection != null) {
                    this.connection.disconnect();
                }
            }
            catch (Exception ex) {
                LOG.error("Caught exception during disconnect", (Throwable)ex);
            }
            // Restore the interrupt status only after the disconnect attempt so the
            // blocking disconnect is not aborted by our own pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

