package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/EvaluationContext.class */
/**
 * Shared evaluation state for a pipeline that is executed by the direct runner.
 *
 * <p>An instance bundles together the pipeline graph, the runner options, the clock, the bundle
 * factory, the watermark manager, the side input container, the per-step-and-key state, the merged
 * aggregators, the metrics and the executor used to fire watermark-driven callbacks. Transform
 * results are folded into this state through {@link #handleResult}.
 */
public class EvaluationContext {
    /** The graph of the pipeline being executed. */
    private final DirectGraph graph;

    /** The options the runner was configured with. */
    private final DirectOptions options;

    /** Source of the current time, exposed through {@link #now()} and {@link #getClock()}. */
    private final Clock clock;

    /** Factory used to create every uncommitted bundle handed out by this context. */
    private final BundleFactory bundleFactory;

    /** Tracks watermarks and timers for every transform in {@link #graph}. */
    private final WatermarkManager watermarkManager;

    /** Holds the contents of the views of {@link #graph} and hands out readers for them. */
    private final SideInputContainer sideInputContainer;

    /** The values that are reported as keyed by {@link #isKeyed(PValue)}. */
    private final Set<PValue> keyedPValues;

    /** The most recently committed, non-empty state for each (step, key) pair. */
    private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>> applicationStateInternals = new ConcurrentHashMap<>();

    /** Container into which committed aggregator changes are merged. */
    private final AggregatorContainer mergedAggregators = AggregatorContainer.create();

    /** Metrics accumulated from the results handled by this context. */
    private final DirectMetrics metrics = new DirectMetrics();

