package com.datatorrent.stram.engine;

import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/WindowGenerator.class */
/**
 * Emits window-control tuples (reset-window, begin-window, end-window and checkpoint) into an
 * internal queue, driven by a {@link ScheduledExecutorService}.
 *
 * <p>A 64-bit window id, as built by this class, carries a "base seconds" value in its upper
 * 32 bits and a per-base window counter in its lower bits. The counter runs from
 * {@link #MIN_WINDOW_ID} to {@link #MAX_WINDOW_ID}; after the last counter value a new base is
 * computed and a reset-window tuple is emitted.
 */
public class WindowGenerator extends MuxReservoir implements Stream, Runnable {
    /** Mask used by {@link #getWindowMillis} to extract the window counter from a window id. */
    public static final int WINDOW_MASK = 16383;
    /** Smallest window counter value. */
    public static final int MIN_WINDOW_ID = 0;
    /** Largest window counter value; reaching it triggers a reset on the next window. */
    public static final int MAX_WINDOW_ID = 15999;
    /** Largest window width, in milliseconds, accepted by {@link #setWindowWidth(int)}. */
    public static final int MAX_WINDOW_WIDTH = 503079487;

    private static final Logger logger = LoggerFactory.getLogger(WindowGenerator.class);

    private final ScheduledExecutorService ses;
    private final BlockingQueue<Tuple> queue;
    private long firstWindowMillis;
    private int windowWidthMillis;
    private long currentWindowMillis;
    /** Base seconds already shifted into the upper 32 bits, ready to be OR-ed with a counter. */
    private long baseSeconds;
    private int windowId;
    private long resetWindowMillis;
    /** Number of windows ended since the last checkpoint tuple. */
    private int checkPointWindowCount;
    /** Number of windows between two checkpoint tuples. */
    private int checkpointCount = 60;

    /**
     * Creates a generator that schedules its work on the given executor.
     *
     * @param scheduledExecutorService executor (and clock) used for all scheduling
     * @param i capacity passed to the backing {@link CircularBuffer}
     */
    public WindowGenerator(ScheduledExecutorService scheduledExecutorService, int i) {
        this.ses = scheduledExecutorService;
        this.queue = new CircularBuffer<Tuple>(i);
    }

    /** Moves the current window forward by one window width and increments the window counter. */
    public final void advanceWindow() {
        this.currentWindowMillis += this.windowWidthMillis;
        this.windowId++;
    }

    /**
     * Re-bases the window id on the current window time and emits a reset-window tuple followed
     * by a begin-window tuple.
     *
     * <p>The reset time is moved by a whole number of reset spans (the time covered by
     * {@code MAX_WINDOW_ID + 1} windows) towards the current window time, and the window counter
     * becomes the number of whole windows between the new reset time and the current window.
     *
     * @throws InterruptedException if interrupted while putting a tuple on the queue
     */
    public void resetBeginNewWindow() throws InterruptedException {
        // Computed in long: with widths up to MAX_WINDOW_WIDTH the product overflows an int,
        // and can even wrap to exactly zero, which would make the modulo below throw.
        final long resetSpanMillis = (long) (MAX_WINDOW_ID + 1) * this.windowWidthMillis;
        final long sinceLastReset = this.currentWindowMillis - this.resetWindowMillis;

        this.resetWindowMillis = this.currentWindowMillis - (sinceLastReset % resetSpanMillis);
        this.windowId = (int) ((this.currentWindowMillis - this.resetWindowMillis) / this.windowWidthMillis);
        this.baseSeconds = (this.resetWindowMillis / 1000) << 32;

        this.queue.put(new ResetWindowTuple(this.baseSeconds | this.windowWidthMillis));
        this.queue.put(new Tuple(MessageType.BEGIN_WINDOW, this.baseSeconds | this.windowId));
    }

    /**
     * Ends the current window and begins the next one.
     *
     * <p>Emits an end-window tuple, then a checkpoint tuple once every {@code checkpointCount}
     * windows. If the window that just ended carried {@link #MAX_WINDOW_ID}, the next window is
     * started through {@link #run()} (which re-bases and emits a reset-window tuple); otherwise a
     * plain begin-window tuple is emitted.
     *
     * @throws InterruptedException if interrupted while putting a tuple on the queue
     */
    public void endCurrentBeginNewWindow() throws InterruptedException {
        final long endedWindowId = this.baseSeconds | this.windowId;
        this.queue.put(new EndWindowTuple(endedWindowId));

        this.checkPointWindowCount++;
        if (this.checkPointWindowCount == this.checkpointCount) {
            this.queue.put(new Tuple(MessageType.CHECKPOINT, endedWindowId));
            this.checkPointWindowCount = 0;
        }

        final boolean counterExhausted = this.windowId == MAX_WINDOW_ID;
        advanceWindow();
        if (counterExhausted) {
            run();
        } else {
            this.queue.put(new Tuple(MessageType.BEGIN_WINDOW, this.baseSeconds | this.windowId));
        }
    }

