package io.joshworks.stream.client.sse;

import io.joshworks.stream.client.ClientConfiguration;
import io.joshworks.stream.client.ClientException;
import io.joshworks.stream.client.StreamConnection;
import io.undertow.client.ClientCallback;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientExchange;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientStatistics;
import io.undertow.client.UndertowClient;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.Methods;
import java.io.IOException;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:io/joshworks/stream/client/sse/SSEConnection.class */
public class SSEConnection extends StreamConnection {
    private static final Logger logger = LoggerFactory.getLogger(SSEConnection.class);
    final SseClientCallback callback;
    private ClientConnection connection;
    String lastEventId;

    /* loaded from: input_file:io/joshworks/stream/client/sse/SSEConnection$DisconnectedStatistics.class */
    public class DisconnectedStatistics implements ClientStatistics {
        public DisconnectedStatistics() {
        }

        public long getRequests() {
            return 0L;
        }

        public long getRead() {
            return 0L;
        }

        public long getWritten() {
            return 0L;
        }

        public void reset() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/joshworks/stream/client/sse/SSEConnection$StreamHandler.class */
    public class StreamHandler implements ClientCallback<ClientExchange> {
        private final SseClientCallback callback;
        private final EventStreamChannelListener listener;
        private final UTF8Output dataReader;

        StreamHandler(SseClientCallback sseClientCallback, EventStreamParser eventStreamParser) {
            this.callback = sseClientCallback;
            this.dataReader = new UTF8Output(eventStreamParser);
            this.listener = new EventStreamChannelListener(new DefaultByteBufferPool(false, 8192), this.dataReader);
        }

        public void completed(ClientExchange clientExchange) {
            int responseCode = clientExchange.getResponse().getResponseCode();
            if (responseCode != 200) {
                this.callback.onError(new ClientException(responseCode, "Server returned [" + responseCode + " - " + clientExchange.getResponse().getStatus() + "] after connecting"));
                SSEConnection.this.closeChannel();
            } else {
                this.callback.onOpen();
                clientExchange.getResponseChannel().getCloseSetter().set(channel -> {
                    SSEConnection.this.closeChannel();
                    SSEConnection.this.reconnect();
                });
                this.listener.setup(clientExchange.getResponseChannel());
            }
        }

        public void failed(IOException iOException) {
            this.callback.onError(iOException);
        }
    }

    public SSEConnection(ClientConfiguration clientConfiguration, String str, SseClientCallback sseClientCallback) {
        super(clientConfiguration);
        this.lastEventId = str;
        this.callback = sseClientCallback;
    }

    @Override // io.joshworks.stream.client.StreamConnection
    protected synchronized void tryConnect() throws Exception {
        try {
            this.shuttingDown = false;
            logger.info("Connecting to {}", this.url);
            if (this.connection != null) {
                return;
            }
            this.connection = (ClientConnection) UndertowClient.getInstance().connect(URI.create(this.url), this.worker, new DefaultByteBufferPool(false, 8192), OptionMap.EMPTY).get();
            ClientRequest path = new ClientRequest().setMethod(Methods.GET).setPath(this.url);
            path.getRequestHeaders().put(Headers.CONNECTION, "keep-alive");
            path.getRequestHeaders().put(Headers.ACCEPT, "text/event-stream");
            path.getRequestHeaders().put(Headers.HOST, this.url);
            if (this.lastEventId != null && !this.lastEventId.isEmpty()) {
                path.getRequestHeaders().put(HttpString.tryFromString("Last-Event-ID"), this.lastEventId);
            }
            this.connection.sendRequest(path, createClientCallback());
        } catch (Exception e) {
            try {
                this.callback.onError(e);
            } catch (Exception e2) {
                logger.error("Error handling 'onError' callback", e);
            }
            throw e;
        }
    }

    public String close() {
        this.shuttingDown = true;
        closeChannel();
        return this.lastEventId;
    }

    public ClientStatistics statistics() {
        return this.connection == null ? new DisconnectedStatistics() : this.connection.getStatistics();
    }

    @Override // io.joshworks.stream.client.StreamConnection
    protected void closeChannel() {
        if (this.connection != null) {
            StreamConnection.closeChannel(this.connection);
            this.connection = null;
            this.callback.onClose(this.lastEventId);
        }
        this.monitor.remove(this.uuid);
    }

    public boolean isOpen() {
        return this.connection != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void retryAfter(long j) {
        logger.info("Reconnecting after {}ms", Long.valueOf(j));
        reconnect(j);
    }

    private ClientCallback<ClientExchange> createClientCallback() {
        final EventStreamParser eventStreamParser = new EventStreamParser(this);
        return new ClientCallback<ClientExchange>() { // from class: io.joshworks.stream.client.sse.SSEConnection.1
            public void completed(ClientExchange clientExchange) {
                clientExchange.setResponseListener(new StreamHandler(SSEConnection.this.callback, eventStreamParser));
                SSEConnection.this.monitor.add(SSEConnection.this.uuid, () -> {
                    SSEConnection.this.close();
                });
                SSEConnection.logger.info("Connected to {}", SSEConnection.this.url);
            }

            public void failed(IOException iOException) {
                SSEConnection.this.callback.onError(iOException);
                SSEConnection.this.closeChannel();
                SSEConnection.this.reconnect();
            }
        };
    }
}
