package io.activej.http;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.recycle.Recyclable;
import io.activej.common.ref.Ref;
import io.activej.csp.consumer.AbstractChannelConsumer;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.supplier.AbstractChannelSupplier;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.http.IWebSocket;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettableCallback;
import io.activej.promise.SettablePromise;
import io.activej.reactor.Reactive;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/http/WebSocket.class */
public final class WebSocket extends AbstractAsyncCloseable implements IWebSocket {
    private static final boolean CHECKS;
    private final HttpRequest request;
    private final HttpResponse response;
    private final Consumer<WebSocketException> onProtocolError;
    private final ChannelSupplier<IWebSocket.Frame> frameInput;
    private final ChannelConsumer<IWebSocket.Frame> frameOutput;
    private final int maxMessageSize;

    @Nullable
    private SettablePromise<?> readPromise;

    @Nullable
    private SettablePromise<Void> writePromise;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WebSocket(HttpRequest httpRequest, HttpResponse httpResponse, ChannelSupplier<IWebSocket.Frame> channelSupplier, ChannelConsumer<IWebSocket.Frame> channelConsumer, Consumer<WebSocketException> consumer, int i) {
        this.request = httpRequest;
        this.response = httpResponse;
        this.frameInput = ChannelSuppliers.prefetch(sanitize(channelSupplier));
        this.frameOutput = sanitize(channelConsumer);
        this.onProtocolError = consumer;
        this.maxMessageSize = i;
    }

