package org.apache.beam.runners.apex.translation.operators;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Throwables;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.class */
/**
 * Apex operator that hands every tuple arriving on {@link #inputPort} to an
 * {@link ApexOperatorFn} and forwards whatever that function emits to {@link #outputPort}.
 *
 * <p>Two ready-made functions are available through the static factories
 * {@link #toKeyedWorkItems(ApexPipelineOptions)} and
 * {@link #assignWindows(WindowFn, ApexPipelineOptions)}.
 *
 * @param <InputT> element type carried by the windowed values received on the input port
 */
public class ApexProcessFnOperator<InputT> extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(ApexProcessFnOperator.class);

    /** When {@code true}, every emitted tuple is also logged at debug level. */
    private boolean traceTuples;

    /** The processing logic; bound to Java serialization for Kryo via the annotation below. */
    @FieldSerializer.Bind(JavaSerializer.class)
    private final ApexOperatorFn<InputT> fn;

    /** Emitter handed to {@link #fn}; optionally traces the tuple, then writes it to the output port. */
    private final transient OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter =
            new OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>>() {
                @Override
                public void emit(ApexStreamTuple<? extends WindowedValue<?>> apexStreamTuple) {
                    if (traceTuples) {
                        LOG.debug("\nemitting {}\n", apexStreamTuple);
                    }
                    outputPort.emit(apexStreamTuple);
                }
            };

    /** Input port; each received tuple is passed to {@link #fn} together with the output emitter. */
    public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> inputPort =
            new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
                @Override
                public void process(ApexStreamTuple<WindowedValue<InputT>> apexStreamTuple) {
                    try {
                        fn.process(apexStreamTuple, outputEmitter);
                    } catch (Exception e) {
                        // Unchecked exceptions propagate unchanged; checked ones are wrapped
                        // with the original exception preserved as the cause.
                        Throwables.throwIfUnchecked(e);
                        throw new RuntimeException(e);
                    }
                }
            };

    /** Output port on which the tuples produced by {@link #fn} are emitted. */
    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<ApexStreamTuple<? extends WindowedValue<?>>> outputPort =
            new DefaultOutputPort<>();

    /**
     * The processing logic plugged into an {@link ApexProcessFnOperator}.
     *
     * @param <InputT> element type of the incoming windowed values
     */
    public interface ApexOperatorFn<InputT> extends Serializable {
        /**
         * Handles one incoming tuple.
         *
         * @param apexStreamTuple the tuple received on the operator's input port
         * @param outputEmitter sink for any tuples produced while handling the input
         * @throws Exception if processing fails
         */
        void process(ApexStreamTuple<WindowedValue<InputT>> apexStreamTuple, OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception;
    }

    /**
     * Function that re-windows each data element using a {@link WindowFn} and emits one
     * windowed value per assigned window. Watermark tuples are forwarded unchanged.
     */
    private static class AssignWindows<T, W extends BoundedWindow> implements ApexOperatorFn<T> {
        private final WindowFn<T, W> windowFn;

        private AssignWindows(WindowFn<T, W> windowFn) {
            this.windowFn = windowFn;
        }

        @Override
        public final void process(ApexStreamTuple<WindowedValue<T>> apexStreamTuple, OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception {
            if (apexStreamTuple instanceof ApexStreamTuple.WatermarkTuple) {
                outputEmitter.emit(apexStreamTuple);
                return;
            }
            final WindowedValue<T> input = apexStreamTuple.getValue();
            // AssignContext is an inner class of WindowFn, so it must be instantiated
            // against the enclosing windowFn instance.
            WindowFn<T, W>.AssignContext context = windowFn.new AssignContext() {
                @Override
                public T element() {
                    return input.getValue();
                }

                @Override
                public Instant timestamp() {
                    return input.getTimestamp();
                }

                @Override
                public BoundedWindow window() {
                    // getOnlyElement throws unless the input sits in exactly one window.
                    return Iterables.getOnlyElement(input.getWindows());
                }
            };
            for (W window : windowFn.assignWindows(context)) {
                outputEmitter.emit(ApexStreamTuple.DataTuple.of(
                        WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane())));
            }
        }
    }

    /**
     * Receives the tuples produced by an {@link ApexOperatorFn}.
     *
     * @param <T> type of the emitted tuples
     */
    public interface OutputEmitter<T> {
        /**
         * Emits a single tuple.
         *
         * @param t the tuple to emit
         */
        void emit(T t);
    }

    /**
     * Function that converts each {@link KV} element into a single-element keyed work item
     * (via {@link KeyedWorkItems#elementsWorkItem}), one per window of the input value.
     * Watermark tuples are forwarded unchanged.
     */
    public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
        private ToKeyedWorkItems() {
        }

        @Override
        public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> apexStreamTuple, OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {
            if (apexStreamTuple instanceof ApexStreamTuple.WatermarkTuple) {
                outputEmitter.emit(apexStreamTuple);
                return;
            }
            for (WindowedValue<KV<K, V>> in : apexStreamTuple.getValue().explodeWindows()) {
                KV<K, V> kv = in.getValue();
                // The work item holds only the value part of the KV, keeping the
                // windowing metadata of the exploded input.
                WindowedValue<V> element = in.withValue(kv.getValue());
                outputEmitter.emit(ApexStreamTuple.DataTuple.of(
                        in.withValue(KeyedWorkItems.elementsWorkItem(kv.getKey(), Collections.singletonList(element)))));
            }
        }
    }

    /**
     * Creates an operator that applies {@code apexOperatorFn} to every incoming tuple.
     *
     * @param apexOperatorFn the processing logic
     * @param traceTuples whether emitted tuples should additionally be logged at debug level
     */
    public ApexProcessFnOperator(ApexOperatorFn<InputT> apexOperatorFn, boolean traceTuples) {
        this.traceTuples = traceTuples;
        this.fn = apexOperatorFn;
    }

    /**
     * No-argument constructor leaving {@link #fn} {@code null}.
     * NOTE(review): presumably required for Kryo instantiation during deserialization — confirm.
     */
    @SuppressWarnings("unused")
    private ApexProcessFnOperator() {
        this.fn = null;
    }

    /**
     * Creates an operator that turns {@link KV} elements into keyed work items.
     *
     * @param apexPipelineOptions options supplying the tuple-tracing flag
     * @return a new operator backed by {@link ToKeyedWorkItems}
     */
    public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems(ApexPipelineOptions apexPipelineOptions) {
        ApexOperatorFn<KV<K, V>> operatorFn = new ToKeyedWorkItems<K, V>();
        return new ApexProcessFnOperator<KV<K, V>>(operatorFn, apexPipelineOptions.isTupleTracingEnabled());
    }

    /**
     * Creates an operator that assigns windows to elements using {@code windowFn}.
     *
     * @param windowFn the window function used to compute the windows of each element
     * @param apexPipelineOptions options supplying the tuple-tracing flag
     * @return a new operator that emits one windowed value per assigned window
     */
    public static <T, W extends BoundedWindow> ApexProcessFnOperator<T> assignWindows(WindowFn<T, W> windowFn, ApexPipelineOptions apexPipelineOptions) {
        ApexOperatorFn<T> operatorFn = new AssignWindows<T, W>(windowFn);
        return new ApexProcessFnOperator<T>(operatorFn, apexPipelineOptions.isTupleTracingEnabled());
    }
}
