package io.activej.csp.net;

import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.Checks;
import io.activej.common.exception.TruncatedDataException;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.promise.Promise;
import io.activej.reactor.Reactive;
import java.util.Objects;

/* loaded from: input_file:io/activej/csp/net/Messaging.class */
public final class Messaging<I, O> extends AbstractAsyncCloseable implements IMessaging<I, O> {
    private static final boolean CHECKS = Checks.isEnabled(Messaging.class);
    private final ITcpSocket socket;
    private final ByteBufsCodec<I, O> codec;
    private final ByteBufs bufs = new ByteBufs();
    private final BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.ofProvidedBufs(this.bufs, () -> {
        return this.socket.read().whenResult(byteBuf -> {
            if (byteBuf == null) {
                throw new TruncatedDataException();
            }
            this.bufs.add(byteBuf);
        }).toVoid().whenException(this::closeEx);
    }, Promise::complete, this);
    private boolean readDone;
    private boolean writeDone;

    private Messaging(ITcpSocket iTcpSocket, ByteBufsCodec<I, O> byteBufsCodec) {
        this.socket = iTcpSocket;
        this.codec = byteBufsCodec;
    }

    public static <I, O> Messaging<I, O> create(ITcpSocket iTcpSocket, ByteBufsCodec<I, O> byteBufsCodec) {
        Messaging<I, O> messaging = new Messaging<>(iTcpSocket, byteBufsCodec);
        messaging.prefetch();
        return messaging;
    }

    private void prefetch() {
        if (this.bufs.isEmpty()) {
            this.socket.read().whenResult(byteBuf -> {
                if (byteBuf != null) {
                    this.bufs.add(byteBuf);
                } else {
                    this.readDone = true;
                    closeIfDone();
                }
            }).whenException(this::closeEx);
        }
    }

    @Override // io.activej.csp.net.IMessaging
    public Promise<I> receive() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        BinaryChannelSupplier binaryChannelSupplier = this.bufsSupplier;
        ByteBufsCodec<I, O> byteBufsCodec = this.codec;
        Objects.requireNonNull(byteBufsCodec);
        return binaryChannelSupplier.decode(byteBufsCodec::tryDecode).whenResult(this::prefetch).whenException(this::closeEx);
    }

    @Override // io.activej.csp.net.IMessaging
    public Promise<Void> send(O o) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.socket.write(this.codec.encode(o));
    }

    @Override // io.activej.csp.net.IMessaging
    public Promise<Void> sendEndOfStream() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.socket.write((ByteBuf) null).whenResult(() -> {
            this.writeDone = true;
            closeIfDone();
        }).whenException(this::closeEx);
    }

    @Override // io.activej.csp.net.IMessaging
    public ChannelConsumer<ByteBuf> sendBinaryStream() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return ChannelConsumers.ofSocket(this.socket).withAcknowledgement(promise -> {
            return promise.whenResult(() -> {
                this.writeDone = true;
                closeIfDone();
            });
        });
    }

    @Override // io.activej.csp.net.IMessaging
    public ChannelSupplier<ByteBuf> receiveBinaryStream() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return ChannelSuppliers.concat(ChannelSuppliers.ofIterator(this.bufs.asIterator()), ChannelSuppliers.ofSocket(this.socket)).withEndOfStream(promise -> {
            return promise.whenResult(() -> {
                this.readDone = true;
                closeIfDone();
            });
        });
    }

    protected void onClosed(Exception exc) {
        this.socket.closeEx(exc);
        this.bufs.recycle();
    }

    private void closeIfDone() {
        if (this.readDone && this.writeDone) {
            close();
        }
    }

    public String toString() {
        return "Messaging{socket=" + this.socket + "}";
    }
}
