package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfigBean;
import com.ning.http.client.websocket.WebSocket;
import com.ning.http.client.websocket.WebSocketTextListener;
import com.ning.http.client.websocket.WebSocketUpgradeHandler;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ClassUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/WebSocketInputOperator.class */
/**
 * Input operator that opens a WebSocket connection to the configured {@link #getUri() URI},
 * converts every received text message with {@link #convertMessage(String)} and emits the result
 * on the output port.
 *
 * <p>A {@link MonitorThread} started in {@link #setup(Context.OperatorContext)} checks once per
 * second whether the connection has been reported closed (or a connect attempt failed) and, if so,
 * re-activates the operator so that {@link #run()} establishes a fresh connection.
 *
 * @param <T> type of the tuples emitted on the output port
 */
public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> implements Runnable {
    private static final long serialVersionUID = 201506160829L;
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketInputOperator.class);

    /** Pause between two connection checks of the monitor thread. */
    private static final long MONITOR_INTERVAL_MILLIS = 1000L;
    /** Maximum time {@link #run()} waits for the WebSocket handshake to complete. */
    private static final long CONNECT_TIMEOUT_SECONDS = 5L;

    private URI uri;
    private transient AsyncHttpClient client;
    protected transient WebSocket connection;
    private transient MonitorThread monThread;
    /** Not read by any code in this class; kept because it is a public field. */
    public int readTimeoutMillis = 0;
    private final transient JsonFactory jsonFactory = new JsonFactory();
    protected final transient ObjectMapper mapper = new ObjectMapper(this.jsonFactory);
    // Written by the listener callbacks and by run(), read by the monitor thread, hence volatile.
    private transient volatile boolean connectionClosed = false;
    private volatile transient boolean shutdown = false;
    private int ioThreadMultiplier = 1;
    /** When {@code true}, messages that convert to {@code null} are not emitted. */
    protected boolean skipNull = false;

    /**
     * Background thread that polls the connection state and triggers a reconnect after the
     * connection was closed or a connect attempt failed. It ends once {@code shutdown} is set or
     * the thread is interrupted.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/WebSocketInputOperator$MonitorThread.class */
    public class MonitorThread extends Thread {
        private MonitorThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!WebSocketInputOperator.this.shutdown) {
                try {
                    sleep(MONITOR_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // teardown() interrupts this thread to end the sleep early; keep the flag set and stop.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (!WebSocketInputOperator.this.connectionClosed || WebSocketInputOperator.this.shutdown) {
                    continue;
                }
                closeStaleConnection();
                try {
                    // activate(...) is not defined in this file; it is expected to start a new run().
                    WebSocketInputOperator.this.activate((Context.OperatorContext) null);
                } catch (Exception e) {
                    // connectionClosed is still true, so the next iteration retries.
                    LOG.error("Error reconnecting to " + WebSocketInputOperator.this.uri, e);
                }
            }
        }

        /**
         * Closes the previous connection if there is one. The connection is {@code null} when the
         * first connect attempt failed; a failure to close must not prevent the reconnect.
         */
        private void closeStaleConnection() {
            WebSocket stale = WebSocketInputOperator.this.connection;
            if (stale == null) {
                return;
            }
            try {
                stale.close();
            } catch (Exception e) {
                LOG.warn("Error closing stale connection to " + WebSocketInputOperator.this.uri, e);
            }
        }
    }

    /**
     * @return the WebSocket endpoint this operator connects to
     */
    public URI getUri() {
        return this.uri;
    }

    /**
     * @param uri the WebSocket endpoint this operator connects to
     */
    public void setUri(URI uri) {
        this.uri = uri;
    }

    /**
     * @return the IO thread multiplier handed to the HTTP client configuration
     */
    public int getIoThreadMultiplier() {
        return this.ioThreadMultiplier;
    }

    /**
     * @param i the IO thread multiplier handed to the HTTP client configuration
     */
    public void setIoThreadMultiplier(int i) {
        this.ioThreadMultiplier = i;
    }

    /**
     * Re-creates the URI from its string form, clears the shutdown flag and starts the monitor
     * thread.
     *
     * @param operatorContext operator context; not used by this method
     * @throws RuntimeException wrapping any failure, e.g. when no URI has been set
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        try {
            // NOTE(review): presumably forces a re-parse after the operator was deserialized - confirm.
            this.uri = URI.create(this.uri.toString());
            LOG.info("URL: {}", this.uri);
            this.shutdown = false;
            this.monThread = new MonitorThread();
            this.monThread.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the monitor thread, then closes the connection and the HTTP client and finally
     * delegates to the superclass. Each cleanup step runs even if an earlier one throws.
     */
    public void teardown() {
        this.shutdown = true;
        MonitorThread monitor = this.monThread;
        if (monitor != null) {
            // Wake the monitor from its sleep so join() does not wait for the rest of the interval.
            monitor.interrupt();
            try {
                monitor.join();
            } catch (InterruptedException e) {
                LOG.error("Error joining monitor", e);
                Thread.currentThread().interrupt();
            }
        }
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            try {
                if (this.client != null) {
                    this.client.close();
                }
            } finally {
                super.teardown();
            }
        }
    }

    /**
     * Converts a received text message into a tuple. This implementation parses the text as JSON
     * into a generic object; subclasses may override it.
     *
     * @param str the raw text message
     * @return the converted tuple
     * @throws IOException if the message cannot be parsed
     */
    @SuppressWarnings("unchecked") // readValue(.., Object.class) yields Object; callers choose T accordingly
    protected T convertMessage(String str) throws IOException {
        return (T) this.mapper.readValue(str, Object.class);
    }

    /**
     * Creates a new HTTP client, opens the WebSocket connection and registers the listener that
     * emits converted messages. On any failure the client is closed and the connection is marked
     * closed so that the monitor thread attempts a reconnect.
     */
    @Override
    public void run() {
        try {
            this.connectionClosed = false;
            AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
            config.setIoThreadMultiplier(this.ioThreadMultiplier);
            config.setApplicationThreadPool(Executors.newCachedThreadPool(new ThreadFactory() { // from class: com.datatorrent.lib.io.WebSocketInputOperator.1
                private long count = 0;

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setName(ClassUtils.getShortClassName(getClass()) + "-AsyncHttpClient-" + this.count++);
                    return thread;
                }
            }));
            if (this.client != null) {
                this.client.closeAsynchronously();
            }
            this.client = new AsyncHttpClient(config);

            WebSocketTextListener listener = new WebSocketTextListener() { // from class: com.datatorrent.lib.io.WebSocketInputOperator.2
                public void onMessage(String str) {
                    WebSocketInputOperator.LOG.debug("Got: {}", str);
                    try {
                        T tuple = WebSocketInputOperator.this.convertMessage(str);
                        if (!WebSocketInputOperator.this.skipNull || tuple != null) {
                            WebSocketInputOperator.this.outputPort.emit(tuple);
                        }
                    } catch (IOException e) {
                        WebSocketInputOperator.LOG.error("Got exception: ", e);
                    }
                }

                public void onFragment(String str, boolean z) {
                    WebSocketInputOperator.LOG.debug("onFragment");
                }

                public void onOpen(WebSocket webSocket) {
                    WebSocketInputOperator.LOG.debug("Connection opened");
                }

                public void onClose(WebSocket webSocket) {
                    WebSocketInputOperator.LOG.debug("Connection connectionClosed.");
                    // Picked up by the monitor thread, which then reconnects.
                    WebSocketInputOperator.this.connectionClosed = true;
                }

                public void onError(Throwable th) {
                    WebSocketInputOperator.LOG.error("Caught exception", th);
                }
            };
            WebSocketUpgradeHandler handler = new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build();
            this.connection = (WebSocket) this.client.prepareGet(this.uri.toString()).execute(handler).get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("Error reading from " + this.uri, e);
            try {
                if (this.client != null) {
                    this.client.close();
                }
            } finally {
                // Always flag the failure, otherwise the monitor thread would never retry.
                this.connectionClosed = true;
            }
        }
    }
}
