package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.classification.InterfaceStability;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.SelectField;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.jooq.tools.jdbc.JDBCUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow = false)
@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.class */
public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements Operator.ActivationListener<Context.OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>> {

    @NotNull
    private String tableName;

    @NotNull
    private String columnsExpression;

    @NotNull
    private String key;
    private long currentWindowId;
    protected KeyValPair<Integer, Integer> rangeQueryPair;
    protected Integer lowerBound;
    protected Integer lastEmittedRow;
    private transient int operatorId;
    private transient DSLContext create;
    private volatile transient boolean execute;
    private transient ScheduledExecutorService scanService;
    private transient AtomicReference<Throwable> threadException;
    protected transient boolean isPolled;
    protected transient Integer lastPolledRow;
    protected transient LinkedBlockingDeque<T> emitQueue;
    protected transient PreparedStatement ps;
    protected boolean isPollerPartition;
    private static int DEFAULT_QUEUE_CAPACITY = 4096;
    private static int DEFAULT_POLL_INTERVAL = 10000;
    private static int DEFAULT_FETCH_SIZE = 20000;
    private static int DEFAULT_BATCH_SIZE = 2000;
    private static int DEFAULT_SLEEP_TIME = 100;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
    private int pollInterval = DEFAULT_POLL_INTERVAL;
    private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    private int fetchSize = DEFAULT_FETCH_SIZE;

    @Min(1)
    private int partitionCount = 1;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private String whereCondition = null;
    protected transient MutablePair<Integer, Integer> currentWindowRecoveryState = new MutablePair<>();
    private WindowDataManager windowManager = new FSWindowDataManager();

    /* loaded from: input_file:com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator$DBPoller.class */
    public class DBPoller implements Runnable {
        public DBPoller() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (AbstractJdbcPollInputOperator.this.execute) {
                if ((AbstractJdbcPollInputOperator.this.isPollerPartition && !AbstractJdbcPollInputOperator.this.isPolled) || !AbstractJdbcPollInputOperator.this.isPollerPartition) {
                    AbstractJdbcPollInputOperator.this.pollRecords();
                }
            }
        }
    }

    @Override // com.datatorrent.lib.db.AbstractStoreInputOperator
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        intializeDSLContext();
        if (this.scanService == null) {
            this.scanService = Executors.newScheduledThreadPool(1);
        }
        this.execute = true;
        this.emitQueue = new LinkedBlockingDeque<>(this.queueCapacity);
        this.operatorId = operatorContext.getId();
        this.windowManager.setup(operatorContext);
    }

    private void intializeDSLContext() {
        this.create = DSL.using(((JdbcStore) this.store).getConnection(), JDBCUtils.dialect(((JdbcStore) this.store).getDatabaseUrl()));
    }

    @Override // 
    public void activate(Context.OperatorContext operatorContext) {
        initializePreparedStatement();
        long largestCompletedWindow = this.windowManager.getLargestCompletedWindow();
        if (largestCompletedWindow == -1 || ((Long) operatorContext.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue() > largestCompletedWindow) {
            this.scanService.scheduleAtFixedRate(new DBPoller(), 0L, this.pollInterval, TimeUnit.MILLISECONDS);
        }
    }

    protected void initializePreparedStatement() {
        try {
            if (this.isPollerPartition) {
                this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery(this.rangeQueryPair.getKey().intValue(), Integer.MAX_VALUE), 1003, 1007);
            } else {
                this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery(this.rangeQueryPair.getKey().intValue(), this.rangeQueryPair.getValue().intValue() - this.rangeQueryPair.getKey().intValue()), 1003, 1007);
            }
        } catch (SQLException e) {
            LOG.error("Exception in initializing the range query for a given partition", e);
            throw new RuntimeException(e);
        }
    }

    @Override // com.datatorrent.lib.db.AbstractStoreInputOperator
    public void beginWindow(long j) {
        this.currentWindowId = j;
        if (this.currentWindowId <= this.windowManager.getLargestCompletedWindow()) {
            try {
                replay(this.currentWindowId);
                return;
            } catch (SQLException e) {
                LOG.error("Exception in replayed windows", e);
                throw new RuntimeException(e);
            }
        }
        if (this.isPollerPartition) {
            updatePollQuery();
            this.isPolled = false;
        }
        this.lowerBound = this.lastEmittedRow;
    }

    private void updatePollQuery() {
        if (this.lastPolledRow != this.lastEmittedRow) {
            if (this.lastEmittedRow == null) {
                this.lastPolledRow = this.rangeQueryPair.getKey();
            } else {
                this.lastPolledRow = this.lastEmittedRow;
            }
            try {
                this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery(this.lastPolledRow.intValue(), Integer.MAX_VALUE), 1003, 1007);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void emitTuples() {
        if (this.currentWindowId <= this.windowManager.getLargestCompletedWindow()) {
            return;
        }
        int size = this.emitQueue.size() < this.batchSize ? this.emitQueue.size() : this.batchSize;
        while (true) {
            int i = size;
            size--;
            if (i <= 0) {
                return;
            }
            T poll = this.emitQueue.poll();
            if (poll != null) {
                emitTuple(poll);
            }
            Integer num = this.lastEmittedRow;
            this.lastEmittedRow = Integer.valueOf(this.lastEmittedRow.intValue() + 1);
        }
    }

    protected abstract void emitTuple(T t);

    @Override // com.datatorrent.lib.db.AbstractStoreInputOperator
    public void endWindow() {
        try {
            if (this.currentWindowId > this.windowManager.getLargestCompletedWindow()) {
                this.currentWindowRecoveryState = new MutablePair<>(this.lowerBound, this.lastEmittedRow);
                this.windowManager.save(this.currentWindowRecoveryState, this.currentWindowId);
            }
            if (this.threadException != null) {
                ((JdbcStore) this.store).disconnect();
                DTThrowable.rethrow(this.threadException.get());
            }
        } catch (IOException e) {
            throw new RuntimeException("saving recovery", e);
        }
    }

    public void deactivate() {
        this.scanService.shutdownNow();
        ((JdbcStore) this.store).disconnect();
    }

    protected void pollRecords() {
        try {
            if (this.isPolled) {
                return;
            }
            try {
                this.ps.setFetchSize(getFetchSize());
                ResultSet executeQuery = this.ps.executeQuery();
                if (executeQuery.next()) {
                    while (true) {
                        if (!this.emitQueue.offer(getTuple(executeQuery))) {
                            Thread.sleep(DEFAULT_SLEEP_TIME);
                        } else if (!executeQuery.next()) {
                            break;
                        }
                    }
                }
                this.isPolled = true;
                if (this.isPollerPartition) {
                    return;
                }
                ((JdbcStore) this.store).disconnect();
            } catch (InterruptedException e) {
                this.threadException = new AtomicReference<>(e);
                if (this.isPollerPartition) {
                    return;
                }
                ((JdbcStore) this.store).disconnect();
            } catch (SQLException e2) {
                this.execute = false;
                this.threadException = new AtomicReference<>(e2);
                if (this.isPollerPartition) {
                    return;
                }
                ((JdbcStore) this.store).disconnect();
            }
        } catch (Throwable th) {
            if (!this.isPollerPartition) {
                ((JdbcStore) this.store).disconnect();
            }
            throw th;
        }
    }

    public abstract T getTuple(ResultSet resultSet);

    protected void replay(long j) throws SQLException {
        try {
            MutablePair<Integer, Integer> mutablePair = (MutablePair) this.windowManager.retrieve(j);
            if (mutablePair != null && shouldReplayWindow(mutablePair)) {
                LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", new Object[]{Long.valueOf(j), mutablePair.left, mutablePair.right});
                this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery(((Integer) mutablePair.left).intValue(), ((Integer) mutablePair.right).intValue() - ((Integer) mutablePair.left).intValue()), 1003, 1007);
                LOG.info("Query formed to recover data - {}", this.ps.toString());
                emitReplayedTuples(this.ps);
            }
            if (this.currentWindowId == this.windowManager.getLargestCompletedWindow()) {
                try {
                    if (this.isPollerPartition || this.rangeQueryPair.getValue() == null) {
                        this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery((this.lastEmittedRow == null ? this.rangeQueryPair.getKey() : this.lastEmittedRow).intValue(), Integer.MAX_VALUE), 1003, 1007);
                    } else {
                        this.ps = ((JdbcStore) this.store).getConnection().prepareStatement(buildRangeQuery(this.lastEmittedRow.intValue(), this.rangeQueryPair.getValue().intValue() - this.lastEmittedRow.intValue()), 1003, 1007);
                    }
                    this.scanService.scheduleAtFixedRate(new DBPoller(), 0L, this.pollInterval, TimeUnit.MILLISECONDS);
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        } catch (IOException e2) {
            throw new RuntimeException("Exception during replay of records.", e2);
        }
    }

    private boolean shouldReplayWindow(MutablePair<Integer, Integer> mutablePair) {
        return (mutablePair.left == null || mutablePair.right == null || ((Integer) mutablePair.right).equals(this.rangeQueryPair.getValue()) || ((Integer) mutablePair.right).equals(this.lastEmittedRow)) ? false : true;
    }

    public void emitReplayedTuples(PreparedStatement preparedStatement) {
        Throwable th = null;
        try {
            try {
                try {
                    preparedStatement.setFetchSize(getFetchSize());
                    ResultSet executeQuery = preparedStatement.executeQuery();
                    if (executeQuery == null || executeQuery.isClosed()) {
                        if (preparedStatement != null) {
                            if (0 == 0) {
                                preparedStatement.close();
                                return;
                            }
                            try {
                                preparedStatement.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                    while (executeQuery.next()) {
                        emitTuple(getTuple(executeQuery));
                        Integer num = this.lastEmittedRow;
                        this.lastEmittedRow = Integer.valueOf(this.lastEmittedRow.intValue() + 1);
                    }
                    if (preparedStatement != null) {
                        if (0 != 0) {
                            try {
                                preparedStatement.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            preparedStatement.close();
                        }
                    }
                    return;
                } catch (Throwable th4) {
                    th = th4;
                    throw th4;
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        throw new RuntimeException(e);
    }

    public Collection<Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> collection, Partitioner.PartitioningContext partitioningContext) {
        ArrayList arrayList = new ArrayList(getPartitionCount());
        try {
            try {
                ((JdbcStore) this.store).connect();
                intializeDSLContext();
                HashMap<Integer, KeyValPair<Integer, Integer>> partitionedQueryRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
                ((JdbcStore) this.store).disconnect();
                KryoCloneUtils createCloneUtils = KryoCloneUtils.createCloneUtils(this);
                for (int i = 0; i <= getPartitionCount(); i++) {
                    AbstractJdbcPollInputOperator abstractJdbcPollInputOperator = (AbstractJdbcPollInputOperator) createCloneUtils.getClone();
                    if (i < getPartitionCount()) {
                        abstractJdbcPollInputOperator.rangeQueryPair = partitionedQueryRangeMap.get(Integer.valueOf(i));
                        abstractJdbcPollInputOperator.lastEmittedRow = partitionedQueryRangeMap.get(Integer.valueOf(i)).getKey();
                        abstractJdbcPollInputOperator.isPollerPartition = false;
                    } else {
                        int intValue = partitionedQueryRangeMap.get(Integer.valueOf(i - 1)).getValue().intValue();
                        abstractJdbcPollInputOperator.rangeQueryPair = new KeyValPair<>(Integer.valueOf(intValue), null);
                        abstractJdbcPollInputOperator.lastEmittedRow = Integer.valueOf(intValue);
                        abstractJdbcPollInputOperator.isPollerPartition = true;
                    }
                    arrayList.add(new DefaultPartition(abstractJdbcPollInputOperator));
                }
                return arrayList;
            } catch (SQLException e) {
                LOG.error("Exception in initializing the partition range", e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            ((JdbcStore) this.store).disconnect();
            throw th;
        }
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> map) {
    }

    private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int i) throws SQLException {
        int i2 = 0;
        try {
            i2 = getRecordsCount();
        } catch (SQLException e) {
            LOG.error("Exception in getting the record range", e);
        }
        HashMap<Integer, KeyValPair<Integer, Integer>> hashMap = new HashMap<>();
        int i3 = i2 / i;
        int i4 = 0;
        int i5 = 0;
        int i6 = i3;
        while (true) {
            int i7 = i6;
            if (i4 >= i - 1) {
                hashMap.put(Integer.valueOf(i - 1), new KeyValPair<>(Integer.valueOf(i3 * (i - 1)), Integer.valueOf(i2)));
                LOG.info("Partition map - " + hashMap.toString());
                return hashMap;
            }
            hashMap.put(Integer.valueOf(i4), new KeyValPair<>(Integer.valueOf(i5), Integer.valueOf(i7)));
            i4++;
            i5 += i3;
            i6 = i7 + i3;
        }
    }

    private int getRecordsCount() throws SQLException {
        Condition trueCondition = DSL.trueCondition();
        if (getWhereCondition() != null) {
            trueCondition = trueCondition.and(getWhereCondition());
        }
        return ((Integer) this.create.select(DSL.count()).from(getTableName()).where(new Condition[]{trueCondition}).fetchOne(0, Integer.TYPE)).intValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String buildRangeQuery(int i, int i2) {
        String sql;
        Condition trueCondition = DSL.trueCondition();
        if (getWhereCondition() != null) {
            trueCondition = trueCondition.and(getWhereCondition());
        }
        if (getColumnsExpression() != null) {
            ArrayList arrayList = new ArrayList();
            for (String str : getColumnsExpression().split(",")) {
                arrayList.add(DSL.field(str));
            }
            sql = this.create.select(arrayList).from(getTableName()).where(new Condition[]{trueCondition}).orderBy(DSL.field(getKey())).limit(i2).offset(i).getSQL(ParamType.INLINED);
        } else {
            sql = this.create.select(new SelectField[0]).from(getTableName()).where(new Condition[]{trueCondition}).orderBy(DSL.field(getKey())).limit(i2).offset(i).getSQL(ParamType.INLINED);
        }
        LOG.info("DSL Query: " + sql);
        return sql;
    }

    @VisibleForTesting
    protected void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scanService = scheduledExecutorService;
    }

    public WindowDataManager getWindowManager() {
        return this.windowManager;
    }

    public void setWindowManager(WindowDataManager windowDataManager) {
        this.windowManager = windowDataManager;
    }

    public int getPartitionCount() {
        return this.partitionCount;
    }

    public void setPartitionCount(int i) {
        this.partitionCount = i;
    }

    public String getWhereCondition() {
        return this.whereCondition;
    }

    public void setWhereCondition(String str) {
        this.whereCondition = str;
    }

    public String getColumnsExpression() {
        return this.columnsExpression;
    }

    public void setColumnsExpression(String str) {
        this.columnsExpression = str;
    }

    public int getFetchSize() {
        return this.fetchSize;
    }

    public void setFetchSize(int i) {
        this.fetchSize = i;
    }

    public int getPollInterval() {
        return this.pollInterval;
    }

    public void setPollInterval(int i) {
        this.pollInterval = i;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public String getTableName() {
        return this.tableName;
    }

    public void setTableName(String str) {
        this.tableName = str;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public String getKey() {
        return this.key;
    }

    public void setKey(String str) {
        this.key = str;
    }
}
