/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process.frames;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.csp.process.frames.BlockEncoder;
import io.activej.csp.process.frames.FrameFormat;
import org.jetbrains.annotations.NotNull;

public final class ChannelFrameEncoder
extends AbstractCommunicatingProcess
implements WithChannelTransformer<ChannelFrameEncoder, ByteBuf, ByteBuf>,
WithInitializer<ChannelFrameEncoder> {
    @NotNull
    private final BlockEncoder encoder;
    private boolean encoderResets;
    private ChannelSupplier<ByteBuf> input;
    private ChannelConsumer<ByteBuf> output;

    private ChannelFrameEncoder(@NotNull BlockEncoder encoder) {
        this.encoder = encoder;
    }

    public static ChannelFrameEncoder create(@NotNull FrameFormat format) {
        return ChannelFrameEncoder.create(format.createEncoder());
    }

    public static ChannelFrameEncoder create(@NotNull BlockEncoder encoder) {
        return new ChannelFrameEncoder(encoder);
    }

    public ChannelFrameEncoder withEncoderResets() {
        return this.withEncoderResets(true);
    }

    public ChannelFrameEncoder withEncoderResets(boolean encoderResets) {
        this.encoderResets = encoderResets;
        return this;
    }

    @Override
    public ChannelInput<ByteBuf> getInput() {
        return input -> {
            this.input = this.sanitize(input);
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
            return this.getProcessCompletion();
        };
    }

    @Override
    public ChannelOutput<ByteBuf> getOutput() {
        return output -> {
            this.output = this.sanitize(output);
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
        };
    }

    @Override
    protected void doProcess() {
        this.encodeBufs();
    }

    private void encodeBufs() {
        this.input.filter(ByteBuf::canRead).get().whenResult(buf -> {
            if (this.encoderResets) {
                this.encoder.reset();
            }
            if (buf != null) {
                ByteBuf outputBuf = this.encoder.encode((ByteBuf)buf);
                buf.recycle();
                this.output.accept(outputBuf).whenResult(this::encodeBufs);
            } else {
                this.output.acceptAll((ByteBuf[])new ByteBuf[]{this.encoder.encodeEndOfStreamBlock(), null}).whenResult(() -> this.completeProcess());
            }
        });
    }

    @Override
    protected void doClose(Exception e) {
        this.input.closeEx(e);
        this.output.closeEx(e);
    }
}

