/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.net;

import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.ChannelSuppliers;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import org.jetbrains.annotations.NotNull;

public final class MessagingWithBinaryStreaming<I, O>
extends AbstractAsyncCloseable
implements Messaging<I, O>,
WithInitializer<MessagingWithBinaryStreaming<I, O>> {
    private final AsyncTcpSocket socket;
    private final ByteBufsCodec<I, O> codec;
    private final ByteBufs bufs = new ByteBufs();
    private final BinaryChannelSupplier bufsSupplier;
    private boolean readDone;
    private boolean writeDone;

    private MessagingWithBinaryStreaming(AsyncTcpSocket socket, ByteBufsCodec<I, O> codec) {
        this.socket = socket;
        this.codec = codec;
        this.bufsSupplier = BinaryChannelSupplier.ofProvidedBufs(this.bufs, () -> this.socket.read().whenResult(buf -> {
            if (buf == null) {
                throw new TruncatedDataException();
            }
            this.bufs.add(buf);
        }).toVoid().whenException(arg_0 -> ((MessagingWithBinaryStreaming)this).closeEx(arg_0)), Promise::complete, this);
    }

    public static <I, O> MessagingWithBinaryStreaming<I, O> create(AsyncTcpSocket socket, ByteBufsCodec<I, O> serializer) {
        MessagingWithBinaryStreaming<I, O> messaging = new MessagingWithBinaryStreaming<I, O>(socket, serializer);
        super.prefetch();
        return messaging;
    }

    private void prefetch() {
        if (this.bufs.isEmpty()) {
            this.socket.read().whenResult(buf -> {
                if (buf != null) {
                    this.bufs.add(buf);
                } else {
                    this.readDone = true;
                    this.closeIfDone();
                }
            }).whenException(arg_0 -> ((MessagingWithBinaryStreaming)this).closeEx(arg_0));
        }
    }

    @Override
    public Promise<I> receive() {
        return this.bufsSupplier.decode(this.codec::tryDecode).whenResult(this::prefetch).whenException(arg_0 -> ((MessagingWithBinaryStreaming)this).closeEx(arg_0));
    }

    @Override
    public Promise<Void> send(O msg) {
        return this.socket.write(this.codec.encode(msg));
    }

    @Override
    public Promise<Void> sendEndOfStream() {
        return this.socket.write(null).whenResult(() -> {
            this.writeDone = true;
            this.closeIfDone();
        }).whenException(arg_0 -> ((MessagingWithBinaryStreaming)this).closeEx(arg_0));
    }

    @Override
    public ChannelConsumer<ByteBuf> sendBinaryStream() {
        return ChannelConsumer.ofSocket(this.socket).withAcknowledgement(ack -> ack.whenResult(() -> {
            this.writeDone = true;
            this.closeIfDone();
        }));
    }

    @Override
    public ChannelSupplier<ByteBuf> receiveBinaryStream() {
        return ChannelSuppliers.concat(ChannelSupplier.ofIterator(this.bufs.asIterator()), ChannelSupplier.ofSocket(this.socket)).withEndOfStream(eos -> eos.whenResult(() -> {
            this.readDone = true;
            this.closeIfDone();
        }));
    }

    protected void onClosed(@NotNull Exception e) {
        this.socket.closeEx(e);
        this.bufs.recycle();
    }

    private void closeIfDone() {
        if (this.readDone && this.writeDone) {
            this.close();
        }
    }

    public String toString() {
        return "MessagingWithBinaryStreaming{socket=" + this.socket + "}";
    }
}

