package com.datatorrent.contrib.redis;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

/* loaded from: input_file:com/datatorrent/contrib/redis/AbstractRedisInputOperator.class */
public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements Operator.CheckpointListener {
    protected transient Integer scanOffset;
    protected transient ScanParams scanParameters;
    private transient boolean scanComplete;
    private transient Integer backupOffset;
    private transient boolean replay;

    @NotNull
    private WindowDataManager windowDataManager;
    private transient Context.OperatorContext context;
    private transient long currentWindowId;
    private transient Integer sleepTimeMillis;
    private transient Integer scanCallsInCurrentWindow;
    protected transient List<String> keys = new ArrayList();
    private transient boolean skipOffsetRecovery = true;
    private int scanCount = 100;
    private RecoveryState recoveryState = new RecoveryState();

    /* loaded from: input_file:com/datatorrent/contrib/redis/AbstractRedisInputOperator$RecoveryState.class */
    public static class RecoveryState implements Serializable {
        public Integer scanOffsetAtBeginWindow;
        public Integer numberOfScanCallsInWindow;
    }

    public AbstractRedisInputOperator() {
        this.recoveryState.scanOffsetAtBeginWindow = 0;
        this.recoveryState.numberOfScanCallsInWindow = 0;
        setWindowDataManager(new FSWindowDataManager());
    }

    public void beginWindow(long j) {
        this.currentWindowId = j;
        this.scanCallsInCurrentWindow = 0;
        this.replay = false;
        if (this.currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) {
            replay(j);
        }
    }

    private void replay(long j) {
        try {
            if (!this.skipOffsetRecovery) {
                RecoveryState recoveryState = (RecoveryState) getWindowDataManager().load(this.context.getId(), j - 1);
                this.recoveryState.scanOffsetAtBeginWindow = recoveryState.scanOffsetAtBeginWindow;
            }
            this.skipOffsetRecovery = false;
            RecoveryState recoveryState2 = (RecoveryState) getWindowDataManager().load(this.context.getId(), j);
            this.recoveryState.numberOfScanCallsInWindow = recoveryState2.numberOfScanCallsInWindow;
            if (this.recoveryState.scanOffsetAtBeginWindow != null) {
                this.scanOffset = this.recoveryState.scanOffsetAtBeginWindow;
            }
            this.replay = true;
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
    }

    private void scanKeysFromOffset() {
        if (!this.scanComplete) {
            if (this.replay && this.scanCallsInCurrentWindow.intValue() >= this.recoveryState.numberOfScanCallsInWindow.intValue()) {
                try {
                    Thread.sleep(this.sleepTimeMillis.intValue());
                    return;
                } catch (InterruptedException e) {
                    DTThrowable.rethrow(e);
                    return;
                }
            }
            ScanResult<String> ScanKeys = this.store.ScanKeys(this.scanOffset, this.scanParameters);
            this.backupOffset = this.scanOffset;
            this.scanOffset = Integer.valueOf(Integer.parseInt(ScanKeys.getStringCursor()));
            if (this.scanOffset.intValue() == 0) {
                this.scanComplete = true;
                this.scanOffset = Integer.valueOf(this.backupOffset.intValue() + ScanKeys.getResult().size());
            }
            this.keys = ScanKeys.getResult();
        }
        Integer num = this.scanCallsInCurrentWindow;
        this.scanCallsInCurrentWindow = Integer.valueOf(this.scanCallsInCurrentWindow.intValue() + 1);
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        this.sleepTimeMillis = (Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS);
        getWindowDataManager().setup(operatorContext);
        this.context = operatorContext;
        this.scanOffset = 0;
        this.scanComplete = false;
        this.scanParameters = new ScanParams();
        this.scanParameters.count(this.scanCount);
        this.scanOffset = this.recoveryState.scanOffsetAtBeginWindow;
        this.skipOffsetRecovery = true;
    }

    public void endWindow() {
        while (this.replay && this.scanCallsInCurrentWindow.intValue() < this.recoveryState.numberOfScanCallsInWindow.intValue()) {
            scanKeysFromOffset();
            processTuples();
        }
        super.endWindow();
        this.recoveryState.scanOffsetAtBeginWindow = this.scanOffset;
        this.recoveryState.numberOfScanCallsInWindow = this.scanCallsInCurrentWindow;
        if (this.currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) {
            try {
                getWindowDataManager().save(this.recoveryState, this.context.getId(), this.currentWindowId);
            } catch (IOException e) {
                DTThrowable.rethrow(e);
            }
        }
    }

    public void teardown() {
        super.teardown();
        getWindowDataManager().teardown();
    }

    public int getScanCount() {
        return this.scanCount;
    }

    public void setScanCount(int i) {
        this.scanCount = i;
    }

    public void emitTuples() {
        scanKeysFromOffset();
        processTuples();
    }

    public abstract void processTuples();

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        try {
            getWindowDataManager().deleteUpTo(this.context.getId(), j);
        } catch (IOException e) {
            throw new RuntimeException("committing", e);
        }
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }
}