    /** Runs registered callbacks on a direct executor once the relevant watermark allows it. */
    private final WatermarkCallbackExecutor callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());

    /**
     * Creates a new {@link EvaluationContext}.
     *
     * @param directOptions the runner options; must not be null
     * @param clock the clock used for {@link #now()} and handed to the watermark manager
     * @param bundleFactory the factory used to create bundles; must not be null
     * @param directGraph the graph of the pipeline; must not be null
     * @param set the values that are to be treated as keyed
     * @return the new context
     */
    public static EvaluationContext create(DirectOptions directOptions, Clock clock, BundleFactory bundleFactory, DirectGraph directGraph, Set<PValue> set) {
        return new EvaluationContext(directOptions, clock, bundleFactory, directGraph, set);
    }

    private EvaluationContext(DirectOptions directOptions, Clock clock, BundleFactory bundleFactory, DirectGraph directGraph, Set<PValue> set) {
        this.options = Preconditions.checkNotNull(directOptions);
        this.clock = clock;
        this.bundleFactory = Preconditions.checkNotNull(bundleFactory);
        this.graph = Preconditions.checkNotNull(directGraph);
        this.keyedPValues = set;
        this.watermarkManager = WatermarkManager.create(clock, directGraph);
        this.sideInputContainer = SideInputContainer.create(this, directGraph.getViews());
    }

    /**
     * Hands the initial bundles of each transform to the watermark manager.
     *
     * @param map the initial bundles, grouped by the transform they belong to
     */
    public void initialize(Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<DirectRunner.CommittedBundle<?>>> map) {
        this.watermarkManager.initialize(map);
    }

    /**
     * Folds the result of evaluating a transform into this context.
     *
     * <p>In order, this commits the output bundles of the result, commits its logical metric
     * updates, commits its aggregator changes (if any), commits and stores its state (if any) and
     * finally updates the watermarks.
     *
     * @param committedBundle the input bundle that was processed, or null if there was none
     * @param iterable the timers that were delivered to produce the result
     * @param transformResult the result of the evaluation
     * @return the committed form of {@code transformResult}
     */
    public CommittedResult handleResult(@Nullable DirectRunner.CommittedBundle<?> committedBundle, Iterable<TimerInternals.TimerData> iterable, TransformResult<?> transformResult) {
        Iterable<? extends DirectRunner.CommittedBundle<?>> committedOutputs = commitBundles(transformResult.getOutputBundles());
        this.metrics.commitLogical(committedBundle, transformResult.getLogicalMetricUpdates());

        // Start from an empty set and add the reported types: unlike EnumSet.copyOf(Collection),
        // this also accepts an empty collection that is not itself an EnumSet.
        EnumSet<CommittedResult.OutputType> outputTypes = EnumSet.noneOf(CommittedResult.OutputType.class);
        outputTypes.addAll(transformResult.getOutputTypes());
        // BUNDLE is reported exactly when at least one non-empty bundle was committed, regardless
        // of what the result itself claimed.
        if (Iterables.isEmpty(committedOutputs)) {
            outputTypes.remove(CommittedResult.OutputType.BUNDLE);
        } else {
            outputTypes.add(CommittedResult.OutputType.BUNDLE);
        }

        CommittedResult committedResult = CommittedResult.create(
                transformResult,
                committedBundle == null ? null : committedBundle.withElements(transformResult.getUnprocessedElements()),
                committedOutputs,
                outputTypes);

        if (transformResult.getAggregatorChanges() != null) {
            transformResult.getAggregatorChanges().commit();
        }

        CopyOnAccessInMemoryStateInternals<?> resultState = transformResult.getState();
        if (resultState != null) {
            CopyOnAccessInMemoryStateInternals<?> committedState = resultState.commit();
            StepAndKey stepAndKey = StepAndKey.of(transformResult.getTransform(), committedBundle == null ? null : committedBundle.getKey());
            // Only non-empty state is retained; an empty commit clears any earlier entry.
            if (committedState.isEmpty()) {
                this.applicationStateInternals.remove(stepAndKey);
            } else {
                this.applicationStateInternals.put(stepAndKey, committedState);
            }
        }

        // Watermarks are updated only after aggregators and state have been committed above.
        this.watermarkManager.updateWatermarks(committedBundle, transformResult.getTimerUpdate().withCompletedTimers(iterable), committedResult, transformResult.getWatermarkHold());
        return committedResult;
    }

    /**
     * Commits every given bundle and returns those that contain at least one element.
     *
     * <p>Each bundle is committed at the synchronized processing output time of the transform
     * that produces the bundle's collection.
     *
     * @param iterable the bundles to commit
     * @return the committed bundles that are not empty, in iteration order
     */
    private Iterable<? extends DirectRunner.CommittedBundle<?>> commitBundles(Iterable<? extends DirectRunner.UncommittedBundle<?>> iterable) {
        ImmutableList.Builder<DirectRunner.CommittedBundle<?>> nonEmptyBundles = ImmutableList.builder();
        for (DirectRunner.UncommittedBundle<?> uncommittedBundle : iterable) {
            AppliedPTransform<?, ?, ?> producer = this.graph.getProducer(uncommittedBundle.getPCollection());
            DirectRunner.CommittedBundle<?> committed = uncommittedBundle.commit(this.watermarkManager.getWatermarks(producer).getSynchronizedProcessingOutputTime());
            if (!Iterables.isEmpty(committed.getElements())) {
                nonEmptyBundles.add(committed);
            }
        }
        return nonEmptyBundles.build();
    }

    /** Fires the callbacks that have become available for every primitive transform of the graph. */
    private void fireAllAvailableCallbacks() {
        for (AppliedPTransform<?, ?, ?> transform : this.graph.getPrimitiveTransforms()) {
            fireAvailableCallbacks(transform);
        }
    }

    /** Fires the callbacks of the given transform against its current output watermark. */
    private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> appliedPTransform) {
        this.callbackExecutor.fireForWatermark(appliedPTransform, this.watermarkManager.getWatermarks(appliedPTransform).getOutputWatermark());
    }

    /**
     * Creates a root bundle via the bundle factory.
     *
     * @return a new uncommitted root bundle
     */
    public <T> DirectRunner.UncommittedBundle<T> createRootBundle() {
        return this.bundleFactory.createRootBundle();
    }

    /**
     * Creates a bundle for the given collection via the bundle factory.
     *
     * @param pCollection the collection the bundle belongs to
     * @return a new uncommitted bundle
     */
    public <T> DirectRunner.UncommittedBundle<T> createBundle(PCollection<T> pCollection) {
        return this.bundleFactory.createBundle(pCollection);
    }

    /**
     * Creates a keyed bundle for the given key and collection via the bundle factory.
     *
     * @param structuralKey the key of the bundle
     * @param pCollection the collection the bundle belongs to
     * @return a new uncommitted keyed bundle
     */
    public <K, T> DirectRunner.UncommittedBundle<T> createKeyedBundle(StructuralKey<K> structuralKey, PCollection<T> pCollection) {
        return this.bundleFactory.createKeyedBundle(structuralKey, pCollection);
    }

    /**
     * Reports whether the given value is one of the keyed values this context was created with.
     *
     * @param pValue the value to look up
     * @return true if {@code pValue} is contained in the keyed values
     */
    public <T> boolean isKeyed(PValue pValue) {
        return this.keyedPValues.contains(pValue);
    }

    /**
     * Creates a writer that stores whatever it is given as the contents of the given view.
     *
     * @param pCollection the collection backing the view; not used by the returned writer
     * @param pCollectionView the view the writer writes to
     * @return a writer that forwards to the side input container of this context
     */
    public <ElemT, ViewT> DirectRunner.PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(PCollection<Iterable<ElemT>> pCollection, final PCollectionView<ViewT> pCollectionView) {
        return new DirectRunner.PCollectionViewWriter<ElemT, ViewT>() {
            @Override
            public void add(Iterable<WindowedValue<ElemT>> iterable) {
                EvaluationContext.this.sideInputContainer.write(pCollectionView, iterable);
            }
        };
    }

    /**
     * Registers a callback on the producer of the given value for the given window, then
     * immediately fires whatever callbacks are already available for that producer.
     *
     * @param pValue the value whose producing transform the callback is registered on
     * @param boundedWindow the window the callback is registered for
     * @param windowingStrategy the windowing strategy passed along with the registration
     * @param runnable the callback to run
     */
    public void scheduleAfterOutputWouldBeProduced(PValue pValue, BoundedWindow boundedWindow, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        AppliedPTransform<?, ?, ?> producer = this.graph.getProducer(pValue);
        this.callbackExecutor.callOnGuaranteedFiring(producer, boundedWindow, windowingStrategy, runnable);
        fireAvailableCallbacks(producer);
    }

    /**
     * Registers a window-expiration callback on the given transform, then immediately fires
     * whatever callbacks are already available for that transform.
     *
     * @param appliedPTransform the transform the callback is registered on
     * @param boundedWindow the window the callback is registered for
     * @param windowingStrategy the windowing strategy passed along with the registration
     * @param runnable the callback to run
     */
    public void scheduleAfterWindowExpiration(AppliedPTransform<?, ?, ?> appliedPTransform, BoundedWindow boundedWindow, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        this.callbackExecutor.callOnWindowExpiration(appliedPTransform, boundedWindow, windowingStrategy, runnable);
        fireAvailableCallbacks(appliedPTransform);
    }

    /**
     * Returns the options this context was created with.
     *
     * @return the runner options
     */
    public DirectOptions getPipelineOptions() {
        return this.options;
    }

    /**
     * Creates an execution context for the given transform and key.
     *
     * <p>The context is built from the clock, the key, the state last stored for the
     * (transform, key) pair (null if none is stored) and the watermarks of the transform.
     *
     * @param appliedPTransform the transform that is going to be executed
     * @param structuralKey the key the transform is going to be executed for
     * @return a new execution context
     */
    public DirectExecutionContext getExecutionContext(AppliedPTransform<?, ?, ?> appliedPTransform, StructuralKey<?> structuralKey) {
        StepAndKey stepAndKey = StepAndKey.of(appliedPTransform, structuralKey);
        return new DirectExecutionContext(this.clock, structuralKey, this.applicationStateInternals.get(stepAndKey), this.watermarkManager.getWatermarks(appliedPTransform));
    }

    /**
     * Returns the step name the graph assigns to the given transform.
     *
     * @param appliedPTransform the transform to look up
     * @return the step name
     */
    public String getStepName(AppliedPTransform<?, ?, ?> appliedPTransform) {
        return this.graph.getStepName(appliedPTransform);
    }

    /**
     * Returns the primitive transforms of the graph.
     *
     * @return the primitive transforms
     */
    public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
        return this.graph.getPrimitiveTransforms();
    }

    /**
     * Creates a side input reader for the given views.
     *
     * @param list the views the reader has to be able to read
     * @return a reader obtained from the side input container
     */
    public ReadyCheckingSideInputReader createSideInputReader(List<PCollectionView<?>> list) {
        return this.sideInputContainer.createReaderForViews(list);
    }

    /**
     * Creates a mutator on the merged aggregator container.
     *
     * @return a new mutator
     */
    public AggregatorContainer.Mutator getAggregatorMutator() {
        return this.mergedAggregators.createMutator();
    }

    /**
     * Returns the merged aggregator container.
     *
     * @return the aggregator container
     */
    public AggregatorContainer getAggregatorContainer() {
        return this.mergedAggregators;
    }

    /**
     * Returns the metrics of this context.
     *
     * @return the metrics
     */
    public DirectMetrics getMetrics() {
        return this.metrics;
    }

    /** Refreshes all watermarks and then fires every callback that has become available. */
    @VisibleForTesting
    void forceRefresh() {
        this.watermarkManager.refreshAll();
        fireAllAvailableCallbacks();
    }

    /**
     * Refreshes the watermarks, fires the available callbacks and extracts the fired timers.
     *
     * @return the timers extracted from the watermark manager
     */
    public Collection<WatermarkManager.FiredTimers> extractFiredTimers() {
        forceRefresh();
        return this.watermarkManager.extractFiredTimers();
    }

    /**
     * Reports whether the given transform is done.
     *
     * <p>A transform is not done while its output watermark is before
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. It is also not done if it has an unbounded
     * {@link PCollection} among its outputs, unless the options ask for unbounded producers to be
     * shut down once the maximum watermark is reached.
     *
     * @param appliedPTransform the transform to check
     * @return true if the transform is done
     */
    public boolean isDone(AppliedPTransform<?, ?, ?> appliedPTransform) {
        if (this.watermarkManager.getWatermarks(appliedPTransform).getOutputWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return false;
        }
        for (TaggedPValue taggedPValue : appliedPTransform.getOutputs()) {
            if (!(taggedPValue.getValue() instanceof PCollection)) {
                continue;
            }
            // The instanceof check above makes this cast safe.
            PCollection<?> output = (PCollection<?>) taggedPValue.getValue();
            if (output.isBounded().equals(PCollection.IsBounded.UNBOUNDED) && !this.options.isShutdownUnboundedProducersWithMaxWatermark()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether every primitive transform of the graph is done.
     *
     * @return true if {@link #isDone(AppliedPTransform)} holds for all primitive transforms
     */
    public boolean isDone() {
        for (AppliedPTransform<?, ?, ?> transform : this.graph.getPrimitiveTransforms()) {
            if (!isDone(transform)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the current time according to the clock of this context.
     *
     * @return the current instant
     */
    public Instant now() {
        return this.clock.now();
    }

    /**
     * Returns the clock of this context.
     *
     * @return the clock
     */
    public Clock getClock() {
        return this.clock;
    }
}
