package org.apache.beam.runners.direct;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactory.class */
class WriteWithShardingFactory<InputT> implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write.Bound<InputT>> {
    static final int MAX_RANDOM_EXTRA_SHARDS = 3;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/WriteWithShardingFactory$DynamicallyReshardedWrite.class */
    public static class DynamicallyReshardedWrite<T> extends PTransform<PCollection<T>, PDone> {
        private final transient Write.Bound<T> original;

        private DynamicallyReshardedWrite(Write.Bound<T> bound) {
            this.original = bound;
        }

        public PDone expand(PCollection<T> pCollection) {
            Preconditions.checkArgument(PCollection.IsBounded.BOUNDED == pCollection.isBounded(), "%s can only be applied to a Bounded PCollection", getClass().getSimpleName());
            PCollection apply = pCollection.apply("RewindowInputs", Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
            PCollectionView apply2 = apply.apply("CountRecords", Count.globally().asSingletonView());
            return this.original.expand(apply.apply("ApplySharding", ParDo.withSideInputs(new PCollectionView[]{apply2}).of(new KeyBasedOnCountFn(apply2, ThreadLocalRandom.current().nextInt(3)))).apply("GroupIntoShards", GroupByKey.create()).apply("DropShardingKeys", Values.create()).apply("FlattenShardIterables", Flatten.iterables()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Assigns each input element an integer shard key, cycling through the available shards in
     * order from a randomly chosen starting shard.
     *
     * <p>The number of shards is computed from the record-count side input the first time an
     * element is processed by an instance and is then reused by that instance.
     *
     * @param <T> element type being keyed
     */
    @VisibleForTesting
    public static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {

        /** Lower bound on the logarithm-derived shard count (before extra shards are added). */
        @VisibleForTesting
        static final int MIN_SHARDS_FOR_LOG = 3;
        private final PCollectionView<Long> numRecords;
        private final int randomExtraShards;
        private int currentShard;
        /** Zero means "not yet computed"; set on the first processed element. */
        private int maxShards = 0;

        /**
         * @param numRecordsView side input holding the total number of records
         * @param extraShards number of shards added on top of the logarithm-derived count
         */
        KeyBasedOnCountFn(PCollectionView<Long> numRecordsView, int extraShards) {
            this.numRecords = numRecordsView;
            this.randomExtraShards = extraShards;
        }

        /**
         * Emits the current element keyed by the next shard number in the rotation.
         *
         * @param context processing context supplying the element and the record-count side input
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<T, KV<Integer, T>>.ProcessContext context) throws Exception {
            if (maxShards == 0) {
                // First element seen by this instance: size the rotation and pick a start point.
                long recordCount = context.sideInput(numRecords);
                maxShards = calculateShards(recordCount);
                currentShard = ThreadLocalRandom.current().nextInt(maxShards);
            }
            int assignedShard = currentShard;
            currentShard = (assignedShard + 1) % maxShards;
            context.output(KV.of(assignedShard, context.element()));
        }

        /**
         * Computes how many shards to spread {@code totalRecords} records over.
         *
         * @param totalRecords total number of records; must be positive
         * @return {@code totalRecords} itself when it is below
         *     {@code MIN_SHARDS_FOR_LOG + randomExtraShards}; otherwise the larger of the truncated
         *     base-10 logarithm of {@code totalRecords} and {@code MIN_SHARDS_FOR_LOG}, plus
         *     {@code randomExtraShards}
         * @throws IllegalArgumentException if {@code totalRecords} is not positive
         */
        private int calculateShards(long totalRecords) {
            Preconditions.checkArgument(
                    totalRecords > 0,
                    "KeyBasedOnCountFn cannot be invoked on an element if there are no elements");
            if (totalRecords < MIN_SHARDS_FOR_LOG + randomExtraShards) {
                // Too few records to fill the minimum: one shard per record.
                return (int) totalRecords;
            }
            int truncatedLog = (int) Math.log10(totalRecords);
            return Math.max(truncatedLog, MIN_SHARDS_FOR_LOG) + randomExtraShards;
        }
    }

    /**
     * Chooses the transform that should run in place of the given write.
     *
     * @param bound the write to possibly replace
     * @return a {@link DynamicallyReshardedWrite} wrapping {@code bound} when its shard count is
     *     {@code 0}; otherwise {@code bound} itself, unchanged
     */
    @Override
    public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Write.Bound<InputT> bound) {
        if (bound.getNumShards() == 0) {
            return new DynamicallyReshardedWrite<>(bound);
        }
        return bound;
    }
}
