/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.couchbase;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.couchbase.CouchBaseSerializer;
import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
import com.datatorrent.lib.db.AbstractAggregateTransactionableStoreOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCouchBaseOutputOperator<T>
extends AbstractAggregateTransactionableStoreOutputOperator<T, CouchBaseWindowStore> {
    private static final transient Logger logger = LoggerFactory.getLogger(AbstractCouchBaseOutputOperator.class);
    protected transient HashMap<OperationFuture<Boolean>, Long> mapFuture = new HashMap();
    protected int numTuples;
    protected CouchBaseSerializer serializer;
    protected TreeMap<Long, T> mapTuples = new TreeMap();
    protected long id = 0L;
    private final transient CompletionListener listener;
    private transient boolean failure;
    private final transient Object syncObj;

    public AbstractCouchBaseOutputOperator() {
        this.store = new CouchBaseWindowStore();
        this.listener = new CompletionListener();
        this.numTuples = 0;
        this.syncObj = new Object();
    }

    public void setup(Context.OperatorContext context) {
        Operator.ProcessingMode mode = (Operator.ProcessingMode)context.getValue(Context.OperatorContext.PROCESSING_MODE);
        if (mode == Operator.ProcessingMode.AT_MOST_ONCE && this.numTuples == 0) {
            this.mapTuples.clear();
        }
        this.numTuples = 0;
        if (!this.mapTuples.isEmpty()) {
            Iterator<T> itr = this.mapTuples.values().iterator();
            while (itr.hasNext()) {
                this.processTuple(itr.next());
            }
        }
        super.setup(context);
    }

    public void beginWindow(long windowId) {
        this.numTuples = 0;
        super.beginWindow(windowId);
    }

    public void processTuple(T tuple) {
        if (this.failure) {
            throw new RuntimeException("Operation Failed");
        }
        this.waitForQueueSize(((CouchBaseWindowStore)this.store).queueSize - 1);
        this.setKeyValueInCouchBase(tuple);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setKeyValueInCouchBase(T tuple) {
        ++this.id;
        String key = this.getKey(tuple);
        Object value = this.getValue(tuple);
        if (!(value instanceof Boolean || value instanceof Integer || value instanceof String || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof Long || value instanceof Short || value instanceof Byte || this.serializer == null)) {
            value = this.serializer.serialize(value);
        }
        OperationFuture<Boolean> future = this.processKeyValue(key, value);
        Object object = this.syncObj;
        synchronized (object) {
            future.addListener((OperationCompletionListener)this.listener);
            this.mapFuture.put(future, this.id);
            if (!this.mapTuples.containsKey(this.id)) {
                this.mapTuples.put(this.id, tuple);
            }
            ++this.numTuples;
        }
    }

    public void storeAggregate() {
        this.waitForQueueSize(0);
        this.id = 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForQueueSize(int sizeOfQueue) {
        long startTms = System.currentTimeMillis();
        while (this.numTuples > sizeOfQueue) {
            long elapsedTime;
            Object object = this.syncObj;
            synchronized (object) {
                if (this.numTuples > sizeOfQueue) {
                    try {
                        elapsedTime = System.currentTimeMillis() - startTms;
                        if (elapsedTime >= ((CouchBaseWindowStore)this.store).timeout) {
                            throw new RuntimeException("Timed out waiting for space in queue");
                        }
                        this.syncObj.wait(((CouchBaseWindowStore)this.store).timeout - elapsedTime);
                    }
                    catch (InterruptedException ex) {
                        DTThrowable.rethrow((Exception)ex);
                    }
                }
            }
            elapsedTime = System.currentTimeMillis() - startTms;
            if (elapsedTime < ((CouchBaseWindowStore)this.store).timeout) continue;
            throw new RuntimeException("Timed out waiting for space in queue");
        }
    }

    public CouchBaseSerializer getSerializer() {
        return this.serializer;
    }

    public void setSerializer(CouchBaseSerializer serializer) {
        this.serializer = serializer;
    }

    public abstract String getKey(T var1);

    public abstract Object getValue(T var1);

    protected abstract OperationFuture<Boolean> processKeyValue(String var1, Object var2);

    protected class CompletionListener
    implements OperationCompletionListener {
        protected CompletionListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onComplete(OperationFuture<?> f) throws Exception {
            if (!((Boolean)f.get()).booleanValue()) {
                logger.error("Operation failed {}", f);
                AbstractCouchBaseOutputOperator.this.failure = true;
                return;
            }
            Object object = AbstractCouchBaseOutputOperator.this.syncObj;
            synchronized (object) {
                long idProcessed = AbstractCouchBaseOutputOperator.this.mapFuture.get(f);
                AbstractCouchBaseOutputOperator.this.mapTuples.remove(idProcessed);
                AbstractCouchBaseOutputOperator.this.mapFuture.remove(f);
                --AbstractCouchBaseOutputOperator.this.numTuples;
                AbstractCouchBaseOutputOperator.this.syncObj.notify();
            }
        }
    }
}

