package com.datatorrent.stram.util;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.JacksonObjectMapperProvider;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.common.util.PubSubMessage;
import com.datatorrent.common.util.PubSubMessageCodec;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncCompletionHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
import org.apache.apex.shaded.ning19.com.ning.http.client.Response;
import org.apache.apex.shaded.ning19.com.ning.http.client.cookie.Cookie;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketClient.class */
public abstract class PubSubWebSocketClient implements Component<Context> {
    private final AsyncHttpClient client;
    private WebSocket connection;
    private URI uri;
    private String loginUrl;
    private String userName;
    private String password;
    private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClient.class);
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();
    private int ioThreadMultiplier = 1;
    private final ObjectMapper mapper = new JacksonObjectMapperProvider().getContext((Class) null);
    private final PubSubMessageCodec<Object> codec = new PubSubMessageCodec<>(this.mapper);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketClient$PubSubWebSocket.class */
    public class PubSubWebSocket implements WebSocketTextListener {
        private PubSubWebSocket() {
        }

        public void onMessage(String str) {
            try {
                PubSubMessage parseMessage = PubSubWebSocketClient.this.codec.parseMessage(str);
                PubSubWebSocketClient.this.onMessage(parseMessage.getType().getIdentifier(), parseMessage.getTopic(), parseMessage.getData());
            } catch (JsonMappingException e) {
                PubSubWebSocketClient.logger.warn("Ignoring JSON mapping in message: {}", str, e);
            } catch (IOException e2) {
                onError(e2);
            } catch (JsonParseException e3) {
                PubSubWebSocketClient.logger.warn("Ignoring unparseable JSON message: {}", str, e3);
            }
        }

        public void onOpen(WebSocket webSocket) {
            PubSubWebSocketClient.this.onOpen(webSocket);
        }

        public void onClose(WebSocket webSocket) {
            PubSubWebSocketClient.this.onClose(webSocket);
        }

        public void onError(Throwable th) {
            PubSubWebSocketClient.this.onError(th);
        }
    }

    public PubSubWebSocketClient() {
        AsyncHttpClientConfigBean asyncHttpClientConfigBean = new AsyncHttpClientConfigBean();
        asyncHttpClientConfigBean.setIoThreadMultiplier(this.ioThreadMultiplier);
        asyncHttpClientConfigBean.setApplicationThreadPool(Executors.newCachedThreadPool(new NameableThreadFactory("AsyncHttpClient")));
        this.client = new AsyncHttpClient(asyncHttpClientConfigBean);
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public void setIoThreadMultiplier(int i) {
        this.ioThreadMultiplier = i;
    }

    public void setLoginUrl(String str) {
        this.loginUrl = str;
    }

    public void setUserName(String str) {
        this.userName = str;
    }

    public void setPassword(String str) {
        this.password = str;
    }

    public void openConnection(long j) throws IOException, ExecutionException, InterruptedException, TimeoutException {
        this.throwable.set(null);
        List list = null;
        if (this.loginUrl != null && this.userName != null && this.password != null) {
            JSONObject jSONObject = new JSONObject();
            try {
                jSONObject.put("userName", this.userName);
                jSONObject.put("password", this.password);
                list = ((Response) this.client.preparePost(this.loginUrl).setHeader("Content-Type", "application/json").setBody(jSONObject.toString()).execute().get()).getCookies();
            } catch (JSONException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
        AsyncHttpClient.BoundRequestBuilder prepareGet = this.client.prepareGet(this.uri.toString());
        if (list != null) {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                prepareGet.addCookie((Cookie) it.next());
            }
        }
        this.connection = (WebSocket) prepareGet.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()).build()).get(j, TimeUnit.MILLISECONDS);
    }

    public void openConnectionAsync() throws IOException {
        this.throwable.set(null);
        if (this.loginUrl == null || this.userName == null || this.password == null) {
            this.client.prepareGet(this.uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket() { // from class: com.datatorrent.stram.util.PubSubWebSocketClient.2
                @Override // com.datatorrent.stram.util.PubSubWebSocketClient.PubSubWebSocket
                public void onOpen(WebSocket webSocket) {
                    PubSubWebSocketClient.this.connection = webSocket;
                    super.onOpen(webSocket);
                }
            }).build());
            return;
        }
        JSONObject jSONObject = new JSONObject();
        try {
            jSONObject.put("userName", this.userName);
            jSONObject.put("password", this.password);
            this.client.preparePost(this.loginUrl).setHeader("Content-Type", "application/json").setBody(jSONObject.toString()).execute(new AsyncCompletionHandler<Response>() { // from class: com.datatorrent.stram.util.PubSubWebSocketClient.1
                /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
                public Response m151onCompleted(Response response) throws Exception {
                    List cookies = response.getCookies();
                    AsyncHttpClient.BoundRequestBuilder prepareGet = PubSubWebSocketClient.this.client.prepareGet(PubSubWebSocketClient.this.uri.toString());
                    if (cookies != null) {
                        Iterator it = cookies.iterator();
                        while (it.hasNext()) {
                            prepareGet.addCookie((Cookie) it.next());
                        }
                    }
                    PubSubWebSocketClient.this.connection = (WebSocket) prepareGet.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()).build()).get();
                    return response;
                }
            });
        } catch (JSONException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public boolean isConnectionOpen() {
        if (this.connection == null) {
            return false;
        }
        return this.connection.isOpen();
    }

    public void assertUsable() throws IOException {
        if (this.throwable.get() == null) {
            if (this.connection == null) {
                throw new IOException("Connection is not open");
            }
        } else {
            Throwable th = this.throwable.get();
            if (!(th instanceof IOException)) {
                throw Throwables.propagate(th);
            }
            throw ((IOException) th);
        }
    }

    public void publish(String str, Object obj) throws IOException {
        assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructPublishMessage(str, obj, this.codec));
    }

    public void subscribe(String str) throws IOException {
        assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(str, this.codec));
    }

    public void unsubscribe(String str) throws IOException {
        assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructUnsubscribeMessage(str, this.codec));
    }

    public void subscribeNumSubscribers(String str) throws IOException {
        assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructSubscribeNumSubscribersMessage(str, this.codec));
    }

    public void unsubscribeNumSubscribers(String str) throws IOException {
        assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructUnsubscribeNumSubscribersMessage(str, this.codec));
    }

    public abstract void onOpen(WebSocket webSocket);

    public abstract void onMessage(String str, String str2, Object obj);

    public abstract void onClose(WebSocket webSocket);

    public void setup(Context context) {
    }

    public void teardown() {
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        this.throwable.set(null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onError(Throwable th) {
        this.throwable.set(th);
    }
}
