package org.apache.beam.runners.spark.stateful;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.streaming.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function3;
import scala.Option;
import scala.runtime.AbstractFunction3;

/* loaded from: input_file:org/apache/beam/runners/spark/stateful/StateSpecFunctions.class */
public class StateSpecFunctions {
    private static final Logger LOG = LoggerFactory.getLogger(StateSpecFunctions.class);

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/StateSpecFunctions$SerializableFunction3.class */
    private static abstract class SerializableFunction3<T1, T2, T3, T4> extends AbstractFunction3<T1, T2, T3, T4> implements Serializable {
        private SerializableFunction3() {
        }
    }

    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> Function3<Source<T>, Option<CheckpointMarkT>, State<byte[]>, Iterator<WindowedValue<T>>> mapSourceFunction(final SparkRuntimeContext sparkRuntimeContext) {
        return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>, Iterator<WindowedValue<T>>>() { // from class: org.apache.beam.runners.spark.stateful.StateSpecFunctions.1
            {
                super();
            }

            public Iterator<WindowedValue<T>> apply(Source<T> source, Option<CheckpointMarkT> option, State<byte[]> state) {
                UnboundedSource.CheckpointMark checkpointMark;
                MicrobatchSource microbatchSource = (MicrobatchSource) source;
                Coder checkpointMarkCoder = microbatchSource.getCheckpointMarkCoder();
                if (state.exists()) {
                    checkpointMark = (UnboundedSource.CheckpointMark) CoderHelpers.fromByteArray((byte[]) state.get(), checkpointMarkCoder);
                    StateSpecFunctions.LOG.info("Continue reading from an existing CheckpointMark.");
                } else if (!option.isDefined() || ((UnboundedSource.CheckpointMark) option.get()).equals(EmptyCheckpointMark.get())) {
                    checkpointMark = null;
                    StateSpecFunctions.LOG.info("No CheckpointMark provided, start reading from default.");
                } else {
                    checkpointMark = (UnboundedSource.CheckpointMark) option.get();
                    StateSpecFunctions.LOG.info("Start reading from a provided CheckpointMark.");
                }
                try {
                    BoundedSource.BoundedReader<T> createReader = microbatchSource.createReader(SparkRuntimeContext.this.getPipelineOptions(), checkpointMark);
                    ArrayList arrayList = new ArrayList();
                    try {
                        Stopwatch createStarted = Stopwatch.createStarted();
                        for (boolean z = !createReader.start(); !z; z = !createReader.advance()) {
                            arrayList.add(WindowedValue.of(createReader.getCurrent(), createReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
                        }
                        createReader.close();
                        StateSpecFunctions.LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(), Long.valueOf(createStarted.stop().elapsed(TimeUnit.MILLISECONDS)));
                        UnboundedSource.CheckpointMark checkpointMark2 = ((MicrobatchSource.Reader) createReader).getCheckpointMark();
                        if (checkpointMark2 != null) {
                            state.update(CoderHelpers.toByteArray(checkpointMark2, checkpointMarkCoder));
                        } else {
                            StateSpecFunctions.LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
                        }
                        return Iterators.unmodifiableIterator(arrayList.iterator());
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to read from reader.", e);
                    }
                } catch (IOException e2) {
                    throw new RuntimeException(e2);
                }
            }
        };
    }
}
