/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.couchbase;

import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.CouchbaseConnectionFactoryBuilder;
import com.datatorrent.contrib.couchbase.CouchBaseStore;
import com.datatorrent.lib.db.TransactionableStore;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CouchBaseStore} that additionally keeps the last committed window id
 * of an operator in a separate "meta" bucket, accessed through its own
 * {@link CouchbaseClient}.
 *
 * <p>The window id is stored as an 8-byte value under the key
 * {@code <appId>_<operatorId>_last_window}. The transaction methods of
 * {@link TransactionableStore} are implemented as no-ops.
 *
 * <p>NOTE(review): {@code bucketMeta} and the key suffix are {@code static} yet are
 * reset by the instance constructor, so creating a new instance overwrites the
 * values seen by every other instance. They are left static here to keep the
 * existing protected interface unchanged - confirm whether that sharing is intended.
 */
public class CouchBaseWindowStore
extends CouchBaseStore
implements TransactionableStore {
    private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class);
    private static final String DEFAULT_LAST_WINDOW_PREFIX = "last_window";
    /** Suffix appended to the window-id key; shared by all instances. */
    private static String lastWindowValue;
    /** Client connected to the meta bucket; created in {@link #connect()}, null before that. */
    protected transient CouchbaseClient clientMeta = null;
    /** Name of the bucket holding window-id metadata; shared by all instances. */
    protected static String bucketMeta;
    /** Password used when connecting to the meta bucket. */
    protected String passwordMeta;

    /**
     * @return the name of the bucket used for window-id metadata
     */
    public String getBucketMeta() {
        return bucketMeta;
    }

    /**
     * Sets the name of the bucket used for window-id metadata. The value is stored in a
     * static field and is therefore visible to every instance of this class.
     *
     * @param bucketMeta the meta bucket name
     */
    public void setBucketMeta(String bucketMeta) {
        CouchBaseWindowStore.bucketMeta = bucketMeta;
    }

    /**
     * @return the password used for the meta bucket
     */
    public String getPasswordMeta() {
        return this.passwordMeta;
    }

    /**
     * @param passwordMeta the password used for the meta bucket
     */
    public void setPasswordMeta(String passwordMeta) {
        this.passwordMeta = passwordMeta;
    }

    /**
     * Creates a store that uses the {@code "default"} meta bucket, an empty meta password
     * and the {@code "last_window"} key suffix.
     */
    public CouchBaseWindowStore() {
        // Both assignments below write static fields; see the class-level note.
        lastWindowValue = DEFAULT_LAST_WINDOW_PREFIX;
        bucketMeta = "default";
        this.passwordMeta = "";
    }

    /**
     * @return the client connected to the meta bucket, or {@code null} if
     *         {@link #connect()} has not created it
     */
    public CouchbaseClient getMetaInstance() {
        return this.clientMeta;
    }

    /**
     * Connects the parent store and then opens a second client against the meta bucket,
     * using the inherited timeout, block time and base URIs.
     *
     * @throws IOException if the parent store fails to connect; a failure while creating
     *         the meta client is logged and passed to {@code DTThrowable.rethrow}
     */
    @Override
    public void connect() throws IOException {
        super.connect();
        logger.debug("connection established");
        try {
            CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder();
            cfb.setOpTimeout(this.timeout);
            cfb.setOpQueueMaxBlockTime((long)this.blockTime);
            this.clientMeta = new CouchbaseClient(cfb.buildCouchbaseConnection(this.baseURIs, bucketMeta, this.passwordMeta));
        }
        catch (IOException e) {
            logger.error("Error connecting to Couchbase: ", e);
            DTThrowable.rethrow(e);
        }
    }

    /**
     * Reads the last committed window id stored for the given application and operator.
     *
     * @param appId      application id, first component of the key
     * @param operatorId operator id, second component of the key
     * @return the stored window id, or {@code -1} when no value exists under the key
     */
    public long getCommittedWindowId(String appId, int operatorId) {
        String key = appId + "_" + operatorId + "_" + lastWindowValue;
        byte[] value = (byte[])this.clientMeta.get(key);
        if (value == null) {
            return -1L;
        }
        return CouchBaseWindowStore.toLong(value);
    }

    /**
     * Stores the committed window id for the given application and operator and waits
     * for the write to complete.
     *
     * @param appId      application id, first component of the key
     * @param operatorId operator id, second component of the key
     * @param windowId   the window id to persist
     */
    public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
        byte[] windowIdBytes = CouchBaseWindowStore.toBytes(windowId);
        String key = appId + "_" + operatorId + "_" + lastWindowValue;
        try {
            // get() blocks until the asynchronous set has finished.
            this.clientMeta.set(key, windowIdBytes).get();
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            DTThrowable.rethrow(ex);
        }
        catch (ExecutionException ex) {
            DTThrowable.rethrow(ex);
        }
    }

    /**
     * Does nothing; stored window ids are not removed by this implementation.
     *
     * @param appId      application id (unused)
     * @param operatorId operator id (unused)
     */
    public void removeCommittedWindowId(String appId, int operatorId) {
    }

    /** No-op. */
    public void beginTransaction() {
    }

    /** No-op. */
    public void commitTransaction() {
    }

    /** No-op. */
    public void rollbackTransaction() {
    }

    /**
     * @return always {@code false}
     */
    public boolean isInTransaction() {
        return false;
    }

    /**
     * Decodes a {@code long} from the first eight bytes of the array, as written by
     * {@link #toBytes(long)} ({@link DataInputStream#readLong()} format).
     *
     * @param b the bytes to decode
     * @return the decoded value
     */
    public static long toLong(byte[] b) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(b));
        long result = 0L;
        try {
            result = in.readLong();
            in.close();
        }
        catch (IOException e) {
            // Raised, for example, when fewer than eight bytes are available.
            DTThrowable.rethrow(e);
        }
        return result;
    }

    /**
     * Encodes a {@code long} as eight bytes using {@link DataOutputStream#writeLong(long)}.
     *
     * @param l the value to encode
     * @return the eight-byte encoding
     */
    public static byte[] toBytes(long l) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(8);
        DataOutputStream out = new DataOutputStream(buffer);
        byte[] result = null;
        try {
            out.writeLong(l);
            result = buffer.toByteArray();
            out.close();
        }
        catch (IOException e) {
            DTThrowable.rethrow(e);
        }
        return result;
    }

    /**
     * Shuts down the meta client (if one was created) and then disconnects the parent store.
     *
     * @throws IOException if the parent store fails to disconnect
     */
    @Override
    public void disconnect() throws IOException {
        try {
            // clientMeta is still null when connect() never ran or failed before creating it.
            if (this.clientMeta != null) {
                this.clientMeta.shutdown((long)this.shutdownTimeout, TimeUnit.SECONDS);
            }
        }
        finally {
            // Always release the parent's connection, even if the meta shutdown throws.
            super.disconnect();
        }
    }
}

