/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

class WriteWithShardingFactory<InputT>
implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write.Bound<InputT>> {
    static final int MAX_RANDOM_EXTRA_SHARDS = 3;

    WriteWithShardingFactory() {
    }

    public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Write.Bound<InputT> transform) {
        if (transform.getNumShards() == 0) {
            return new DynamicallyReshardedWrite(transform);
        }
        return transform;
    }

    @VisibleForTesting
    static class KeyBasedOnCountFn<T>
    extends DoFn<T, KV<Integer, T>> {
        @VisibleForTesting
        static final int MIN_SHARDS_FOR_LOG = 3;
        private final PCollectionView<Long> numRecords;
        private final int randomExtraShards;
        private int currentShard;
        private int maxShards = 0;

        KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
            this.numRecords = numRecords;
            this.randomExtraShards = extraShards;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            if (this.maxShards == 0) {
                this.maxShards = this.calculateShards((Long)c.sideInput(this.numRecords));
                this.currentShard = ThreadLocalRandom.current().nextInt(this.maxShards);
            }
            int shard = this.currentShard;
            this.currentShard = (this.currentShard + 1) % this.maxShards;
            c.output((Object)KV.of((Object)shard, (Object)c.element()));
        }

        private int calculateShards(long totalRecords) {
            Preconditions.checkArgument(totalRecords > 0L, "KeyBasedOnCountFn cannot be invoked on an element if there are no elements");
            if (totalRecords < (long)(3 + this.randomExtraShards)) {
                return (int)totalRecords;
            }
            int floorLogRecs = Double.valueOf(Math.log10(totalRecords)).intValue();
            int shards = Math.max(floorLogRecs, 3) + this.randomExtraShards;
            return shards;
        }
    }

    private static class DynamicallyReshardedWrite<T>
    extends PTransform<PCollection<T>, PDone> {
        private final transient Write.Bound<T> original;

        private DynamicallyReshardedWrite(Write.Bound<T> original) {
            this.original = original;
        }

        public PDone expand(PCollection<T> input) {
            Preconditions.checkArgument(PCollection.IsBounded.BOUNDED == input.isBounded(), "%s can only be applied to a Bounded PCollection", (Object)((Object)((Object)this)).getClass().getSimpleName());
            PCollection records = (PCollection)input.apply("RewindowInputs", (PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)DefaultTrigger.of()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
            PCollectionView numRecords = (PCollectionView)records.apply("CountRecords", (PTransform)Count.globally().asSingletonView());
            PCollection resharded = (PCollection)((PCollection)((PCollection)((PCollection)records.apply("ApplySharding", (PTransform)ParDo.withSideInputs((PCollectionView[])new PCollectionView[]{numRecords}).of(new KeyBasedOnCountFn((PCollectionView<Long>)numRecords, ThreadLocalRandom.current().nextInt(3))))).apply("GroupIntoShards", (PTransform)GroupByKey.create())).apply("DropShardingKeys", (PTransform)Values.create())).apply("FlattenShardIterables", (PTransform)Flatten.iterables());
            return this.original.expand(resharded);
        }
    }
}

