/*
 * Decompiled with CFR 0.152.
 */
package com.holdenkarau.spark.testing;

import com.holdenkarau.spark.testing.StreamingActionBase;
import com.holdenkarau.spark.testing.StreamingActionBase$;
import com.holdenkarau.spark.testing.TestInputStream;
import org.apache.spark.streaming.TestStreamingContext;
import org.apache.spark.streaming.util.TestManualClock;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalatest.Assertions;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

public abstract class StreamingActionBase$class {
    public static void runAction(StreamingActionBase $this, Seq input, Function1 operation, ClassTag evidence$1) {
        int numBatches_ = input.size();
        $this.withStreamingContext($this.setupStream(input, operation, evidence$1), (Function1<TestStreamingContext, BoxedUnit>)new Serializable($this, numBatches_){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingActionBase $outer;
            private final int numBatches_$1;

            public final void apply(TestStreamingContext ssc) {
                this.$outer.runActionStream(ssc, this.numBatches_$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.numBatches_$1 = numBatches_$1;
            }
        });
        BoxedUnit output = BoxedUnit.UNIT;
    }

    public static void withStreamingContext(StreamingActionBase $this, TestStreamingContext outputStreamSSC, Function1 block) {
        try {
            block.apply((Object)outputStreamSSC);
        }
        catch (Throwable throwable) {
            try {
                outputStreamSSC.stop(false);
            }
            catch (Exception exception) {
                $this.logError((Function0)new Serializable($this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error stopping StreamingContext";
                    }
                }, exception);
            }
            throw throwable;
        }
        try {
            outputStreamSSC.stop(false);
        }
        catch (Exception exception) {
            $this.logError((Function0)new /* invalid duplicate definition of identical inner class */, exception);
        }
    }

    public static TestStreamingContext setupStream(StreamingActionBase $this, Seq input, Function1 operation, ClassTag evidence$2) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        TestInputStream inputStream = $this.createTestInputStream($this.sc(), ssc, input, evidence$2);
        operation.apply(inputStream);
        return ssc;
    }

    public static void runActionStream(StreamingActionBase $this, TestStreamingContext ssc, int numBatches) {
        int $org_scalatest_assert_macro_left = numBatches;
        int $org_scalatest_assert_macro_right = 0;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right);
        ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Number of batches to run stream computation is zero");
        try {
            ssc.start();
            TestManualClock clock = (TestManualClock)ssc.getScheduler().clock();
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final TestManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock before advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            if ($this.actuallyWait()) {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), numBatches).foreach$mVc$sp((Function1)new Serializable($this, clock){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ StreamingActionBase $outer;
                    private final TestManualClock clock$1;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.$outer.logInfo((Function0)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ StreamingActionBase$.anonfun.runActionStream.1 $outer;

                            public final String apply() {
                                return new StringBuilder().append((Object)"Actually waiting for ").append((Object)this.$outer.com$holdenkarau$spark$testing$StreamingActionBase$$anonfun$$$outer().batchDuration()).toString();
                            }
                            {
                                if ($outer == null) {
                                    throw new NullPointerException();
                                }
                                this.$outer = $outer;
                            }
                        });
                        this.clock$1.addToTime(this.$outer.batchDuration().milliseconds());
                        Thread.sleep(this.$outer.batchDuration().milliseconds());
                    }

                    public /* synthetic */ StreamingActionBase com$holdenkarau$spark$testing$StreamingActionBase$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                        this.clock$1 = clock$1;
                    }
                });
            } else {
                clock.addToTime((long)numBatches * $this.batchDuration().milliseconds());
            }
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final TestManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock after advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            ssc.awaitTerminationOrTimeout(100L);
            Thread.sleep(100L);
            return;
        }
        finally {
            ssc.stop(false);
        }
    }

    public static void $init$(StreamingActionBase $this) {
    }
}

