package org.apache.beam.runners.apex.translation.operators;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Throwables;
import org.apache.beam.runners.apex.translation.operators.ApexTimerInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.class */
public class ApexGroupByKeyOperator<K, V> implements Operator, ApexTimerInternals.TimerProcessor<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
    private boolean traceTuples;

    @FieldSerializer.Bind(JavaSerializer.class)
    private WindowingStrategy<V, BoundedWindow> windowingStrategy;

    @FieldSerializer.Bind(JavaSerializer.class)
    private Coder<K> keyCoder;

    @FieldSerializer.Bind(JavaSerializer.class)
    private Coder<V> valueCoder;

    @FieldSerializer.Bind(JavaSerializer.class)
    private final SerializablePipelineOptions serializedOptions;

    @FieldSerializer.Bind(JavaSerializer.class)
    private final StateInternalsFactory<K> stateInternalsFactory;
    private final ApexTimerInternals<K> timerInternals;
    private Instant inputWatermark;
    public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input;

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output;

    public ApexGroupByKeyOperator(ApexPipelineOptions apexPipelineOptions, PCollection<KV<K, V>> pCollection, ApexStateInternals.ApexStateBackend apexStateBackend) {
        this.traceTuples = true;
        this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() { // from class: org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator.1
            public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> apexStreamTuple) {
                try {
                    if (!(apexStreamTuple instanceof ApexStreamTuple.WatermarkTuple)) {
                        if (ApexGroupByKeyOperator.this.traceTuples) {
                            ApexGroupByKeyOperator.LOG.debug("\ninput {}\n", apexStreamTuple.getValue());
                        }
                        ApexGroupByKeyOperator.this.processElement(apexStreamTuple.getValue());
                    } else {
                        ApexStreamTuple.WatermarkTuple watermarkTuple = (ApexStreamTuple.WatermarkTuple) apexStreamTuple;
                        ApexGroupByKeyOperator.this.processWatermark(watermarkTuple);
                        if (ApexGroupByKeyOperator.this.traceTuples) {
                            ApexGroupByKeyOperator.LOG.debug("\nemitting watermark {}\n", Long.valueOf(watermarkTuple.getTimestamp()));
                        }
                        ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.WatermarkTuple.of(watermarkTuple.getTimestamp()));
                    }
                } catch (Exception e) {
                    Throwables.throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            }
        };
        this.output = new DefaultOutputPort<>();
        Preconditions.checkNotNull(apexPipelineOptions);
        this.serializedOptions = new SerializablePipelineOptions(apexPipelineOptions);
        this.windowingStrategy = pCollection.getWindowingStrategy();
        this.keyCoder = pCollection.getCoder().getKeyCoder();
        this.valueCoder = pCollection.getCoder().getValueCoder();
        this.stateInternalsFactory = apexStateBackend.newStateInternalsFactory(this.keyCoder);
        this.timerInternals = new ApexTimerInternals<>(TimerInternals.TimerDataCoder.of(this.windowingStrategy.getWindowFn().windowCoder()));
    }

    private ApexGroupByKeyOperator() {
        this.traceTuples = true;
        this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() { // from class: org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator.1
            public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> apexStreamTuple) {
                try {
                    if (!(apexStreamTuple instanceof ApexStreamTuple.WatermarkTuple)) {
                        if (ApexGroupByKeyOperator.this.traceTuples) {
                            ApexGroupByKeyOperator.LOG.debug("\ninput {}\n", apexStreamTuple.getValue());
                        }
                        ApexGroupByKeyOperator.this.processElement(apexStreamTuple.getValue());
                    } else {
                        ApexStreamTuple.WatermarkTuple watermarkTuple = (ApexStreamTuple.WatermarkTuple) apexStreamTuple;
                        ApexGroupByKeyOperator.this.processWatermark(watermarkTuple);
                        if (ApexGroupByKeyOperator.this.traceTuples) {
                            ApexGroupByKeyOperator.LOG.debug("\nemitting watermark {}\n", Long.valueOf(watermarkTuple.getTimestamp()));
                        }
                        ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.WatermarkTuple.of(watermarkTuple.getTimestamp()));
                    }
                } catch (Exception e) {
                    Throwables.throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            }
        };
        this.output = new DefaultOutputPort<>();
        this.serializedOptions = null;
        this.stateInternalsFactory = null;
        this.timerInternals = null;
    }

    public void beginWindow(long j) {
    }

    public void endWindow() {
        this.timerInternals.fireReadyTimers(this.timerInternals.currentProcessingTime().getMillis(), this, TimeDomain.PROCESSING_TIME);
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(this.serializedOptions.get(), this);
    }

    public void teardown() {
    }

    private ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner(K k) {
        return new ReduceFnRunner<>(k, this.windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(this.windowingStrategy.getTrigger()))), this.stateInternalsFactory.stateInternalsForKey(k), this.timerInternals, new OutputWindowedValue<KV<K, Iterable<V>>>() { // from class: org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator.2
            public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                if (ApexGroupByKeyOperator.this.traceTuples) {
                    ApexGroupByKeyOperator.LOG.debug("\nemitting {} timestamp {}\n", kv, instant);
                }
                ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(kv, instant, collection, paneInfo)));
            }

            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
            }

            public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
                outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
            }
        }, NullSideInputReader.empty(), SystemReduceFn.buffering(this.valueCoder), this.serializedOptions.get());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
        KV kv = (KV) windowedValue.getValue();
        WindowedValue of = WindowedValue.of(kv.getValue(), windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
        this.timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark, null);
        ReduceFnRunner newReduceFnRunner = newReduceFnRunner(kv.getKey());
        newReduceFnRunner.processElements(Collections.singletonList(of));
        newReduceFnRunner.persist();
    }

    @Override // org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor
    public void fireTimer(K k, Collection<TimerInternals.TimerData> collection) {
        this.timerInternals.setContext(k, this.keyCoder, this.inputWatermark, null);
        ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner = newReduceFnRunner(k);
        try {
            newReduceFnRunner.onTimers(collection);
            newReduceFnRunner.persist();
        } catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processWatermark(ApexStreamTuple.WatermarkTuple<?> watermarkTuple) {
        this.inputWatermark = new Instant(watermarkTuple.getTimestamp());
        this.timerInternals.fireReadyTimers(this.inputWatermark.getMillis(), this, TimeDomain.EVENT_TIME);
    }
}
