package org.apache.beam.runners.apex.translation.operators;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.IOException;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Throwables;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ValuesSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.class */
public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements InputOperator {
    private static final Logger LOG = LoggerFactory.getLogger(ApexReadUnboundedInputOperator.class);
    private boolean traceTuples;
    private long outputWatermark;

    @FieldSerializer.Bind(JavaSerializer.class)
    private final SerializablePipelineOptions pipelineOptions;

    @FieldSerializer.Bind(JavaSerializer.class)
    private final UnboundedSource<OutputT, CheckpointMarkT> source;
    private final boolean isBoundedSource;
    private transient UnboundedSource.UnboundedReader<OutputT> reader;
    private transient boolean available;

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output;

    public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> unboundedSource, ApexPipelineOptions apexPipelineOptions) {
        this.traceTuples = false;
        this.outputWatermark = 0L;
        this.available = false;
        this.output = new DefaultOutputPort<>();
        this.pipelineOptions = new SerializablePipelineOptions(apexPipelineOptions);
        this.source = unboundedSource;
        this.isBoundedSource = false;
    }

    public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> unboundedSource, boolean z, ApexPipelineOptions apexPipelineOptions) {
        this.traceTuples = false;
        this.outputWatermark = 0L;
        this.available = false;
        this.output = new DefaultOutputPort<>();
        this.pipelineOptions = new SerializablePipelineOptions(apexPipelineOptions);
        this.source = unboundedSource;
        this.isBoundedSource = z;
    }

    private ApexReadUnboundedInputOperator() {
        this.traceTuples = false;
        this.outputWatermark = 0L;
        this.available = false;
        this.output = new DefaultOutputPort<>();
        this.pipelineOptions = null;
        this.source = null;
        this.isBoundedSource = false;
    }

    public void beginWindow(long j) {
        if (this.available || !(this.isBoundedSource || (this.source instanceof ValuesSource))) {
            emitWatermarkIfNecessary(this.reader.getWatermark().getMillis());
        } else {
            emitWatermarkIfNecessary(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
        }
    }

    private void emitWatermarkIfNecessary(long j) {
        if (j > this.outputWatermark) {
            this.outputWatermark = j;
            if (this.traceTuples) {
                LOG.debug("\nemitting watermark {}\n", Long.valueOf(j));
            }
            this.output.emit(ApexStreamTuple.WatermarkTuple.of(j));
        }
    }

    public void endWindow() {
        if (this.outputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            if (this.traceTuples) {
                LOG.debug("terminating input after final watermark");
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
            BaseOperator.shutdown();
        }
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(this.pipelineOptions.get(), this);
        try {
            this.reader = this.source.createReader(this.pipelineOptions.get(), (UnboundedSource.CheckpointMark) null);
            this.available = this.reader.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void teardown() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void emitTuples() {
        try {
            if (!this.available) {
                this.available = this.reader.advance();
            }
            if (this.available) {
                Object current = this.reader.getCurrent();
                Instant currentTimestamp = this.reader.getCurrentTimestamp();
                this.available = this.reader.advance();
                if (this.traceTuples) {
                    LOG.debug("\nemitting '{}' timestamp {}\n", current, currentTimestamp);
                }
                this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(current, currentTimestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
            }
        } catch (Exception e) {
            Throwables.propagateIfPossible(e);
            throw new RuntimeException(e);
        }
    }
}