    @Override // io.activej.http.IWebSocket
    public Promise<IWebSocket.Message> readMessage() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return doRead(() -> {
            ByteBufs byteBufs = new ByteBufs();
            Ref ref = new Ref();
            return Promises.repeat(() -> {
                return this.frameInput.get().thenCallback((frame, settableCallback) -> {
                    if (frame == null) {
                        if (ref.get() == null) {
                            settableCallback.set(false);
                            return;
                        } else {
                            settableCallback.setException(WebSocketConstants.REGULAR_CLOSE);
                            return;
                        }
                    }
                    if (ref.get() == null) {
                        ref.set(HttpUtils.frameToMessageType(frame.getType()));
                    }
                    ByteBuf payload = frame.getPayload();
                    if (byteBufs.remainingBytes() + payload.readRemaining() > this.maxMessageSize) {
                        protocolError(WebSocketConstants.MESSAGE_TOO_BIG, settableCallback);
                    } else {
                        byteBufs.add(payload);
                        settableCallback.set(Boolean.valueOf(!frame.isLastFrame()));
                    }
                });
            }).whenException(exc -> {
                byteBufs.recycle();
            }).thenCallback((r7, settableCallback) -> {
                ByteBuf takeRemaining = byteBufs.takeRemaining();
                IWebSocket.Message.MessageType messageType = (IWebSocket.Message.MessageType) ref.get();
                try {
                    if (messageType == IWebSocket.Message.MessageType.TEXT) {
                        try {
                            settableCallback.set(IWebSocket.Message.text(HttpUtils.getUTF8(takeRemaining)));
                            takeRemaining.recycle();
                        } catch (CharacterCodingException e) {
                            protocolError(WebSocketConstants.NOT_A_VALID_UTF_8, settableCallback);
                            takeRemaining.recycle();
                        }
                        return;
                    }
                    if (messageType == IWebSocket.Message.MessageType.BINARY) {
                        settableCallback.set(IWebSocket.Message.binary(takeRemaining));
                    } else {
                        settableCallback.set((Object) null);
                    }
                } catch (Throwable th) {
                    takeRemaining.recycle();
                    throw th;
                }
            });
        });
    }

    @Override // io.activej.http.IWebSocket
    public Promise<IWebSocket.Frame> readFrame() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        ChannelSupplier<IWebSocket.Frame> channelSupplier = this.frameInput;
        Objects.requireNonNull(channelSupplier);
        return doRead(channelSupplier::get);
    }

    @Override // io.activej.http.IWebSocket
    public Promise<Void> writeMessage(@Nullable IWebSocket.Message message) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return doWrite(() -> {
            return message == null ? this.frameOutput.accept((Object) null) : message.getType() == IWebSocket.Message.MessageType.TEXT ? this.frameOutput.accept(IWebSocket.Frame.text(ByteBuf.wrapForReading(message.getText().getBytes(StandardCharsets.UTF_8)))) : this.frameOutput.accept(IWebSocket.Frame.binary(message.getBuf()));
        }, message);
    }

    @Override // io.activej.http.IWebSocket
    public Promise<Void> writeFrame(@Nullable IWebSocket.Frame frame) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return doWrite(() -> {
            return this.frameOutput.accept(frame);
        }, frame);
    }

    @Override // io.activej.http.IWebSocket
    public HttpRequest getRequest() {
        return this.request;
    }

    @Override // io.activej.http.IWebSocket
    public HttpResponse getResponse() {
        return this.response;
    }

    protected void onClosed(Exception exc) {
        this.frameOutput.closeEx(exc);
        this.frameInput.closeEx(exc);
        this.readPromise = (SettablePromise) Utils.nullify(this.readPromise, (v0, v1) -> {
            v0.setException(v1);
        }, exc);
        this.writePromise = (SettablePromise) Utils.nullify(this.writePromise, (v0, v1) -> {
            v0.setException(v1);
        }, exc);
    }

    protected void onCleanup() {
        this.request.recycle();
        this.response.recycle();
    }

    private void protocolError(WebSocketException webSocketException, SettableCallback<?> settableCallback) {
        this.onProtocolError.accept(webSocketException);
        closeEx(webSocketException);
        settableCallback.setException(webSocketException);
    }

    private <T> Promise<T> doRead(AsyncSupplier<T> asyncSupplier) {
        Checks.checkState(this.readPromise == null, "Concurrent reads");
        if (isClosed()) {
            return Promise.ofException(getException());
        }
        SettablePromise<?> settablePromise = new SettablePromise<>();
        this.readPromise = settablePromise;
        asyncSupplier.get().subscribe((obj, exc) -> {
            this.readPromise = null;
            settablePromise.trySet(obj, exc);
        });
        return settablePromise;
    }

    private Promise<Void> doWrite(AsyncRunnable asyncRunnable, @Nullable Recyclable recyclable) {
        if (!$assertionsDisabled && !this.reactor.inReactorThread()) {
            throw new AssertionError();
        }
        Checks.checkState(this.writePromise == null, "Concurrent writes");
        if (isClosed()) {
            if (recyclable != null) {
                recyclable.recycle();
            }
            return Promise.ofException(getException());
        }
        SettablePromise<Void> settablePromise = new SettablePromise<>();
        this.writePromise = settablePromise;
        asyncRunnable.run().subscribe((r6, exc) -> {
            this.writePromise = null;
            settablePromise.trySet(r6, exc);
        });
        return settablePromise;
    }

    private ChannelSupplier<IWebSocket.Frame> sanitize(final ChannelSupplier<IWebSocket.Frame> channelSupplier) {
        return new AbstractChannelSupplier<IWebSocket.Frame>(channelSupplier) { // from class: io.activej.http.WebSocket.1
            protected Promise<IWebSocket.Frame> doGet() {
                return sanitize(channelSupplier.get());
            }

            protected void onClosed(Exception exc) {
                WebSocket.this.closeEx(exc);
            }
        };
    }

    private ChannelConsumer<IWebSocket.Frame> sanitize(final ChannelConsumer<IWebSocket.Frame> channelConsumer) {
        return new AbstractChannelConsumer<IWebSocket.Frame>(channelConsumer) { // from class: io.activej.http.WebSocket.2
            /* JADX INFO: Access modifiers changed from: protected */
            public Promise<Void> doAccept(@Nullable IWebSocket.Frame frame) {
                return sanitize(channelConsumer.accept(frame));
            }

            protected void onClosed(Exception exc) {
                WebSocket.this.closeEx(exc);
            }
        };
    }

    static {
        $assertionsDisabled = !WebSocket.class.desiredAssertionStatus();
        CHECKS = Checks.isEnabled(WebSocket.class);
    }
}