    /**
     * Starts a new window sequence by calling {@link #resetBeginNewWindow()}; an interruption is
     * routed to {@link #handleException(Exception)} instead of being propagated.
     */
    @Override // java.lang.Runnable
    public final void run() {
        try {
            resetBeginNewWindow();
        } catch (InterruptedException e) {
            handleException(e);
        }
    }

    /**
     * Sets the time of the first window.
     *
     * @param j first window time in milliseconds
     */
    public void setFirstWindow(long j) {
        this.firstWindowMillis = j;
    }

    /**
     * Sets the time from which the reset span is measured.
     *
     * @param j reset window time in milliseconds
     */
    public void setResetWindow(long j) {
        this.resetWindowMillis = j;
    }

    /**
     * Sets the width of a window.
     *
     * @param i window width in milliseconds, between 1 and {@link #MAX_WINDOW_WIDTH} inclusive
     * @throws IllegalArgumentException if {@code i} is outside that range
     */
    public void setWindowWidth(int i) {
        if (i < 1 || i > MAX_WINDOW_WIDTH) {
            throw new IllegalArgumentException(String.format("Window width %d is invalid as it's not in the range 1 to %d", i, MAX_WINDOW_WIDTH));
        }
        this.windowWidthMillis = i;
    }

    /**
     * Configures checkpoint emission.
     *
     * @param i number of windows between two checkpoint tuples
     * @param i2 number of windows to treat as already elapsed since the last checkpoint
     */
    public void setCheckpointCount(int i, int i2) {
        logger.debug("setCheckpointCount: {} {}", i, i2);
        this.checkpointCount = i;
        this.checkPointWindowCount = i2;
    }

    /**
     * Only logs a hint; configuration is done through the dedicated setters.
     *
     * @param streamContext unused
     */
    public void setup(StreamContext streamContext) {
        logger.info("WindowGenerator::setup does not do anything useful, please use setFirstWindow/setResetWindow/setWindowWidth do set properties.");
    }

