/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.couchbase;

import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.vbucket.config.Config;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.contrib.couchbase.CouchBaseStore;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCouchBaseInputOperator<T>
extends AbstractStoreInputOperator<T, CouchBaseStore>
implements Partitioner<AbstractCouchBaseInputOperator<T>> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCouchBaseInputOperator.class);
    protected transient CouchbaseClient clientPartition = null;
    private int serverIndex;
    protected transient Config conf;
    protected String serverURIString;

    public String getServerURIString() {
        return this.serverURIString;
    }

    @VisibleForTesting
    public void setServerURIString(String serverURIString) {
        this.serverURIString = serverURIString;
    }

    public int getServerIndex() {
        return this.serverIndex;
    }

    public void setServerIndex(int serverIndex) {
        this.serverIndex = serverIndex;
    }

    public AbstractCouchBaseInputOperator() {
        this.store = new CouchBaseStore();
    }

    public void setup(Context.OperatorContext context) {
        if (this.clientPartition == null) {
            if (this.conf == null) {
                this.conf = ((CouchBaseStore)this.store).getConf();
            }
            try {
                this.clientPartition = ((CouchBaseStore)this.store).connectServer(this.serverURIString);
            }
            catch (IOException ex) {
                DTThrowable.rethrow((Exception)ex);
            }
        }
    }

    public void teardown() {
        if (this.clientPartition != null) {
            this.clientPartition.shutdown((long)((CouchBaseStore)this.store).shutdownTimeout, TimeUnit.SECONDS);
        }
        super.teardown();
    }

    public void emitTuples() {
        List<String> keys = this.getKeys();
        Object result = null;
        for (String key : keys) {
            int master = this.conf.getMaster(this.conf.getVbucketByKey(key));
            if (master != this.getServerIndex()) continue;
            result = this.clientPartition.get(key);
        }
        if (result != null) {
            T tuple = this.getTuple(result);
            this.outputPort.emit(tuple);
        }
    }

    public abstract T getTuple(Object var1);

    public abstract List<String> getKeys();

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractCouchBaseInputOperator<T>>> partitions) {
    }

    public Collection<Partitioner.Partition<AbstractCouchBaseInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractCouchBaseInputOperator<T>>> partitions, Partitioner.PartitioningContext incrementalCapacity) {
        this.conf = ((CouchBaseStore)this.store).getConf();
        int numPartitions = this.conf.getServers().size();
        List list = this.conf.getServers();
        ArrayList newPartitions = Lists.newArrayListWithExpectedSize((int)numPartitions);
        KryoCloneUtils cloneUtils = KryoCloneUtils.createCloneUtils((Object)((Object)this));
        for (int i = 0; i < numPartitions; ++i) {
            AbstractCouchBaseInputOperator oper = (AbstractCouchBaseInputOperator)((Object)cloneUtils.getClone());
            oper.setServerIndex(i);
            oper.setServerURIString((String)list.get(i));
            logger.debug("oper {} urlstring is {}", (Object)i, (Object)oper.getServerURIString());
            newPartitions.add(new DefaultPartition((Object)oper));
        }
        return newPartitions;
    }
}

