/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.aerospike;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.task.IndexTask;
import com.datatorrent.contrib.aerospike.AerospikeStore;
import com.datatorrent.lib.db.TransactionableStore;
import javax.annotation.Nonnull;

public class AerospikeTransactionalStore
extends AerospikeStore
implements TransactionableStore {
    public static String DEFAULT_APP_ID_COL = "dt_app_id";
    public static String DEFAULT_OPERATOR_ID_COL = "dt_operator_id";
    public static String DEFAULT_WINDOW_COL = "dt_window";
    public static String DEFAULT_META_SET = "dt_meta";
    @Nonnull
    protected String metaTableAppIdColumn;
    @Nonnull
    protected String metaTableOperatorIdColumn;
    @Nonnull
    protected String metaTableWindowColumn;
    @Nonnull
    protected String metaSet = DEFAULT_META_SET;
    @Nonnull
    protected String namespace;
    private transient boolean inTransaction = false;
    private transient Statement lastWindowFetchCommand;

    public AerospikeTransactionalStore() {
        this.metaTableAppIdColumn = DEFAULT_APP_ID_COL;
        this.metaTableOperatorIdColumn = DEFAULT_OPERATOR_ID_COL;
        this.metaTableWindowColumn = DEFAULT_WINDOW_COL;
    }

    public void setMetaSet(@Nonnull String metaSet) {
        this.metaSet = metaSet;
    }

    public void setMetaTableAppIdColumn(@Nonnull String appIdColumn) {
        this.metaTableAppIdColumn = appIdColumn;
    }

    public void setMetaTableOperatorIdColumn(@Nonnull String operatorIdColumn) {
        this.metaTableOperatorIdColumn = operatorIdColumn;
    }

    public void setMetaTableWindowColumn(@Nonnull String windowColumn) {
        this.metaTableWindowColumn = windowColumn;
    }

    public void setNamespace(@Nonnull String namespace) {
        this.namespace = namespace;
    }

    @Override
    public void connect() {
        super.connect();
        this.createIndexes();
        try {
            this.lastWindowFetchCommand = new Statement();
            this.lastWindowFetchCommand.setNamespace(this.namespace);
            this.lastWindowFetchCommand.setSetName(this.metaSet);
            this.lastWindowFetchCommand.setBinNames(new String[]{this.metaTableWindowColumn});
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void disconnect() {
        super.disconnect();
    }

    public void beginTransaction() {
        this.inTransaction = true;
    }

    public void commitTransaction() {
        this.inTransaction = false;
    }

    public void rollbackTransaction() {
        this.inTransaction = false;
    }

    public boolean isInTransaction() {
        return this.inTransaction;
    }

    private void createIndexes() {
        try {
            IndexTask task = this.client.createIndex(null, this.namespace, this.metaSet, "operatorIdIndex", this.metaTableOperatorIdColumn, IndexType.NUMERIC);
            task.waitTillComplete();
            task = this.client.createIndex(null, this.namespace, this.metaSet, "appIdIndex", this.metaTableAppIdColumn, IndexType.STRING);
            task.waitTillComplete();
        }
        catch (AerospikeException ex) {
            throw new RuntimeException(ex);
        }
    }

    public long getCommittedWindowId(String appId, int operatorId) {
        try {
            this.lastWindowFetchCommand.setFilters(new Filter[]{Filter.equal((String)this.metaTableOperatorIdColumn, (long)operatorId)});
            this.lastWindowFetchCommand.setFilters(new Filter[]{Filter.equal((String)this.metaTableAppIdColumn, (String)appId)});
            long lastWindow = -1L;
            RecordSet recordSet = this.client.query(null, this.lastWindowFetchCommand);
            while (recordSet.next()) {
                lastWindow = Long.parseLong(recordSet.getRecord().getValue(this.metaTableWindowColumn).toString());
            }
            return lastWindow;
        }
        catch (AerospikeException ex) {
            throw new RuntimeException(ex);
        }
    }

    public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
        try {
            String keyString = appId + String.valueOf(operatorId);
            Key key = new Key(this.namespace, this.metaSet, keyString.hashCode());
            Bin bin1 = new Bin(this.metaTableAppIdColumn, appId);
            Bin bin2 = new Bin(this.metaTableOperatorIdColumn, operatorId);
            Bin bin3 = new Bin(this.metaTableWindowColumn, windowId);
            this.client.put(null, key, new Bin[]{bin1, bin2, bin3});
        }
        catch (AerospikeException e) {
            throw new RuntimeException(e);
        }
    }

    public void removeCommittedWindowId(String appId, int operatorId) {
        try {
            String keyString = appId + String.valueOf(operatorId);
            Key key = new Key(this.namespace, this.metaSet, keyString.hashCode());
            this.client.delete(null, key);
        }
        catch (AerospikeException e) {
            throw new RuntimeException(e);
        }
    }
}

