package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.lib.util.KryoCloneUtils;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.class */
public class TimeBucketAssignerTest {

    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest$TestMeta.class */
    class TestMeta extends TestWatcher {
        TimeBucketAssigner timeBucketAssigner;
        MockManagedStateContext mockManagedStateContext;

        TestMeta() {
        }

        protected void starting(Description description) {
            this.timeBucketAssigner = new TimeBucketAssigner();
            this.mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
        }

        protected void finished(Description description) {
        }
    }

    @Test
    public void testSerde() throws IOException {
        Assert.assertNotNull("time bucket assigner", (TimeBucketAssigner) KryoCloneUtils.cloneObject(this.testMeta.timeBucketAssigner));
    }

    @Test
    public void testNumBuckets() {
        this.testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1L));
        this.testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30L));
        this.testMeta.timeBucketAssigner.setup(this.testMeta.mockManagedStateContext);
        Assert.assertEquals("num buckets", 2L, this.testMeta.timeBucketAssigner.getNumBuckets());
        this.testMeta.timeBucketAssigner.teardown();
    }

    @Test
    public void testTimeBucketKey() {
        this.testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1L));
        this.testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30L));
        long millis = this.testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
        this.testMeta.timeBucketAssigner.setup(this.testMeta.mockManagedStateContext);
        Assert.assertEquals("time bucket", 1L, this.testMeta.timeBucketAssigner.getTimeBucketFor(millis - Duration.standardMinutes(2L).getMillis()));
        Assert.assertEquals("time bucket", 0L, this.testMeta.timeBucketAssigner.getTimeBucketFor(millis - Duration.standardMinutes(40L).getMillis()));
        Assert.assertEquals("time bucket", -1L, this.testMeta.timeBucketAssigner.getTimeBucketFor(millis - Duration.standardMinutes(65L).getMillis()));
        this.testMeta.timeBucketAssigner.teardown();
    }

    @Test
    public void testSlidingOnlyBetweenWindow() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        this.testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener() { // from class: org.apache.apex.malhar.lib.state.managed.TimeBucketAssignerTest.1
            public void purgeTimeBucketsLessThanEqualTo(long j) {
                atomicInteger.getAndIncrement();
                countDownLatch.countDown();
            }
        });
        this.testMeta.timeBucketAssigner.setup(this.testMeta.mockManagedStateContext);
        this.testMeta.timeBucketAssigner.beginWindow(0L);
        countDownLatch.await();
        this.testMeta.timeBucketAssigner.endWindow();
        int i = atomicInteger.get();
        Thread.sleep(1000L);
        Assert.assertEquals("value should not change", i, atomicInteger.get());
        this.testMeta.timeBucketAssigner.teardown();
    }
}
