package io.activej.csp.process;

import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.dsl.WithChannelInput;
import io.activej.csp.dsl.WithChannelOutputs;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.promise.Promises;
import io.activej.reactor.Reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/* loaded from: input_file:io/activej/csp/process/ChannelSplitter.class */
public final class ChannelSplitter<T> extends AbstractCommunicatingProcess implements WithChannelInput<ChannelSplitter<T>, T>, WithChannelOutputs<T> {
    private ChannelSupplier<T> input;
    private final List<ChannelConsumer<T>> outputs = new ArrayList();
    private Function<T, T> splitFn = Function.identity();

    /* loaded from: input_file:io/activej/csp/process/ChannelSplitter$Builder.class */
    public final class Builder extends AbstractBuilder<ChannelSplitter<T>.Builder, ChannelSplitter<T>> {
        private Builder() {
        }

        public ChannelSplitter<T>.Builder withSplitFunction(UnaryOperator<T> unaryOperator) {
            checkNotBuilt(this);
            ChannelSplitter.this.splitFn = unaryOperator;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public ChannelSplitter<T> m8doBuild() {
            return ChannelSplitter.this;
        }
    }

    private ChannelSplitter() {
    }

    public static <T> ChannelSplitter<T> create() {
        return (ChannelSplitter) builder().build();
    }

    public static <T> ChannelSplitter<T> create(ChannelSupplier<T> channelSupplier) {
        return create().withInput(channelSupplier);
    }

    public static <T> ChannelSplitter<T>.Builder builder() {
        return new Builder();
    }

    public static <T> ChannelSplitter<T> builder(ChannelSupplier<T> channelSupplier) {
        return new ChannelSplitter().withInput(channelSupplier);
    }

    public boolean hasOutputs() {
        return !this.outputs.isEmpty();
    }

    @Override // io.activej.csp.dsl.HasChannelInput
    /* renamed from: getInput */
    public ChannelInput<T> getInput2() {
        return channelSupplier -> {
            Checks.checkState(!isProcessStarted(), "Can't configure splitter while it is running");
            this.input = sanitize(channelSupplier);
            tryStart();
            return getProcessCompletion();
        };
    }

    @Override // io.activej.csp.dsl.WithChannelOutputs
    public ChannelOutput<T> addOutput() {
        int size = this.outputs.size();
        this.outputs.add(null);
        return channelConsumer -> {
            Reactive.checkInReactorThread(this);
            this.outputs.set(size, sanitize(channelConsumer));
            tryStart();
        };
    }

    private void tryStart() {
        if (this.input == null || !this.outputs.stream().allMatch((v0) -> {
            return Objects.nonNull(v0);
        })) {
            return;
        }
        this.reactor.post(this::startProcess);
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void beforeProcess() {
        Checks.checkState(this.input != null, "No splitter input");
        Checks.checkState(!this.outputs.isEmpty(), "No splitter outputs");
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void doProcess() {
        if (isProcessComplete()) {
            return;
        }
        this.input.get().subscribe((obj, exc) -> {
            if (exc != null) {
                closeEx(exc);
            } else if (obj == null) {
                Promises.all(this.outputs.stream().map((v0) -> {
                    return v0.acceptEndOfStream();
                })).subscribe((r4, exc) -> {
                    completeProcessEx(exc);
                });
            } else {
                Promises.all(this.outputs.stream().map(channelConsumer -> {
                    return channelConsumer.accept(this.splitFn.apply(obj));
                })).subscribe((r42, exc2) -> {
                    if (exc2 == null) {
                        doProcess();
                    } else {
                        closeEx(exc2);
                    }
                });
                Recyclers.recycle(obj);
            }
        });
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void doClose(Exception exc) {
        this.input.closeEx(exc);
        this.outputs.forEach(channelConsumer -> {
            channelConsumer.closeEx(exc);
        });
    }
}
