/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.redis;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.redis.RedisStore;
import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

public abstract class AbstractRedisInputOperator<T>
extends AbstractKeyValueStoreInputOperator<T, RedisStore>
implements Operator.CheckpointListener {
    protected transient List<String> keys = new ArrayList<String>();
    protected transient Integer scanOffset;
    protected transient ScanParams scanParameters;
    private transient boolean scanComplete;
    private transient Integer backupOffset;
    private int scanCount = 100;
    private transient boolean replay;
    private transient boolean skipOffsetRecovery = true;
    @NotNull
    private WindowDataManager windowDataManager;
    private transient Context.OperatorContext context;
    private transient long currentWindowId;
    private transient Integer sleepTimeMillis;
    private transient Integer scanCallsInCurrentWindow;
    private RecoveryState recoveryState = new RecoveryState();

    public AbstractRedisInputOperator() {
        this.recoveryState.scanOffsetAtBeginWindow = 0;
        this.recoveryState.numberOfScanCallsInWindow = 0;
        this.setWindowDataManager((WindowDataManager)new FSWindowDataManager());
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.scanCallsInCurrentWindow = 0;
        this.replay = false;
        if (this.currentWindowId <= this.getWindowDataManager().getLargestCompletedWindow()) {
            this.replay(windowId);
        }
    }

    private void replay(long windowId) {
        try {
            if (!this.skipOffsetRecovery) {
                RecoveryState recoveryStateForLastWindow = (RecoveryState)this.getWindowDataManager().retrieve(windowId - 1L);
                this.recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
            }
            this.skipOffsetRecovery = false;
            RecoveryState recoveryStateForCurrentWindow = (RecoveryState)this.getWindowDataManager().retrieve(windowId);
            this.recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
            if (this.recoveryState.scanOffsetAtBeginWindow != null) {
                this.scanOffset = this.recoveryState.scanOffsetAtBeginWindow;
            }
            this.replay = true;
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
    }

    private void scanKeysFromOffset() {
        if (!this.scanComplete) {
            if (this.replay && this.scanCallsInCurrentWindow >= this.recoveryState.numberOfScanCallsInWindow) {
                try {
                    Thread.sleep(this.sleepTimeMillis.intValue());
                }
                catch (InterruptedException e) {
                    DTThrowable.rethrow((Exception)e);
                }
                return;
            }
            ScanResult<String> result = ((RedisStore)this.store).ScanKeys(this.scanOffset, this.scanParameters);
            this.backupOffset = this.scanOffset;
            this.scanOffset = Integer.parseInt(result.getStringCursor());
            if (this.scanOffset == 0) {
                this.scanComplete = true;
                this.scanOffset = this.backupOffset + result.getResult().size();
            }
            this.keys = result.getResult();
        }
        Integer n = this.scanCallsInCurrentWindow;
        Integer n2 = this.scanCallsInCurrentWindow = Integer.valueOf(this.scanCallsInCurrentWindow + 1);
    }

    public void setup(Context.OperatorContext context) {
        super.setup(context);
        this.sleepTimeMillis = (Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS);
        this.getWindowDataManager().setup((Context)context);
        this.context = context;
        this.scanOffset = 0;
        this.scanComplete = false;
        this.scanParameters = new ScanParams();
        this.scanParameters.count(this.scanCount);
        this.scanOffset = this.recoveryState.scanOffsetAtBeginWindow;
        this.skipOffsetRecovery = true;
    }

    public void endWindow() {
        while (this.replay && this.scanCallsInCurrentWindow < this.recoveryState.numberOfScanCallsInWindow) {
            this.scanKeysFromOffset();
            this.processTuples();
        }
        super.endWindow();
        this.recoveryState.scanOffsetAtBeginWindow = this.scanOffset;
        this.recoveryState.numberOfScanCallsInWindow = this.scanCallsInCurrentWindow;
        if (this.currentWindowId > this.getWindowDataManager().getLargestCompletedWindow()) {
            try {
                this.getWindowDataManager().save((Object)this.recoveryState, this.currentWindowId);
            }
            catch (IOException e) {
                DTThrowable.rethrow((Exception)e);
            }
        }
    }

    public void teardown() {
        super.teardown();
        this.getWindowDataManager().teardown();
    }

    public int getScanCount() {
        return this.scanCount;
    }

    public void setScanCount(int scanCount) {
        this.scanCount = scanCount;
    }

    public void emitTuples() {
        this.scanKeysFromOffset();
        this.processTuples();
    }

    public abstract void processTuples();

    public void checkpointed(long windowId) {
    }

    public void committed(long windowId) {
        try {
            this.getWindowDataManager().committed(windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("committing", e);
        }
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    public static class RecoveryState
    implements Serializable {
        public Integer scanOffsetAtBeginWindow;
        public Integer numberOfScanCallsInWindow;
    }
}

