package io.activej.csp.process.frames;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.dsl.WithBinaryChannelInput;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.promise.Promise;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/csp/process/frames/ChannelFrameDecoder.class */
public final class ChannelFrameDecoder extends AbstractCommunicatingProcess implements WithChannelTransformer<ChannelFrameDecoder, ByteBuf, ByteBuf>, WithBinaryChannelInput<ChannelFrameDecoder>, WithInitializer<ChannelFrameDecoder> {

    @NotNull
    private final BlockDecoder decoder;
    private boolean decoderResets;
    private ByteBufs bufs;
    private BinaryChannelSupplier input;
    private ChannelConsumer<ByteBuf> output;

    private ChannelFrameDecoder(@NotNull BlockDecoder blockDecoder) {
        this.decoder = blockDecoder;
    }

    public static ChannelFrameDecoder create(@NotNull FrameFormat frameFormat) {
        return create(frameFormat.createDecoder());
    }

    public static ChannelFrameDecoder create(@NotNull BlockDecoder blockDecoder) {
        return new ChannelFrameDecoder(blockDecoder);
    }

    public ChannelFrameDecoder withDecoderResets() {
        return withDecoderResets(true);
    }

    public ChannelFrameDecoder withDecoderResets(boolean z) {
        this.decoderResets = z;
        return this;
    }

    @Override // io.activej.csp.dsl.HasChannelInput
    /* renamed from: getInput */
    public ChannelInput<ByteBuf> getInput2() {
        return binaryChannelSupplier -> {
            this.input = binaryChannelSupplier;
            this.bufs = binaryChannelSupplier.getBufs();
            if (this.input != null && this.output != null) {
                startProcess();
            }
            return getProcessCompletion();
        };
    }

    @Override // io.activej.csp.dsl.HasChannelOutput
    public ChannelOutput<ByteBuf> getOutput() {
        return channelConsumer -> {
            this.output = sanitize(channelConsumer);
            if (this.input == null || this.output == null) {
                return;
            }
            startProcess();
        };
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void doProcess() {
        decode().run((byteBuf, exc) -> {
            if (!(exc instanceof TruncatedDataException)) {
                doSanitize(byteBuf, exc).whenResult(byteBuf -> {
                    if (byteBuf != BlockDecoder.END_OF_STREAM) {
                        this.output.accept(byteBuf).whenResult(this::doProcess);
                    } else {
                        this.input.endOfStream().then((obj, exc) -> {
                            return this.doSanitize(obj, exc);
                        }).then(() -> {
                            return this.output.acceptEndOfStream();
                        }).whenResult(() -> {
                            this.completeProcess();
                        });
                    }
                });
                return;
            }
            if (!this.bufs.isEmpty()) {
                closeEx(new TruncatedBlockException(exc));
            } else if (this.decoder.ignoreMissingEndOfStreamBlock()) {
                this.output.acceptEndOfStream().whenResult(() -> {
                    this.completeProcess();
                });
            } else {
                closeEx(new MissingEndOfStreamBlockException(exc));
            }
        });
    }

    @NotNull
    private Promise<ByteBuf> decode() {
        Promise<Void> needMoreData;
        do {
            if (!this.bufs.isEmpty()) {
                try {
                    ByteBuf decode = this.decoder.decode(this.bufs);
                    if (decode != null) {
                        if (this.decoderResets) {
                            this.decoder.reset();
                        }
                        return Promise.of(decode);
                    }
                } catch (MalformedDataException e) {
                    closeEx(e);
                    return Promise.ofException(e);
                }
            }
            needMoreData = this.input.needMoreData();
        } while (needMoreData.isResult());
        return needMoreData.then(this::decode);
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void doClose(Exception exc) {
        this.input.closeEx(exc);
        this.output.closeEx(exc);
    }
}