    /**
     * Starts window generation.
     *
     * <p>If the first window lies in the past according to the executor's clock, a task is
     * scheduled immediately that begins the first window and then keeps ending/beginning windows
     * until the current window time reaches the clock. Otherwise this generator's {@link #run()}
     * is scheduled for the first window time. In both cases a fixed-rate task that ends the
     * current window and begins the next one is scheduled with a period of one window width.
     *
     * @param streamContext unused
     */
    public void activate(StreamContext streamContext) {
        this.currentWindowMillis = this.firstWindowMillis;

        final Runnable subsequentRun = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    WindowGenerator.this.endCurrentBeginNewWindow();
                } catch (InterruptedException e) {
                    WindowGenerator.this.handleException(e);
                }
            }
        };

        final long nowMillis = this.ses.getCurrentTimeMillis();
        final long untilFirstWindow = this.currentWindowMillis - nowMillis;

        if (this.currentWindowMillis < nowMillis) {
            logger.info("Catching up from {} to {}", this.currentWindowMillis, nowMillis);
            final Runnable catchUp = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        WindowGenerator.this.resetBeginNewWindow();
                        do {
                            WindowGenerator.this.endCurrentBeginNewWindow();
                        } while (WindowGenerator.this.currentWindowMillis < WindowGenerator.this.ses.getCurrentTimeMillis());
                    } catch (InterruptedException e) {
                        WindowGenerator.this.handleException(e);
                    }
                }
            };
            this.ses.schedule(catchUp, 0L, TimeUnit.MILLISECONDS);
        } else {
            logger.info("The input will start to be sliced in {} milliseconds", untilFirstWindow);
            this.ses.schedule(this, untilFirstWindow, TimeUnit.MILLISECONDS);
        }

        this.ses.scheduleAtFixedRate(subsequentRun, untilFirstWindow + this.windowWidthMillis, this.windowWidthMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops window generation by shutting down the executor. */
    public void deactivate() {
        this.ses.shutdown();
    }

    /**
     * Central handling for failures raised while emitting tuples.
     *
     * <p>An {@link InterruptedException} shuts down the executor and restores the calling
     * thread's interrupt status; any other exception is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param exc the failure to handle
     */
    public void handleException(Exception exc) {
        if (!(exc instanceof InterruptedException)) {
            throw new RuntimeException(exc);
        }
        this.ses.shutdown();
        // Catching InterruptedException cleared the flag; restore it so code further up the
        // stack can still observe the interruption.
        Thread.currentThread().interrupt();
    }

    /** Does nothing. */
    public void teardown() {
    }

    /**
     * Not supported: this generator does not accept tuples from outside.
     *
     * @param obj ignored
     * @throws UnsupportedOperationException always
     */
    public void put(Object obj) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /** Returns the queue that the window-control tuples are put on. */
    @Override // com.datatorrent.stram.engine.MuxReservoir
    protected Queue getQueue() {
        return this.queue;
    }

    /**
     * Not supported.
     *
     * @param z ignored
     * @throws UnsupportedOperationException always
     */
    public int getCount(boolean z) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Returns the number of whole windows between two points in time.
     *
     * @param j the later time in milliseconds
     * @param j2 the earlier (origin) time in milliseconds
     * @param j3 window width in milliseconds
     * @return {@code (j - j2) / j3}
     */
    public static long getWindowCount(long j, long j2, long j3) {
        return (j - j2) / j3;
    }

    /**
     * Builds the 64-bit window id for a point in time.
     *
     * @param j time in milliseconds
     * @param j2 origin time in milliseconds from which reset spans are measured
     * @param j3 window width in milliseconds
     * @return base seconds in the upper 32 bits combined with the window counter within the
     *         reset span that contains {@code j}
     */
    public static long getWindowId(long j, long j2, long j3) {
        final long resetSpanMillis = j3 * (MAX_WINDOW_ID + 1);
        final long intoSpanMillis = (j - j2) % resetSpanMillis;
        final long baseSecs = (j - intoSpanMillis) / 1000;
        final long counter = intoSpanMillis / j3;
        return (baseSecs << 32) | counter;
    }

    /**
     * Returns the time of the window that follows the given window.
     *
     * @param j window id
     * @param j2 origin time in milliseconds
     * @param j3 window width in milliseconds
     * @return {@link #getWindowMillis} of the given id plus one window width
     */
    public static long getNextWindowMillis(long j, long j2, long j3) {
        return getWindowMillis(j, j2, j3) + j3;
    }

    /**
     * Returns the id of the window that follows the given window.
     *
     * @param j window id
     * @param j2 origin time in milliseconds
     * @param j3 window width in milliseconds
     * @return the id one window ahead of {@code j}
     */
    public static long getNextWindowId(long j, long j2, long j3) {
        return getAheadWindowId(j, j2, j3, 1);
    }

    /**
     * Returns the id of the window a given number of windows ahead of the given window.
     *
     * @param j window id
     * @param j2 origin time in milliseconds
     * @param j3 window width in milliseconds
     * @param i number of windows to move ahead
     * @return the id of the window {@code i} windows after {@code j}
     */
    public static long getAheadWindowId(long j, long j2, long j3, int i) {
        final long aheadMillis = getWindowMillis(j, j2, j3) + (i * j3);
        return getWindowId(aheadMillis, j2, j3);
    }

    /**
     * Returns the distance between two window ids, measured in windows.
     *
     * <p>Both ids are converted to times with an origin of zero before being compared.
     *
     * @param j first window id
     * @param j2 second window id
     * @param j3 window width in milliseconds
     * @return the time of {@code j} minus the time of {@code j2}, divided by the window width
     */
    public static long compareWindowId(long j, long j2, long j3) {
        final long firstMillis = getWindowMillis(j, 0L, j3);
        final long secondMillis = getWindowMillis(j2, 0L, j3);
        return (firstMillis - secondMillis) / j3;
    }

    /**
     * Converts a window id back to a time in milliseconds.
     *
     * @param j window id; {@code -1} is treated as "no window" and yields the origin
     * @param j2 origin time in milliseconds from which reset spans are measured
     * @param j3 window width in milliseconds
     * @return the origin plus a whole number of reset spans (rounded up from the id's base
     *         seconds) plus the id's window counter times the window width
     */
    public static long getWindowMillis(long j, long j2, long j3) {
        if (j == -1) {
            return j2;
        }
        final long baseMillis = (j >> 32) * 1000;
        final long sinceOrigin = baseMillis - j2;
        final long resetSpanMillis = j3 * (MAX_WINDOW_ID + 1);
        assert resetSpanMillis > 0;

        // Base seconds are truncated to whole seconds, so round a partial span up.
        long spans = sinceOrigin / resetSpanMillis;
        if (sinceOrigin % resetSpanMillis > 0) {
            spans++;
        }
        assert spans >= 0;

        final long counter = j & WINDOW_MASK;
        return j2 + (spans * resetSpanMillis) + (counter * j3);
    }

    /**
     * Extracts the base seconds from a window id.
     *
     * @param j window id
     * @return the upper 32 bits of {@code j}, as an unsigned value
     */
    public static long getBaseSecondsFromWindowId(long j) {
        return j >>> 32;
    }
}
