/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client;

import com.couchbase.client.CouchbaseClientIF;
import com.couchbase.client.CouchbaseConnection;
import com.couchbase.client.CouchbaseConnectionFactory;
import com.couchbase.client.CouchbaseMemcachedConnection;
import com.couchbase.client.CouchbaseProperties;
import com.couchbase.client.ObservedException;
import com.couchbase.client.ObservedModifiedException;
import com.couchbase.client.ObservedTimeoutException;
import com.couchbase.client.ViewConnection;
import com.couchbase.client.clustermanager.FlushResponse;
import com.couchbase.client.internal.HttpFuture;
import com.couchbase.client.internal.ObserveFuture;
import com.couchbase.client.internal.ReplicaGetFuture;
import com.couchbase.client.internal.ViewFuture;
import com.couchbase.client.protocol.views.AbstractView;
import com.couchbase.client.protocol.views.DesignDocFetcherOperation;
import com.couchbase.client.protocol.views.DesignDocFetcherOperationImpl;
import com.couchbase.client.protocol.views.DesignDocOperationImpl;
import com.couchbase.client.protocol.views.DesignDocument;
import com.couchbase.client.protocol.views.DocsOperationImpl;
import com.couchbase.client.protocol.views.HttpOperation;
import com.couchbase.client.protocol.views.InvalidViewException;
import com.couchbase.client.protocol.views.NoDocsOperationImpl;
import com.couchbase.client.protocol.views.Paginator;
import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.ReducedOperationImpl;
import com.couchbase.client.protocol.views.SpatialView;
import com.couchbase.client.protocol.views.SpatialViewFetcherOperation;
import com.couchbase.client.protocol.views.SpatialViewFetcherOperationImpl;
import com.couchbase.client.protocol.views.View;
import com.couchbase.client.protocol.views.ViewFetcherOperation;
import com.couchbase.client.protocol.views.ViewFetcherOperationImpl;
import com.couchbase.client.protocol.views.ViewOperation;
import com.couchbase.client.protocol.views.ViewResponse;
import com.couchbase.client.protocol.views.ViewRow;
import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.VBucketNodeLocator;
import com.couchbase.client.vbucket.config.Bucket;
import com.couchbase.client.vbucket.config.Config;
import com.couchbase.client.vbucket.config.ConfigType;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.ObserveResponse;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.PersistTo;
import net.spy.memcached.ReplicateTo;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.GetlOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.ObserveOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.ReplicaGetOperation;
import net.spy.memcached.ops.ReplicaGetsOperation;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.ops.UnlockOperation;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpRequest;
import org.apache.http.HttpVersion;
import org.apache.http.ProtocolVersion;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;

public class CouchbaseClient
extends MemcachedClient
implements CouchbaseClientIF,
Reconfigurable {
    /** java.util.logging logger named after this class. */
    private static final Logger LOGGER = Logger.getLogger(CouchbaseClient.class.getName());
    /** Name of the "production" view mode. */
    private static final String MODE_PRODUCTION = "production";
    /** Name of the "development" view mode. */
    private static final String MODE_DEVELOPMENT = "development";
    /** Design document name prefix associated with development mode. */
    private static final String DEV_PREFIX = "dev_";
    /** Design document name prefix associated with production mode (empty). */
    private static final String PROD_PREFIX = "";
    /**
     * Prefix prepended to design document names when view/design-doc URIs are built.
     * No initializer is visible in this section; presumably assigned in a static
     * initializer from one of the prefixes above -- TODO confirm.
     */
    public static final String MODE_PREFIX;
    /**
     * Message logged at info level at the end of client construction.
     * No initializer is visible in this section -- see the note on MODE_PREFIX.
     */
    private static final String MODE_ERROR;
    /** Connection used for HTTP (view) operations; stays null unless the bucket config type is COUCHBASE. */
    private ViewConnection vconn = null;
    /** True while {@link #reconfigure(Bucket)} is running; reset in its finally block. */
    protected volatile boolean reconfiguring = false;
    /** The connection factory this client was built from. */
    private final CouchbaseConnectionFactory cbConnFactory;
    /** Executor obtained from the factory and handed to every future created by this client. */
    protected final ExecutorService executorService;

    /**
     * Builds a client by wrapping the arguments in a new
     * {@link CouchbaseConnectionFactory} and delegating to the factory-based
     * constructor.
     *
     * @param baseList   URIs handed to the connection factory
     * @param bucketName name of the bucket to connect to
     * @param pwd        password handed to the connection factory
     * @throws IOException if the factory or the delegated constructor fails
     */
    public CouchbaseClient(List<URI> baseList, String bucketName, String pwd) throws IOException {
        this(
            new CouchbaseConnectionFactory(baseList, bucketName, pwd)
        );
    }

    /**
     * Builds a client exactly like the three-argument constructor.
     *
     * <p>Note that {@code user} is accepted but not forwarded: the connection
     * factory is created from {@code baseList}, {@code bucketName} and
     * {@code pwd} only.
     *
     * @param baseList   URIs handed to the connection factory
     * @param bucketName name of the bucket to connect to
     * @param user       unused by this constructor
     * @param pwd        password handed to the connection factory
     * @throws IOException if the factory or the delegated constructor fails
     */
    public CouchbaseClient(List<URI> baseList, String bucketName, String user, String pwd) throws IOException {
        this(
            new CouchbaseConnectionFactory(baseList, bucketName, pwd)
        );
    }

    public CouchbaseClient(CouchbaseConnectionFactory cf) throws IOException {
        super((ConnectionFactory)cf, AddrUtil.getAddresses(cf.getVBucketConfig().getServers()));
        this.getLogger().info((Object)cf.toString());
        this.cbConnFactory = cf;
        if (cf.getVBucketConfig().getConfigType() == ConfigType.COUCHBASE) {
            List addrs = AddrUtil.getAddressesFromURL(cf.getVBucketConfig().getCouchServers());
            this.vconn = cf.createViewConnection(addrs);
        }
        this.executorService = this.cbConnFactory.getListenerExecutorService();
        this.getLogger().info((Object)MODE_ERROR);
        cf.getConfigurationProvider().subscribe(this);
    }

    /**
     * Applies a new bucket configuration to the view connection (if any) and
     * to the memcached connection.
     *
     * <p>The {@code reconfiguring} flag is raised for the duration of the call
     * and always cleared afterwards. An {@link IllegalArgumentException} from
     * either connection is logged as a warning and otherwise ignored, leaving
     * the previous configuration in place.
     *
     * @param bucket the new bucket configuration
     */
    @Override
    public void reconfigure(Bucket bucket) {
        this.reconfiguring = true;
        try {
            if (this.vconn != null) {
                this.vconn.reconfigure(bucket);
            }
            // The memcached connection is one of two Couchbase-specific types.
            if (!(this.mconn instanceof CouchbaseConnection)) {
                ((CouchbaseMemcachedConnection)this.mconn).reconfigure(bucket);
            } else {
                ((CouchbaseConnection)this.mconn).reconfigure(bucket);
            }
        } catch (IllegalArgumentException ex) {
            this.getLogger().warn((Object)"Failed to reconfigure client, staying with previous configuration.", (Throwable)ex);
        } finally {
            this.reconfiguring = false;
        }
    }

    /**
     * Reacts to a lost node connection: logs the event, asks the factory's
     * configuration provider to reload the config, then lets the superclass
     * handle the notification.
     *
     * @param sa address of the node whose connection was lost
     */
    public void connectionLost(SocketAddress sa) {
        String message = "Connection lost for node: " + sa;
        this.getLogger().debug((Object)message);
        this.cbConnFactory.configurationProvider.reloadConfig();
        super.connectionLost(sa);
    }

    /**
     * Asynchronously fetches a view definition from a design document.
     *
     * <p>The design document name is prefixed with {@code MODE_PREFIX} and
     * requested via HTTP GET from {@code /<bucket>/_design/<name>}. The
     * returned future uses the factory's view timeout.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @param viewName           name of the view inside the design document
     * @return a future that yields the view once the operation completes
     */
    @Override
    public HttpFuture<View> asyncGetView(String designDocumentName, String viewName) {
        CouchbaseConnectionFactory cbFactory = (CouchbaseConnectionFactory)this.connFactory;
        String bucketName = cbFactory.getBucketName();
        designDocumentName = MODE_PREFIX + designDocumentName;
        String designDocUri = "/" + bucketName + "/_design/" + designDocumentName;

        final CountDownLatch done = new CountDownLatch(1);
        final HttpFuture<View> result = new HttpFuture<View>(done, cbFactory.getViewTimeout(), this.executorService);

        ViewFetcherOperation.ViewFetcherCallback callback = new ViewFetcherOperation.ViewFetcherCallback(){
            private View fetched = null;

            @Override
            public void gotData(View v) {
                fetched = v;
            }

            public void receivedStatus(OperationStatus status) {
                result.set(fetched, status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", designDocUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        ViewFetcherOperationImpl op = new ViewFetcherOperationImpl(request, bucketName, designDocumentName, viewName, callback);
        result.setOperation(op);
        this.addOp(op);
        assert (result != null) : "Problem retrieving view";
        return result;
    }

    /**
     * Asynchronously fetches a spatial view definition from a design document.
     *
     * <p>The design document name is prefixed with {@code MODE_PREFIX} and
     * requested via HTTP GET from {@code /<bucket>/_design/<name>}. The
     * returned future uses the factory's view timeout.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @param viewName           name of the spatial view inside the design document
     * @return a future that yields the spatial view once the operation completes
     */
    @Override
    public HttpFuture<SpatialView> asyncGetSpatialView(String designDocumentName, String viewName) {
        CouchbaseConnectionFactory cbFactory = (CouchbaseConnectionFactory)this.connFactory;
        String bucketName = cbFactory.getBucketName();
        designDocumentName = MODE_PREFIX + designDocumentName;
        String designDocUri = "/" + bucketName + "/_design/" + designDocumentName;

        final CountDownLatch done = new CountDownLatch(1);
        final HttpFuture<SpatialView> result = new HttpFuture<SpatialView>(done, cbFactory.getViewTimeout(), this.executorService);

        SpatialViewFetcherOperation.ViewFetcherCallback callback = new SpatialViewFetcherOperation.ViewFetcherCallback(){
            private SpatialView fetched = null;

            @Override
            public void gotData(SpatialView v) {
                fetched = v;
            }

            public void receivedStatus(OperationStatus status) {
                result.set(fetched, status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", designDocUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        SpatialViewFetcherOperationImpl op = new SpatialViewFetcherOperationImpl(request, bucketName, designDocumentName, viewName, callback);
        result.setOperation(op);
        this.addOp(op);
        assert (result != null) : "Problem retrieving spatial view";
        return result;
    }

    /**
     * Asynchronously fetches a whole design document.
     *
     * <p>The name is prefixed with {@code MODE_PREFIX} and requested via HTTP
     * GET from {@code /<bucket>/_design/<name>}. Unlike the view fetchers this
     * future uses a fixed timeout of 60000 (passed straight to the future).
     *
     * @param designDocumentName design document name (without mode prefix)
     * @return a future that yields the design document once the operation completes
     */
    @Override
    public HttpFuture<DesignDocument> asyncGetDesignDoc(String designDocumentName) {
        String bucketName = ((CouchbaseConnectionFactory)this.connFactory).getBucketName();
        designDocumentName = MODE_PREFIX + designDocumentName;
        String designDocUri = "/" + bucketName + "/_design/" + designDocumentName;

        final CountDownLatch done = new CountDownLatch(1);
        final HttpFuture<DesignDocument> result = new HttpFuture<DesignDocument>(done, 60000L, this.executorService);

        DesignDocFetcherOperation.DesignDocFetcherCallback callback = new DesignDocFetcherOperation.DesignDocFetcherCallback(){
            private DesignDocument fetched = null;

            @Override
            public void gotData(DesignDocument d) {
                fetched = d;
            }

            public void receivedStatus(OperationStatus status) {
                result.set(fetched, status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", designDocUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        DesignDocFetcherOperationImpl op = new DesignDocFetcherOperationImpl(request, designDocumentName, callback);
        result.setOperation(op);
        this.addOp(op);
        return result;
    }

    /**
     * Fetches a view definition, blocking until the asynchronous fetch finishes.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @param viewName           name of the view inside the design document
     * @return the view, never {@code null}
     * @throws InvalidViewException  if the fetch completed but produced no view
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first) or the
     *                               fetch failed
     */
    @Override
    public View getView(String designDocumentName, String viewName) {
        try {
            View view = this.asyncGetView(designDocumentName, viewName).get();
            if (view == null) {
                throw new InvalidViewException("Could not load view \"" + viewName + "\" for design doc \"" + designDocumentName + "\"");
            }
            return view;
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted getting views", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Failed getting views", e);
        }
    }

    /**
     * Fetches a spatial view definition, blocking until the asynchronous fetch
     * finishes.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @param viewName           name of the spatial view inside the design document
     * @return the spatial view, never {@code null}
     * @throws InvalidViewException  if the fetch completed but produced no view
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first) or the
     *                               fetch failed
     */
    @Override
    public SpatialView getSpatialView(String designDocumentName, String viewName) {
        try {
            SpatialView view = this.asyncGetSpatialView(designDocumentName, viewName).get();
            if (view == null) {
                throw new InvalidViewException("Could not load spatial view \"" + viewName + "\" for design doc \"" + designDocumentName + "\"");
            }
            return view;
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted getting spatial view", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            // Message previously said "views" (copied from getView); name the right resource.
            throw new RuntimeException("Failed getting spatial view", e);
        }
    }

    /**
     * Fetches a design document, blocking until the asynchronous fetch finishes.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @return the design document, never {@code null}
     * @throws InvalidViewException  if the fetch completed but produced no document
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first) or the
     *                               fetch failed
     */
    @Override
    public DesignDocument getDesignDoc(String designDocumentName) {
        try {
            // Call the non-deprecated method directly instead of its deprecated alias.
            DesignDocument design = this.asyncGetDesignDoc(designDocumentName).get();
            if (design == null) {
                throw new InvalidViewException("Could not load design document \"" + designDocumentName + "\"");
            }
            return design;
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted getting design document", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Failed getting design document", e);
        }
    }

    /**
     * Deprecated alias that forwards to {@link #asyncGetDesignDoc(String)}.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @return the future returned by {@code asyncGetDesignDoc}
     * @deprecated use {@link #asyncGetDesignDoc(String)} instead
     */
    @Override
    @Deprecated
    public HttpFuture<DesignDocument> asyncGetDesignDocument(String designDocumentName) {
        HttpFuture<DesignDocument> designDocFuture = this.asyncGetDesignDoc(designDocumentName);
        return designDocFuture;
    }

    /**
     * Deprecated alias that forwards to {@link #getDesignDoc(String)}.
     *
     * @param designDocumentName design document name (without mode prefix)
     * @return the design document returned by {@code getDesignDoc}
     * @deprecated use {@link #getDesignDoc(String)} instead
     */
    @Override
    @Deprecated
    public DesignDocument getDesignDocument(String designDocumentName) {
        DesignDocument designDoc = this.getDesignDoc(designDocumentName);
        return designDoc;
    }

    /**
     * Stores a design document, blocking until the asynchronous call finishes.
     *
     * @param doc the design document to store
     * @return the result of the asynchronous creation
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first), the
     *                               operation failed, or the document could
     *                               not be encoded
     */
    @Override
    public Boolean createDesignDoc(DesignDocument doc) {
        try {
            return this.asyncCreateDesignDoc(doc).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted creating design document", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Failed creating design document", e);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Failed creating design document", e);
        }
    }

    /**
     * Asynchronously stores a design document via HTTP PUT to
     * {@code /<bucket>/_design/<MODE_PREFIX><name>}.
     *
     * <p>The future resolves to {@code true} only when the status message is
     * exactly {@code "Error Code: 201"}. The future uses a fixed timeout of
     * 60000.
     *
     * @param name  design document name (without mode prefix)
     * @param value JSON body of the design document
     * @return a future that yields whether the document was created
     * @throws UnsupportedEncodingException if the body cannot be encoded
     */
    @Override
    public HttpFuture<Boolean> asyncCreateDesignDoc(String name, String value) throws UnsupportedEncodingException {
        this.getLogger().info((Object)("Creating Design Document:" + name));
        String bucket = ((CouchbaseConnectionFactory)this.connFactory).getBucketName();
        String uri = "/" + bucket + "/_design/" + MODE_PREFIX + name;
        final CountDownLatch couchLatch = new CountDownLatch(1);
        final HttpFuture<Boolean> crv = new HttpFuture<Boolean>(couchLatch, 60000L, this.executorService);
        BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("PUT", uri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        request.setHeader((Header)new BasicHeader("Content-Type", "application/json"));
        // Encode explicitly as UTF-8: the single-argument StringEntity constructor
        // falls back to the HTTP default charset, which corrupts non-ASCII JSON.
        // ASCII-only documents produce the same bytes as before.
        StringEntity entity = new StringEntity(value, "UTF-8");
        request.setEntity((HttpEntity)entity);
        DesignDocOperationImpl op = new DesignDocOperationImpl((HttpRequest)request, new OperationCallback(){

            public void receivedStatus(OperationStatus status) {
                // Constant-first comparison so a null status message yields false
                // instead of throwing inside the callback.
                crv.set("Error Code: 201".equals(status.getMessage()), status);
            }

            public void complete() {
                couchLatch.countDown();
                crv.signalComplete();
            }
        });
        crv.setOperation(op);
        this.addOp(op);
        return crv;
    }

    /**
     * Asynchronously stores a design document, using the document's own name
     * and its JSON representation.
     *
     * @param doc the design document to store
     * @return a future that yields whether the document was created
     * @throws UnsupportedEncodingException if the body cannot be encoded
     */
    @Override
    public HttpFuture<Boolean> asyncCreateDesignDoc(DesignDocument doc) throws UnsupportedEncodingException {
        String docName = doc.getName();
        String docJson = doc.toJson();
        return this.asyncCreateDesignDoc(docName, docJson);
    }

    /**
     * Deletes a design document, blocking until the asynchronous call finishes.
     *
     * @param name design document name (without mode prefix)
     * @return the result of the asynchronous deletion
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first) or the
     *                               operation failed
     */
    @Override
    public Boolean deleteDesignDoc(String name) {
        try {
            return this.asyncDeleteDesignDoc(name).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted deleting design document", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Failed deleting design document", e);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Failed deleting design document", e);
        }
    }

    /**
     * Asynchronously deletes a design document via HTTP DELETE on
     * {@code /<bucket>/_design/<MODE_PREFIX><name>}.
     *
     * <p>The future resolves to {@code true} only when the status message is
     * exactly {@code "Error Code: 200"}. The future uses a fixed timeout of
     * 60000.
     *
     * @param name design document name (without mode prefix)
     * @return a future that yields whether the document was deleted
     * @throws UnsupportedEncodingException declared for interface compatibility;
     *         nothing in this body encodes text
     */
    @Override
    public HttpFuture<Boolean> asyncDeleteDesignDoc(String name) throws UnsupportedEncodingException {
        this.getLogger().info((Object)("Deleting Design Document:" + name));
        String bucket = ((CouchbaseConnectionFactory)this.connFactory).getBucketName();
        String uri = "/" + bucket + "/_design/" + MODE_PREFIX + name;
        final CountDownLatch couchLatch = new CountDownLatch(1);
        final HttpFuture<Boolean> crv = new HttpFuture<Boolean>(couchLatch, 60000L, this.executorService);
        BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("DELETE", uri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        request.setHeader((Header)new BasicHeader("Content-Type", "application/json"));
        DesignDocOperationImpl op = new DesignDocOperationImpl((HttpRequest)request, new OperationCallback(){

            public void receivedStatus(OperationStatus status) {
                // Constant-first comparison so a null status message yields false
                // instead of throwing inside the callback.
                crv.set("Error Code: 200".equals(status.getMessage()), status);
            }

            public void complete() {
                couchLatch.countDown();
                crv.signalComplete();
            }
        });
        crv.setOperation(op);
        this.addOp(op);
        return crv;
    }

    /**
     * Asynchronously runs a query against a view, choosing the operation type
     * from the query settings.
     *
     * <p>If the view has a reduce function and the query's arguments contain
     * no {@code "reduce"} key, reduce is switched on in the passed-in query
     * object. Then: a reducing query is dispatched as a reduce query, a query
     * that includes docs is dispatched with documents, anything else without.
     *
     * @param view  the view to query
     * @param query the query to run (may be modified, see above)
     * @return a future that yields the view response
     */
    @Override
    public HttpFuture<ViewResponse> asyncQuery(AbstractView view, Query query) {
        if (view.hasReduce()) {
            if (!query.getArgs().containsKey("reduce")) {
                query.setReduce(true);
            }
        }

        HttpFuture<ViewResponse> response;
        if (query.willReduce()) {
            response = this.asyncQueryAndReduce(view, query);
        } else if (query.willIncludeDocs()) {
            response = this.asyncQueryAndIncludeDocs(view, query);
        } else {
            response = this.asyncQueryAndExcludeDocs(view, query);
        }
        return response;
    }

    /**
     * Runs a view query and, once rows arrive, starts a bulk get for the
     * document ids of all returned rows.
     *
     * <p>When the status arrives without any response data, the future is set
     * with a null response and a null bulk future.
     *
     * @param view  the view to query
     * @param query the query to run
     * @return a future that yields the view response
     */
    private HttpFuture<ViewResponse> asyncQueryAndIncludeDocs(AbstractView view, Query query) {
        assert (view != null) : "Who passed me a null view";
        assert (query != null) : "who passed me a null query";
        String viewUri = view.getURI();
        String queryToRun = query.toString();
        assert (viewUri != null) : "view URI seems to be null";
        assert (queryToRun != null) : "query seems to be null";
        String requestUri = viewUri + queryToRun;

        final CountDownLatch done = new CountDownLatch(1);
        int timeout = ((CouchbaseConnectionFactory)this.connFactory).getViewTimeout();
        final ViewFuture result = new ViewFuture(done, timeout, view, this.executorService);

        ViewOperation.ViewCallback callback = new ViewOperation.ViewCallback(){
            private ViewResponse response = null;

            @Override
            public void gotData(ViewResponse response) {
                this.response = response;
            }

            public void receivedStatus(OperationStatus status) {
                if (this.response == null) {
                    result.set(null, null, status);
                    return;
                }
                // Collect every row id, then fetch the documents in one bulk get.
                LinkedList<String> ids = new LinkedList<String>();
                for (Iterator<ViewRow> rows = this.response.iterator(); rows.hasNext();) {
                    ids.add(rows.next().getId());
                }
                result.set(this.response, (BulkFuture<Map<String, Object>>)CouchbaseClient.this.asyncGetBulk(ids), status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", requestUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        DocsOperationImpl op = new DocsOperationImpl(request, view, callback);
        result.setOperation(op);
        this.addOp(op);
        return result;
    }

    /**
     * Runs a view query that returns rows only, without fetching the
     * referenced documents.
     *
     * @param view  the view to query
     * @param query the query to run
     * @return a future that yields the view response
     */
    private HttpFuture<ViewResponse> asyncQueryAndExcludeDocs(AbstractView view, Query query) {
        String requestUri = view.getURI() + query.toString();
        final CountDownLatch done = new CountDownLatch(1);
        int timeout = ((CouchbaseConnectionFactory)this.connFactory).getViewTimeout();
        final HttpFuture<ViewResponse> result = new HttpFuture<ViewResponse>(done, timeout, this.executorService);

        ViewOperation.ViewCallback callback = new ViewOperation.ViewCallback(){
            private ViewResponse response = null;

            @Override
            public void gotData(ViewResponse response) {
                this.response = response;
            }

            public void receivedStatus(OperationStatus status) {
                result.set(this.response, status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", requestUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        NoDocsOperationImpl op = new NoDocsOperationImpl(request, view, callback);
        result.setOperation(op);
        this.addOp(op);
        return result;
    }

    /**
     * Runs a view query whose results are reduced.
     *
     * @param view  the view to query; must have a reduce function
     * @param query the query to run
     * @return a future that yields the reduced view response
     * @throws RuntimeException if the view has no reduce function
     */
    private HttpFuture<ViewResponse> asyncQueryAndReduce(AbstractView view, Query query) {
        boolean reducible = view.hasReduce();
        if (!reducible) {
            throw new RuntimeException("This view doesn't contain a reduce function");
        }

        String requestUri = view.getURI() + query.toString();
        final CountDownLatch done = new CountDownLatch(1);
        int timeout = ((CouchbaseConnectionFactory)this.connFactory).getViewTimeout();
        final HttpFuture<ViewResponse> result = new HttpFuture<ViewResponse>(done, timeout, this.executorService);

        ViewOperation.ViewCallback callback = new ViewOperation.ViewCallback(){
            private ViewResponse response = null;

            @Override
            public void gotData(ViewResponse response) {
                this.response = response;
            }

            public void receivedStatus(OperationStatus status) {
                result.set(this.response, status);
            }

            public void complete() {
                done.countDown();
                result.signalComplete();
            }
        };

        HttpRequest request = new BasicHttpRequest("GET", requestUri, (ProtocolVersion)HttpVersion.HTTP_1_1);
        ReducedOperationImpl op = new ReducedOperationImpl(request, view, callback);
        result.setOperation(op);
        this.addOp(op);
        return result;
    }

    /**
     * Runs a view query and blocks until the response is available.
     *
     * @param view  the view to query
     * @param query the query to run
     * @return the view response
     * @throws CancellationException if the underlying operation was cancelled
     * @throws RuntimeException      if the wait was interrupted (the thread's
     *                               interrupt flag is restored first) or the
     *                               query failed
     */
    @Override
    public ViewResponse query(AbstractView view, Query query) {
        try {
            return this.asyncQuery(view, query).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while accessing the view", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Failed to access the view", e);
        }
    }

    /**
     * Creates a paginator over the results of a view query.
     *
     * @param view        the view to query
     * @param query       the query to run
     * @param docsPerPage page size handed to the paginator
     * @return a new {@link Paginator} bound to this client
     */
    @Override
    public Paginator paginatedQuery(View view, Query query, int docsPerPage) {
        Paginator paginator = new Paginator(this, view, query, docsPerPage);
        return paginator;
    }

    /**
     * Hands an HTTP operation to the view connection.
     *
     * <p>When no view connection exists the operation is dropped without any
     * notification.
     * NOTE(review): a future tied to a dropped operation presumably only ends
     * via its timeout -- confirm whether that is intended.
     *
     * @param op the HTTP operation to enqueue
     */
    protected void addOp(HttpOperation op) {
        if (this.vconn == null) {
            return;
        }
        this.vconn.addOp(op);
    }

    /**
     * Asynchronously gets a value and locks it.
     *
     * <p>When the operation status is not a success, the future is set with a
     * {@code CASValue} whose CAS is {@code -1} and whose value is {@code null}.
     *
     * @param key the key to get and lock
     * @param exp value passed through to the getl operation
     * @param tc  transcoder used to decode the stored data
     * @param <T> decoded value type
     * @return a future that yields the CAS and decoded value
     */
    public <T> OperationFuture<CASValue<T>> asyncGetAndLock(final String key, int exp, final Transcoder<T> tc) {
        final CountDownLatch done = new CountDownLatch(1);
        final OperationFuture<CASValue<T>> lockFuture = new OperationFuture<CASValue<T>>(key, done, this.operationTimeout, this.executorService);

        GetlOperation.Callback callback = new GetlOperation.Callback(){
            private CASValue<T> locked = null;

            public void gotData(String k, int flags, long cas, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                assert (cas > 0L) : "CAS was less than zero:  " + cas;
                CachedData raw = new CachedData(flags, data, tc.getMaxSize());
                this.locked = new CASValue<T>(cas, tc.decode(raw));
            }

            public void receivedStatus(OperationStatus status) {
                if (!status.isSuccess()) {
                    this.locked = new CASValue<T>(-1L, null);
                }
                lockFuture.set(this.locked, status);
            }

            public void complete() {
                done.countDown();
                lockFuture.signalComplete();
            }
        };

        GetlOperation op = this.opFact.getl(key, exp, callback);
        lockFuture.setOperation((Operation)op);
        this.mconn.enqueueOperation(key, (Operation)op);
        return lockFuture;
    }

    /**
     * Asynchronously gets a value and locks it, decoding with the client's
     * default transcoder.
     *
     * @param key the key to get and lock
     * @param exp value passed through to the getl operation
     * @return a future that yields the CAS and decoded value
     */
    public OperationFuture<CASValue<Object>> asyncGetAndLock(String key, int exp) {
        // No cast: this method declares no type variable T, so the former
        // (Transcoder<T>) cast could not compile. The default transcoder is
        // passed as-is, as the other default-transcoder overloads do.
        return this.asyncGetAndLock(key, exp, this.transcoder);
    }

    /**
     * Gets a value via the replica-read path, decoding with the client's
     * default transcoder.
     *
     * @param key the key to read
     * @return the decoded value
     */
    @Override
    public Object getFromReplica(String key) {
        Object value = this.getFromReplica(key, this.transcoder);
        return value;
    }

    /**
     * Gets a value together with its CAS via the replica-read path, decoding
     * with the client's default transcoder.
     *
     * @param key the key to read
     * @return the CAS and decoded value
     */
    @Override
    public CASValue<Object> getsFromReplica(String key) {
        CASValue<Object> casValue = this.getsFromReplica(key, this.transcoder);
        return casValue;
    }

    /**
     * Gets a value via the replica-read path, waiting at most the client's
     * operation timeout (in milliseconds).
     *
     * @param key the key to read
     * @param tc  transcoder used to decode the stored data
     * @param <T> decoded value type
     * @return the decoded value
     * @throws OperationTimeoutException if no result arrived within the timeout
     * @throws RuntimeException          if the wait was interrupted (the
     *                                   thread's interrupt flag is restored
     *                                   first) or the operation failed
     */
    @Override
    public <T> T getFromReplica(String key, Transcoder<T> tc) {
        try {
            return this.asyncGetFromReplica(key, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Exception waiting for value", e);
        } catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", (Throwable)e);
        }
    }

    /**
     * Gets a value together with its CAS via the replica-read path, waiting at
     * most the client's operation timeout (in milliseconds).
     *
     * @param key the key to read
     * @param tc  transcoder used to decode the stored data
     * @param <T> decoded value type
     * @return the CAS and decoded value
     * @throws OperationTimeoutException if no result arrived within the timeout
     * @throws RuntimeException          if the wait was interrupted (the
     *                                   thread's interrupt flag is restored
     *                                   first) or the operation failed
     */
    @Override
    public <T> CASValue<T> getsFromReplica(String key, Transcoder<T> tc) {
        try {
            return this.asyncGetsFromReplica(key, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Exception waiting for value", e);
        } catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", (Throwable)e);
        }
    }

    /**
     * Asynchronous replica read using the client's default transcoder.
     *
     * @param key the key to read
     * @return a future that yields the decoded value
     */
    @Override
    public ReplicaGetFuture<Object> asyncGetFromReplica(String key) {
        ReplicaGetFuture<Object> replicaGet = this.asyncGetFromReplica(key, this.transcoder);
        return replicaGet;
    }

    /**
     * Asynchronous replica read with CAS using the client's default transcoder.
     *
     * @param key the key to read
     * @return a future that yields the CAS and decoded value
     */
    @Override
    public ReplicaGetFuture<CASValue<Object>> asyncGetsFromReplica(String key) {
        ReplicaGetFuture<CASValue<Object>> replicaGets = this.asyncGetsFromReplica(key, this.transcoder);
        return replicaGets;
    }

    /**
     * Asynchronously reads a key from every replica index reported by the
     * locator and, when the key has an active master, from the active node too.
     *
     * <p>Each dispatched get is registered with one combined
     * {@link ReplicaGetFuture}. Operations that are already cancelled right
     * after enqueueing are counted and skipped; a missing active master counts
     * as one skipped operation as well.
     *
     * @param key the key to read
     * @param tc  transcoder used to decode the stored data
     * @param <T> decoded value type
     * @return the combined future monitoring all dispatched gets
     * @throws IllegalStateException if every candidate operation was skipped
     */
    @Override
    public <T> ReplicaGetFuture<T> asyncGetFromReplica(String key, Transcoder<T> tc) {
        if (this.cbConnFactory.getVBucketConfig().getReplicasCount() == 0) {
            this.getLogger().debug((Object)"No replica configured for this bucket, trying to get the document from active node only.");
        }

        VBucketNodeLocator nodeLocator = (VBucketNodeLocator)this.mconn.getLocator();
        List<Integer> replicaIndexes = nodeLocator.getReplicaIndexes(key);
        ReplicaGetFuture<T> combined = new ReplicaGetFuture<T>(this.operationTimeout, this.executorService);
        int skipped = 0;

        // One get per replica index.
        for (int replicaIndex : replicaIndexes) {
            CountDownLatch replicaLatch = new CountDownLatch(1);
            GetFuture<T> replicaGet = new GetFuture<T>(replicaLatch, this.operationTimeout, key, this.executorService);
            Operation replicaOp = this.createOperationForReplicaGet(key, replicaGet, combined, replicaLatch, tc, replicaIndex, true);
            replicaGet.setOperation(replicaOp);
            this.mconn.enqueueOperation(key, replicaOp);
            if (!replicaOp.isCancelled()) {
                combined.addFutureToMonitor(replicaGet);
            } else {
                ++skipped;
                this.getLogger().debug((Object)("Silently discarding replica get for key \"" + key + "\" (cancelled)."));
            }
        }

        // One additional plain get against the active node, if there is one.
        if (!nodeLocator.hasActiveMaster(key)) {
            ++skipped;
        } else {
            CountDownLatch activeLatch = new CountDownLatch(1);
            GetFuture<T> activeGet = new GetFuture<T>(activeLatch, this.operationTimeout, key, this.executorService);
            Operation activeOp = this.createOperationForReplicaGet(key, activeGet, combined, activeLatch, tc, 0, false);
            activeGet.setOperation(activeOp);
            this.mconn.enqueueOperation(key, activeOp);
            if (!activeOp.isCancelled()) {
                combined.addFutureToMonitor(activeGet);
            } else {
                ++skipped;
                this.getLogger().debug((Object)("Silently discarding replica (active) get for key \"" + key + "\" (cancelled)."));
            }
        }

        // The "+ 1" accounts for the active-node attempt.
        if (skipped == replicaIndexes.size() + 1) {
            throw new IllegalStateException("No replica get operation could be dispatched because all operations have been cancelled.");
        }
        return combined;
    }

    /**
     * Asynchronously reads a key with its CAS from every replica index
     * reported by the locator and, when the key has an active master, from the
     * active node too.
     *
     * <p>Each dispatched operation is registered with one combined
     * {@link ReplicaGetFuture}. Operations that are already cancelled right
     * after enqueueing are counted and skipped; a missing active master counts
     * as one skipped operation as well.
     *
     * @param key the key to read
     * @param tc  transcoder used to decode the stored data
     * @param <T> decoded value type
     * @return the combined future monitoring all dispatched operations
     * @throws IllegalStateException if every candidate operation was skipped
     */
    @Override
    public <T> ReplicaGetFuture<CASValue<T>> asyncGetsFromReplica(String key, Transcoder<T> tc) {
        if (this.cbConnFactory.getVBucketConfig().getReplicasCount() == 0) {
            this.getLogger().debug((Object)"No replica configured for this bucket, trying to get the document from active node only.");
        }

        VBucketNodeLocator nodeLocator = (VBucketNodeLocator)this.mconn.getLocator();
        List<Integer> replicaIndexes = nodeLocator.getReplicaIndexes(key);
        ReplicaGetFuture<CASValue<T>> combined = new ReplicaGetFuture<CASValue<T>>(this.operationTimeout, this.executorService);
        int skipped = 0;

        // One gets per replica index.
        for (int replicaIndex : replicaIndexes) {
            CountDownLatch replicaLatch = new CountDownLatch(1);
            OperationFuture replicaGets = new OperationFuture(key, replicaLatch, this.operationTimeout, this.executorService);
            Operation replicaOp = this.createOperationForReplicaGets(key, replicaGets, combined, replicaLatch, tc, replicaIndex, true);
            replicaGets.setOperation(replicaOp);
            this.mconn.enqueueOperation(key, replicaOp);
            if (!replicaOp.isCancelled()) {
                combined.addFutureToMonitor((Future<CASValue<T>>)replicaGets);
            } else {
                ++skipped;
                this.getLogger().debug((Object)("Silently discarding replica get for key \"" + key + "\" (cancelled)."));
            }
        }

        // One additional gets against the active node, if there is one.
        if (!nodeLocator.hasActiveMaster(key)) {
            ++skipped;
        } else {
            CountDownLatch activeLatch = new CountDownLatch(1);
            OperationFuture activeGets = new OperationFuture(key, activeLatch, this.operationTimeout, this.executorService);
            Operation activeOp = this.createOperationForReplicaGets(key, activeGets, combined, activeLatch, tc, 0, false);
            activeGets.setOperation(activeOp);
            this.mconn.enqueueOperation(key, activeOp);
            if (!activeOp.isCancelled()) {
                combined.addFutureToMonitor((Future<CASValue<T>>)activeGets);
            } else {
                ++skipped;
                this.getLogger().debug((Object)("Silently discarding replica (active) get for key \"" + key + "\" (cancelled)."));
            }
        }

        // The "+ 1" accounts for the active-node attempt.
        if (skipped == replicaIndexes.size() + 1) {
            throw new IllegalStateException("No replica get operation could be dispatched because all operations have been cancelled.");
        }
        return combined;
    }

    /**
     * Builds the operation used by the replica-get machinery: a replica get for
     * the given replica index when {@code replica} is true, otherwise a plain get.
     *
     * <p>Both callbacks behave the same way: the decoded value and status are
     * stored on {@code future}; if {@code replicaFuture} is not done yet and the
     * status is a success or "not found", {@code future} is offered to
     * {@code replicaFuture} as its completed future. On completion the latch is
     * released and, if the offer was accepted, {@code replicaFuture} is signalled.
     *
     * @param key the key to fetch
     * @param future the per-operation future receiving value and status
     * @param replicaFuture the aggregate future shared by all dispatched operations
     * @param latch the latch released when the operation completes
     * @param tc the transcoder used to decode the data
     * @param replicaIndex the replica index (only used when {@code replica} is true)
     * @param replica whether to build a replica get or a plain get
     * @return the operation, not yet enqueued
     */
    private <T> Operation createOperationForReplicaGet(final String key, final GetFuture<T> future, final ReplicaGetFuture<T> replicaFuture, final CountDownLatch latch, final Transcoder<T> tc, int replicaIndex, boolean replica) {
        if (!replica) {
            return this.opFact.get(key, new GetOperation.Callback(){
                private Future<T> decoded = null;
                private boolean adopted;

                public void receivedStatus(OperationStatus status) {
                    future.set(this.decoded, status);
                    if (replicaFuture.isDone()) {
                        return;
                    }
                    if (status.isSuccess() || status.getStatusCode() == StatusCode.ERR_NOT_FOUND) {
                        this.adopted = replicaFuture.setCompletedFuture(future);
                    }
                }

                public void gotData(String returnedKey, int flags, byte[] data) {
                    assert (key.equals(returnedKey)) : "Wrong key returned";
                    CachedData cached = new CachedData(flags, data, tc.getMaxSize());
                    this.decoded = CouchbaseClient.this.tcService.decode(tc, cached);
                }

                public void complete() {
                    latch.countDown();
                    if (this.adopted) {
                        replicaFuture.signalComplete();
                    }
                }
            });
        }
        return this.opFact.replicaGet(key, replicaIndex, new ReplicaGetOperation.Callback(){
            private Future<T> decoded;
            private boolean adopted;

            public void receivedStatus(OperationStatus status) {
                future.set(this.decoded, status);
                if (replicaFuture.isDone()) {
                    return;
                }
                if (status.isSuccess() || status.getStatusCode() == StatusCode.ERR_NOT_FOUND) {
                    this.adopted = replicaFuture.setCompletedFuture(future);
                }
            }

            public void gotData(String returnedKey, int flags, byte[] data) {
                assert (key.equals(returnedKey)) : "Wrong key returned";
                CachedData cached = new CachedData(flags, data, tc.getMaxSize());
                this.decoded = CouchbaseClient.this.tcService.decode(tc, cached);
            }

            public void complete() {
                latch.countDown();
                if (this.adopted) {
                    replicaFuture.signalComplete();
                }
            }
        });
    }

    /**
     * Builds the operation used by the replica-gets machinery: a replica "gets"
     * for the given replica index when {@code replica} is true, otherwise a plain
     * "gets".
     *
     * <p>Both callbacks store the decoded {@link CASValue} and status on
     * {@code future}; if {@code replicaFuture} is not done yet and the status is a
     * success or "not found", {@code future} is offered to {@code replicaFuture}
     * as its completed future. On completion the latch is released and, if the
     * offer was accepted, {@code replicaFuture} is signalled.
     *
     * @param key the key to fetch
     * @param future the per-operation future receiving value and status
     * @param replicaFuture the aggregate future shared by all dispatched operations
     * @param latch the latch released when the operation completes
     * @param tc the transcoder used to decode the data
     * @param replicaIndex the replica index (only used when {@code replica} is true)
     * @param replica whether to build a replica gets or a plain gets
     * @return the operation, not yet enqueued
     */
    private <T> Operation createOperationForReplicaGets(final String key, final OperationFuture<CASValue<T>> future, final ReplicaGetFuture<CASValue<T>> replicaFuture, final CountDownLatch latch, final Transcoder<T> tc, int replicaIndex, boolean replica) {
        if (replica) {
            return this.opFact.replicaGets(key, replicaIndex, new ReplicaGetsOperation.Callback(){
                private CASValue<T> val;
                private boolean usedFuture;

                public void receivedStatus(OperationStatus status) {
                    future.set(this.val, status);
                    if (!replicaFuture.isDone() && (status.isSuccess() || status.getStatusCode() == StatusCode.ERR_NOT_FOUND)) {
                        this.usedFuture = replicaFuture.setCompletedFuture(future);
                    }
                }

                // The parameter is deliberately not named "key": shadowing the
                // requested key would turn the assertion into key.equals(key).
                public void gotData(String k, int flags, long cas, byte[] data) {
                    assert (key.equals(k)) : "Wrong key returned";
                    this.val = new CASValue<T>(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
                }

                public void complete() {
                    latch.countDown();
                    if (this.usedFuture) {
                        replicaFuture.signalComplete();
                    }
                }
            });
        }
        return this.opFact.gets(key, new GetsOperation.Callback(){
            private CASValue<T> val = null;
            private boolean usedFuture;

            public void receivedStatus(OperationStatus status) {
                future.set(this.val, status);
                if (!replicaFuture.isDone() && (status.isSuccess() || status.getStatusCode() == StatusCode.ERR_NOT_FOUND)) {
                    this.usedFuture = replicaFuture.setCompletedFuture(future);
                }
            }

            // See the note on the replica callback: compare against the requested key.
            public void gotData(String k, int flags, long cas, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                this.val = new CASValue<T>(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
            }

            public void complete() {
                latch.countDown();
                if (this.usedFuture) {
                    replicaFuture.signalComplete();
                }
            }
        });
    }

    /**
     * Blocking variant of {@code asyncGetAndLock}: waits up to the client's
     * operation timeout for the result.
     *
     * @param key the key to fetch and lock
     * @param exp the lock expiration passed through to {@code asyncGetAndLock}
     * @param tc the transcoder used to decode the value
     * @return the value together with its CAS
     * @throws OperationTimeoutException if the result does not arrive in time
     * @throws CancellationException if the operation was cancelled
     * @throws RuntimeException if the wait is interrupted or the operation fails
     */
    @Override
    public <T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc) {
        try {
            return (CASValue)this.asyncGetAndLock(key, exp, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", (Throwable)e);
        }
    }

    /**
     * Gets and locks a key using this client's default transcoder.
     *
     * @param key the key to fetch and lock
     * @param exp the lock expiration passed through to the transcoder-aware overload
     * @return the value together with its CAS
     */
    @Override
    public CASValue<Object> getAndLock(String key, int exp) {
        return getAndLock(key, exp, transcoder);
    }

    /**
     * Asynchronously unlocks a key.
     *
     * <p>The transcoder argument is accepted for signature compatibility but is
     * not used by this method.
     *
     * @param key the key to unlock
     * @param casId the CAS value passed to the unlock operation
     * @param tc unused
     * @return a future whose value is the success flag of the operation status
     */
    @Override
    public <T> OperationFuture<Boolean> asyncUnlock(String key, long casId, Transcoder<T> tc) {
        final CountDownLatch done = new CountDownLatch(1);
        final OperationFuture unlockFuture = new OperationFuture(key, done, this.operationTimeout, this.executorService);
        OperationCallback callback = new OperationCallback(){

            public void receivedStatus(OperationStatus s) {
                // The future's value mirrors the status' success flag.
                unlockFuture.set((Object)s.isSuccess(), s);
            }

            public void complete() {
                done.countDown();
                unlockFuture.signalComplete();
            }
        };
        UnlockOperation unlockOp = this.opFact.unlock(key, casId, callback);
        unlockFuture.setOperation((Operation)unlockOp);
        this.mconn.enqueueOperation(key, (Operation)unlockOp);
        return unlockFuture;
    }

    /**
     * Asynchronously unlocks a key, passing this client's default transcoder to
     * the transcoder-aware overload.
     *
     * @param key the key to unlock
     * @param casId the CAS value passed to the unlock operation
     * @return a future holding the outcome of the unlock
     */
    @Override
    public OperationFuture<Boolean> asyncUnlock(String key, long casId) {
        return asyncUnlock(key, casId, transcoder);
    }

    /**
     * Blocking variant of {@code asyncUnlock}: waits up to the client's operation
     * timeout for the result.
     *
     * @param key the key to unlock
     * @param casId the CAS value passed to the unlock operation
     * @param tc the transcoder forwarded to {@code asyncUnlock}
     * @return the outcome of the unlock
     * @throws OperationTimeoutException if the result does not arrive in time
     * @throws CancellationException if the operation was cancelled
     * @throws RuntimeException if the wait is interrupted or the operation fails
     */
    @Override
    public <T> Boolean unlock(String key, long casId, Transcoder<T> tc) {
        try {
            return (Boolean)this.asyncUnlock(key, casId, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", (Throwable)e);
        }
    }

    /**
     * Unlocks a key, passing this client's default transcoder to the
     * transcoder-aware overload.
     *
     * @param key the key to unlock
     * @param casId the CAS value passed to the unlock operation
     * @return the outcome of the unlock
     */
    @Override
    public Boolean unlock(String key, long casId) {
        return unlock(key, casId, transcoder);
    }

    /**
     * Deletes a key and, when a durability requirement is given, returns a future
     * that additionally observes that requirement.
     *
     * @param key the key to delete
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the plain delete future when both requirements are {@code ZERO},
     *         otherwise the future produced by {@code asyncObserveStore}
     * @throws IllegalArgumentException if the connection is a
     *         {@link CouchbaseMemcachedConnection}
     */
    @Override
    public OperationFuture<Boolean> delete(String key, PersistTo req, ReplicateTo rep) {
        if (this.mconn instanceof CouchbaseMemcachedConnection) {
            throw new IllegalArgumentException("Durability options are not supported on memcached type buckets.");
        }
        OperationFuture plainDelete = this.delete(key);
        boolean durabilityRequested = req != PersistTo.ZERO || rep != ReplicateTo.ZERO;
        if (durabilityRequested) {
            return this.asyncObserveStore(key, (OperationFuture<Boolean>)plainDelete, req, rep, "Delete", true);
        }
        return plainDelete;
    }

    /**
     * Deletes a key with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key to delete
     * @param req the persistence requirement
     * @return the future of the delete
     */
    @Override
    public OperationFuture<Boolean> delete(String key, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return delete(key, req, rep);
    }

    /**
     * Deletes a key with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key to delete
     * @param req the replication requirement
     * @return the future of the delete
     */
    @Override
    public OperationFuture<Boolean> delete(String key, ReplicateTo req) {
        final PersistTo persist = PersistTo.ZERO;
        return delete(key, persist, req);
    }

    /**
     * Sets a value, delegating to {@code set(key, exp, value)} with an
     * expiration argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, Object value) {
        final int exp = 0;
        return set(key, exp, value);
    }

    /**
     * Sets a value and, when a durability requirement is given, returns a future
     * that additionally observes that requirement.
     *
     * @param key the key to store under
     * @param exp the expiration argument forwarded to the plain set
     * @param value the value to store
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the plain set future when both requirements are {@code ZERO},
     *         otherwise the future produced by {@code asyncObserveStore}
     * @throws IllegalArgumentException if the connection is a
     *         {@link CouchbaseMemcachedConnection}
     */
    @Override
    public OperationFuture<Boolean> set(String key, int exp, Object value, PersistTo req, ReplicateTo rep) {
        if (this.mconn instanceof CouchbaseMemcachedConnection) {
            throw new IllegalArgumentException("Durability options are not supported on memcached type buckets.");
        }
        OperationFuture plainSet = this.set(key, exp, value);
        boolean durabilityRequested = req != PersistTo.ZERO || rep != ReplicateTo.ZERO;
        if (durabilityRequested) {
            return this.asyncObserveStore(key, (OperationFuture<Boolean>)plainSet, req, rep, "Set", false);
        }
        return plainSet;
    }

    /**
     * Sets a value with durability requirements, using an expiration argument
     * of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, Object value, PersistTo req, ReplicateTo rep) {
        final int exp = 0;
        return set(key, exp, value, req, rep);
    }

    /**
     * Adds a value, delegating to {@code add(key, exp, value)} with an
     * expiration argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, Object value) {
        final int exp = 0;
        return add(key, exp, value);
    }

    /**
     * Sets a value with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key to store under
     * @param exp the expiration argument
     * @param value the value to store
     * @param req the persistence requirement
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, int exp, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return set(key, exp, value, req, rep);
    }

    /**
     * Sets a value with the given persistence requirement, using an expiration
     * argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param req the persistence requirement
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, Object value, PersistTo req) {
        final int exp = 0;
        return set(key, exp, value, req);
    }

    /**
     * Sets a value with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key to store under
     * @param exp the expiration argument
     * @param value the value to store
     * @param rep the replication requirement
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, int exp, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return set(key, exp, value, persist, rep);
    }

    /**
     * Sets a value with the given replication requirement, using an expiration
     * argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param rep the replication requirement
     * @return the future of the set
     */
    @Override
    public OperationFuture<Boolean> set(String key, Object value, ReplicateTo rep) {
        final int exp = 0;
        return set(key, exp, value, rep);
    }

    /**
     * Adds a value and, when a durability requirement is given, returns a future
     * that additionally observes that requirement.
     *
     * @param key the key to store under
     * @param exp the expiration argument forwarded to the plain add
     * @param value the value to store
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the plain add future when both requirements are {@code ZERO},
     *         otherwise the future produced by {@code asyncObserveStore}
     * @throws IllegalArgumentException if the connection is a
     *         {@link CouchbaseMemcachedConnection}
     */
    @Override
    public OperationFuture<Boolean> add(String key, int exp, Object value, PersistTo req, ReplicateTo rep) {
        if (this.mconn instanceof CouchbaseMemcachedConnection) {
            throw new IllegalArgumentException("Durability options are not supported on memcached type buckets.");
        }
        OperationFuture plainAdd = this.add(key, exp, value);
        boolean durabilityRequested = req != PersistTo.ZERO || rep != ReplicateTo.ZERO;
        if (durabilityRequested) {
            return this.asyncObserveStore(key, (OperationFuture<Boolean>)plainAdd, req, rep, "Add", false);
        }
        return plainAdd;
    }

    /**
     * Adds a value with durability requirements, using an expiration argument
     * of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, Object value, PersistTo req, ReplicateTo rep) {
        final int exp = 0;
        return add(key, exp, value, req, rep);
    }

    /**
     * Replaces a value, delegating to {@code replace(key, exp, value)} with an
     * expiration argument of 0.
     *
     * @param key the key whose value is replaced
     * @param value the new value
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, Object value) {
        final int exp = 0;
        return replace(key, exp, value);
    }

    /**
     * Adds a value with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key to store under
     * @param exp the expiration argument
     * @param value the value to store
     * @param req the persistence requirement
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, int exp, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return add(key, exp, value, req, rep);
    }

    /**
     * Adds a value with the given persistence requirement, using an expiration
     * argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param req the persistence requirement
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, Object value, PersistTo req) {
        final int exp = 0;
        return add(key, exp, value, req);
    }

    /**
     * Adds a value with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key to store under
     * @param exp the expiration argument
     * @param value the value to store
     * @param rep the replication requirement
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, int exp, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return add(key, exp, value, persist, rep);
    }

    /**
     * Adds a value with the given replication requirement, using an expiration
     * argument of 0.
     *
     * @param key the key to store under
     * @param value the value to store
     * @param rep the replication requirement
     * @return the future of the add
     */
    @Override
    public OperationFuture<Boolean> add(String key, Object value, ReplicateTo rep) {
        final int exp = 0;
        return add(key, exp, value, rep);
    }

    /**
     * Replaces a value and, when a durability requirement is given, returns a
     * future that additionally observes that requirement.
     *
     * @param key the key whose value is replaced
     * @param exp the expiration argument forwarded to the plain replace
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the plain replace future when both requirements are {@code ZERO},
     *         otherwise the future produced by {@code asyncObserveStore}
     * @throws IllegalArgumentException if the connection is a
     *         {@link CouchbaseMemcachedConnection}
     */
    @Override
    public OperationFuture<Boolean> replace(String key, int exp, Object value, PersistTo req, ReplicateTo rep) {
        if (this.mconn instanceof CouchbaseMemcachedConnection) {
            throw new IllegalArgumentException("Durability options are not supported on memcached type buckets.");
        }
        OperationFuture plainReplace = this.replace(key, exp, value);
        boolean durabilityRequested = req != PersistTo.ZERO || rep != ReplicateTo.ZERO;
        if (durabilityRequested) {
            return this.asyncObserveStore(key, (OperationFuture<Boolean>)plainReplace, req, rep, "Replace", false);
        }
        return plainReplace;
    }

    /**
     * Wraps a mutation future in an {@link ObserveFuture} that completes only
     * after the requested durability has been observed.
     *
     * <p>A listener on {@code original} first copies the mutation's result,
     * status and CAS onto the observe future. If the mutation did not succeed
     * the observe future completes immediately with that outcome. Otherwise
     * {@code observePoll} is run and the observe future is completed with
     * {@code true}, or with {@code false} and the exception message when the
     * poll throws one of the observe exceptions.
     *
     * @param key the key that was mutated
     * @param original the future of the underlying mutation
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @param prefix operation name used as a prefix in failure status messages
     * @param delete whether the mutation was a delete (forwarded to {@code observePoll})
     * @return the observe future
     */
    private ObserveFuture<Boolean> asyncObserveStore(final String key, OperationFuture<Boolean> original, final PersistTo req, final ReplicateTo rep, final String prefix, final boolean delete) {
        final CountDownLatch latch = new CountDownLatch(1);
        final ObserveFuture<Boolean> observeFuture = new ObserveFuture<Boolean>(key, latch, this.cbConnFactory.getObsTimeout(), this.executorService);
        original.addListener(new OperationCompletionListener(){

            public void onComplete(OperationFuture<?> future) throws Exception {
                boolean replaceStatus = false;
                try {
                    replaceStatus = (Boolean)future.get();
                    observeFuture.set(replaceStatus, future.getStatus());
                    if (future.getCas() != null) {
                        observeFuture.setCas(future.getCas());
                    }
                }
                catch (InterruptedException e) {
                    // Do not swallow the interrupt of the thread running the listener.
                    Thread.currentThread().interrupt();
                    observeFuture.set(false, new OperationStatus(false, prefix + " get " + "timed out"));
                }
                catch (ExecutionException e) {
                    // Exactly one status is recorded; previously the cancellation
                    // status was always overwritten by the generic one.
                    if (e.getCause() instanceof CancellationException) {
                        observeFuture.set(false, new OperationStatus(false, prefix + " get " + "cancellation exception "));
                    } else {
                        observeFuture.set(false, new OperationStatus(false, prefix + " get " + "execution exception "));
                    }
                }
                if (!replaceStatus) {
                    // The mutation itself failed: nothing to observe.
                    latch.countDown();
                    observeFuture.signalComplete();
                    return;
                }
                try {
                    CouchbaseClient.this.observePoll(key, future.getCas(), req, rep, delete);
                    observeFuture.set(true, future.getStatus());
                }
                catch (ObservedException e) {
                    observeFuture.set(false, new OperationStatus(false, e.getMessage()));
                }
                catch (ObservedTimeoutException e) {
                    observeFuture.set(false, new OperationStatus(false, e.getMessage()));
                }
                catch (ObservedModifiedException e) {
                    observeFuture.set(false, new OperationStatus(false, e.getMessage()));
                }
                latch.countDown();
                observeFuture.signalComplete();
            }
        });
        return observeFuture;
    }

    /**
     * Replaces a value with durability requirements, using an expiration
     * argument of 0.
     *
     * @param key the key whose value is replaced
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, Object value, PersistTo req, ReplicateTo rep) {
        final int exp = 0;
        return replace(key, exp, value, req, rep);
    }

    /**
     * Replaces a value with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key whose value is replaced
     * @param exp the expiration argument
     * @param value the new value
     * @param req the persistence requirement
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, int exp, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return replace(key, exp, value, req, rep);
    }

    /**
     * Replaces a value with the given persistence requirement, using an
     * expiration argument of 0.
     *
     * @param key the key whose value is replaced
     * @param value the new value
     * @param req the persistence requirement
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, Object value, PersistTo req) {
        final int exp = 0;
        return replace(key, exp, value, req);
    }

    /**
     * Replaces a value with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key whose value is replaced
     * @param exp the expiration argument
     * @param value the new value
     * @param rep the replication requirement
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, int exp, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return replace(key, exp, value, persist, rep);
    }

    /**
     * Replaces a value with the given replication requirement, using an
     * expiration argument of 0.
     *
     * @param key the key whose value is replaced
     * @param value the new value
     * @param rep the replication requirement
     * @return the future of the replace
     */
    @Override
    public OperationFuture<Boolean> replace(String key, Object value, ReplicateTo rep) {
        final int exp = 0;
        return replace(key, exp, value, rep);
    }

    /**
     * Performs a blocking CAS with durability requirements, using an expiration
     * argument of 0.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the CAS response
     */
    @Override
    public CASResponse cas(String key, long cas, Object value, PersistTo req, ReplicateTo rep) {
        final int exp = 0;
        return this.cas(key, cas, exp, value, req, rep);
    }

    /**
     * Blocking variant of {@code asyncCas} with durability requirements.
     *
     * <p>Waits for the observe timeout of the connection factory, or for the
     * regular operation timeout when neither persistence nor replication was
     * requested.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the CAS response
     * @throws OperationTimeoutException if the result does not arrive in time
     * @throws CancellationException if the operation was cancelled
     * @throws RuntimeException if the wait is interrupted or the operation fails
     */
    @Override
    public CASResponse cas(String key, long cas, int exp, Object value, PersistTo req, ReplicateTo rep) {
        try {
            OperationFuture<CASResponse> casOp = this.asyncCas(key, cas, exp, value, req, rep);
            long timeout = this.cbConnFactory.getObsTimeout();
            if (req == PersistTo.ZERO && rep == ReplicateTo.ZERO) {
                // No durability requested: the plain operation timeout applies.
                timeout = this.operationTimeout;
            }
            return (CASResponse)casOp.get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value: ", (Throwable)e);
        }
    }

    /**
     * Performs a blocking CAS with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param req the persistence requirement
     * @return the CAS response
     */
    @Override
    public CASResponse cas(String key, long cas, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return this.cas(key, cas, value, req, rep);
    }

    /**
     * Performs a blocking CAS with an expiration argument, the given persistence
     * requirement and {@code ReplicateTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param req the persistence requirement
     * @return the CAS response
     */
    @Override
    public CASResponse cas(String key, long cas, int exp, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return this.cas(key, cas, exp, value, req, rep);
    }

    /**
     * Performs a blocking CAS with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param rep the replication requirement
     * @return the CAS response
     */
    @Override
    public CASResponse cas(String key, long cas, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return this.cas(key, cas, value, persist, rep);
    }

    /**
     * Performs a blocking CAS with an expiration argument, the given replication
     * requirement and {@code PersistTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param rep the replication requirement
     * @return the CAS response
     */
    @Override
    public CASResponse cas(String key, long cas, int exp, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return this.cas(key, cas, exp, value, persist, rep);
    }

    /**
     * Performs an asynchronous CAS with durability requirements, using an
     * expiration argument of 0.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the future of the CAS
     */
    @Override
    public OperationFuture<CASResponse> asyncCas(String key, long cas, Object value, PersistTo req, ReplicateTo rep) {
        final int exp = 0;
        return asyncCas(key, cas, exp, value, req, rep);
    }

    /**
     * Performs an asynchronous CAS with the given persistence requirement and
     * {@code ReplicateTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param req the persistence requirement
     * @return the future of the CAS
     */
    @Override
    public OperationFuture<CASResponse> asyncCas(String key, long cas, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return asyncCas(key, cas, value, req, rep);
    }

    /**
     * Performs an asynchronous CAS with the given replication requirement and
     * {@code PersistTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param value the new value
     * @param rep the replication requirement
     * @return the future of the CAS
     */
    public OperationFuture<CASResponse> asyncCas(String key, long cas, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return asyncCas(key, cas, value, persist, rep);
    }

    /**
     * Performs an asynchronous CAS with an expiration argument, the given
     * persistence requirement and {@code ReplicateTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param req the persistence requirement
     * @return the future of the CAS
     */
    @Override
    public OperationFuture<CASResponse> asyncCas(String key, long cas, int exp, Object value, PersistTo req) {
        final ReplicateTo rep = ReplicateTo.ZERO;
        return asyncCas(key, cas, exp, value, req, rep);
    }

    /**
     * Performs an asynchronous CAS with an expiration argument, the given
     * replication requirement and {@code PersistTo.ZERO}.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param rep the replication requirement
     * @return the future of the CAS
     */
    @Override
    public OperationFuture<CASResponse> asyncCas(String key, long cas, int exp, Object value, ReplicateTo rep) {
        final PersistTo persist = PersistTo.ZERO;
        return asyncCas(key, cas, exp, value, persist, rep);
    }

    /**
     * Performs an asynchronous CAS and returns a future that completes once the
     * requested durability has been observed.
     *
     * <p>A listener on the underlying CAS future copies its result, status and
     * CAS onto the observe future. The observe future completes immediately if
     * the CAS result is not {@code CASResponse.OK} or if no durability was
     * requested; otherwise {@code observePoll} runs first and its outcome
     * determines the final value. When the CAS result cannot be obtained
     * (interruption or execution failure) the observe future is completed with
     * {@code CASResponse.EXISTS} and a failed status.
     *
     * @param key the key to update
     * @param cas the expected CAS value
     * @param exp the expiration argument
     * @param value the new value
     * @param req the persistence requirement
     * @param rep the replication requirement
     * @return the observe future
     * @throws IllegalArgumentException if the connection is a
     *         {@link CouchbaseMemcachedConnection}
     */
    @Override
    public OperationFuture<CASResponse> asyncCas(final String key, long cas, int exp, Object value, final PersistTo req, final ReplicateTo rep) {
        if (this.mconn instanceof CouchbaseMemcachedConnection) {
            throw new IllegalArgumentException("Durability options are not supported on memcached type buckets.");
        }
        OperationFuture casOp = this.asyncCAS(key, cas, exp, value, this.transcoder);
        final CountDownLatch latch = new CountDownLatch(1);
        final ObserveFuture<CASResponse> observeFuture = new ObserveFuture<CASResponse>(key, latch, this.cbConnFactory.getObsTimeout(), this.executorService);
        casOp.addListener(new OperationCompletionListener(){

            public void onComplete(OperationFuture<?> future) throws Exception {
                CASResponse casr;
                try {
                    casr = (CASResponse)future.get();
                    observeFuture.set(casr, future.getStatus());
                    if (future.getCas() != null) {
                        observeFuture.setCas(future.getCas());
                    }
                }
                catch (InterruptedException e) {
                    // Do not swallow the interrupt of the thread running the listener.
                    Thread.currentThread().interrupt();
                    casr = CASResponse.EXISTS;
                    // Without this the future would complete with no value and no status.
                    observeFuture.set(casr, new OperationStatus(false, "Cas get interrupted"));
                }
                catch (ExecutionException e) {
                    casr = CASResponse.EXISTS;
                    // Without this the future would complete with no value and no status.
                    observeFuture.set(casr, new OperationStatus(false, "Cas get execution exception"));
                }
                if (casr != CASResponse.OK || req == PersistTo.ZERO && rep == ReplicateTo.ZERO) {
                    // Either the CAS failed or there is nothing to observe.
                    latch.countDown();
                    observeFuture.signalComplete();
                    return;
                }
                try {
                    CouchbaseClient.this.observePoll(key, future.getCas(), req, rep, false);
                    observeFuture.set(casr, future.getStatus());
                }
                catch (ObservedException e) {
                    observeFuture.set(CASResponse.OBSERVE_ERROR_IN_ARGS, new OperationStatus(false, e.getMessage()));
                }
                catch (ObservedTimeoutException e) {
                    observeFuture.set(CASResponse.OBSERVE_TIMEOUT, new OperationStatus(false, e.getMessage()));
                }
                catch (ObservedModifiedException e) {
                    observeFuture.set(CASResponse.OBSERVE_MODIFIED, new OperationStatus(false, e.getMessage()));
                }
                latch.countDown();
                observeFuture.signalComplete();
            }
        });
        return observeFuture;
    }

    /**
     * Broadcasts an observe operation for a key to its primary node and/or its
     * replica nodes and collects the per-node responses.
     *
     * <p>A node's response is recorded as-is when the returned CAS matches
     * {@code cas} or when the response is {@code NOT_FOUND_PERSISTED}; any other
     * response with a different CAS is recorded as {@code MODIFIED}.
     *
     * <p>The wait is bounded by the operation timeout; the result of the wait is
     * not checked, so the returned map holds whatever responses have arrived.
     *
     * @param key the key to observe
     * @param cas the CAS value the responses are compared against
     * @param toMaster whether to include the primary node
     * @param toReplica whether to include the replica nodes
     * @return the responses keyed by node
     * @throws RuntimeException if the wait is interrupted
     */
    private Map<MemcachedNode, ObserveResponse> observe(final String key, final long cas, boolean toMaster, boolean toReplica) {
        Config cfg = ((CouchbaseConnectionFactory)this.connFactory).getVBucketConfig();
        VBucketNodeLocator locator = (VBucketNodeLocator)this.mconn.getLocator();
        final int vb = locator.getVBucketIndex(key);
        ArrayList<MemcachedNode> bcastNodes = new ArrayList<MemcachedNode>();
        if (toMaster) {
            MemcachedNode primary = locator.getPrimary(key);
            if (primary != null) {
                bcastNodes.add(primary);
            }
        }
        if (toReplica) {
            for (int i = 0; i < cfg.getReplicasCount(); ++i) {
                MemcachedNode replica = locator.getReplica(key, i);
                if (replica != null) {
                    bcastNodes.add(replica);
                }
            }
        }
        final HashMap<MemcachedNode, ObserveResponse> response = new HashMap<MemcachedNode, ObserveResponse>();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                return CouchbaseClient.this.opFact.observe(key, cas, vb, new ObserveOperation.Callback(){

                    public void receivedStatus(OperationStatus s) {
                    }

                    public void gotData(String observedKey, long retCas, MemcachedNode node, ObserveResponse or) {
                        if (cas == retCas || or == ObserveResponse.NOT_FOUND_PERSISTED) {
                            response.put(node, or);
                        } else {
                            // A different CAS means the document changed in the meantime.
                            response.put(node, ObserveResponse.MODIFIED);
                        }
                    }

                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        }, bcastNodes);
        try {
            blatch.await(this.operationTimeout, TimeUnit.MILLISECONDS);
            return response;
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for value", e);
        }
    }

    /**
     * Observes a key on both its primary node and its replica nodes.
     *
     * @param key the key to observe
     * @param cas the CAS value the responses are compared against
     * @return the responses keyed by node
     */
    @Override
    public Map<MemcachedNode, ObserveResponse> observe(String key, long cas) {
        final boolean toMaster = true;
        final boolean toReplica = true;
        return observe(key, cas, toMaster, toReplica);
    }

    /**
     * Returns the vbucket count reported by the connection factory's current
     * vbucket configuration.
     *
     * @return the number of vbuckets
     */
    @Override
    public int getNumVBuckets() {
        CouchbaseConnectionFactory factory = (CouchbaseConnectionFactory)this.connFactory;
        return factory.getVBucketConfig().getVbucketsCount();
    }

    /**
     * Shuts down the client: first the superclass shutdown, then the
     * configuration provider and, if present, the view connection.
     *
     * <p>Any exception from the superclass shutdown is logged and leaves the
     * result {@code false}. An {@link IOException} from the second phase is
     * logged and forces the result to {@code false}.
     *
     * @param timeout the timeout forwarded to the superclass shutdown
     * @param unit the unit of {@code timeout}
     * @return the superclass shutdown result, or {@code false} on failure
     */
    public boolean shutdown(long timeout, TimeUnit unit) {
        boolean clean = false;
        try {
            clean = super.shutdown(timeout, unit);
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Unexpected Exception in shutdown", e);
        }
        try {
            ((CouchbaseConnectionFactory)this.connFactory).getConfigurationProvider().shutdown();
            if (this.vconn != null) {
                this.vconn.shutdown();
            }
        }
        catch (IOException e) {
            LOGGER.log(Level.SEVERE, "Unexpected IOException in shutdown", e);
            clean = false;
        }
        return clean;
    }

    /**
     * Verifies that the cluster can currently satisfy the requested replication
     * and persistence counts for a key.
     *
     * <p>The number of usable replicas is the smaller of (known nodes - 1) and
     * the configured replica count.
     *
     * @param key the key being observed
     * @param numPersist the persistence count to validate
     * @param numReplica the replication count to validate
     * @throws ObservedException if the replica at index {@code numReplica - 1}
     *         is not available for the key's vbucket, or if either count
     *         exceeds what the checks below allow
     */
    private void checkObserveReplica(String key, int numPersist, int numReplica) {
        Config cfg = ((CouchbaseConnectionFactory)this.connFactory).getVBucketConfig();
        VBucketNodeLocator locator = (VBucketNodeLocator)this.mconn.getLocator();
        if (numReplica > 0) {
            int vBucketIndex = locator.getVBucketIndex(key);
            int replicaNode = cfg.getReplica(vBucketIndex, numReplica - 1);
            // A negative value is treated as "no node for that replica index".
            if (replicaNode < 0) {
                throw new ObservedException("Currently, there is no replica node available for the given replication index (" + numReplica + ").");
            }
        }
        int otherNodes = locator.getAll().size() - 1;
        int replicaCount = Math.min(otherNodes, cfg.getReplicasCount());
        if (numReplica > replicaCount) {
            throw new ObservedException("Requested replication to " + numReplica + " node(s), but only " + replicaCount + " are available.");
        }
        // NOTE(review): the message reports numPersist + 1 requested nodes while the
        // guard compares numPersist against replicaCount + 1 - confirm the bound.
        if (numPersist > replicaCount + 1) {
            throw new ObservedException("Requested persistence to " + (numPersist + 1) + " node(s), but only " + (replicaCount + 1) + " are available.");
        }
    }

    /**
     * Polls the cluster with observe operations until the requested persistence
     * and replication have been reached, or gives up.
     *
     * <p>{@code null} requirements are treated as {@code ZERO}. Each round first
     * re-validates the requirements via {@code checkObserveReplica}, then
     * observes the key and recounts from scratch how many replicas hold the
     * change, how many replicas have persisted it, and whether the master has
     * persisted it. For deletes the "not found" response pair is counted instead
     * of the "found" pair.
     *
     * @param key the key to observe
     * @param cas the CAS value of the mutation being observed
     * @param persist the persistence requirement, may be {@code null}
     * @param replicate the replication requirement, may be {@code null}
     * @param isDelete whether the observed mutation was a delete
     * @throws ObservedTimeoutException if the maximum number of polls is reached
     * @throws ObservedModifiedException if the master reports the key as modified
     * @throws ObservedException if the requirements cannot be met or the poll is
     *         interrupted
     */
    @Override
    public void observePoll(String key, long cas, PersistTo persist, ReplicateTo replicate, boolean isDelete) {
        if (persist == null) {
            persist = PersistTo.ZERO;
        }
        if (replicate == null) {
            replicate = ReplicateTo.ZERO;
        }
        int maxPolls = this.cbConnFactory.getObsPollMax();
        long pollInterval = this.cbConnFactory.getObsPollInterval();
        VBucketNodeLocator locator = (VBucketNodeLocator)this.mconn.getLocator();
        // Replica persistence target: the requested count minus the master.
        int shouldPersistTo = persist.getValue() > 0 ? persist.getValue() - 1 : 0;
        int shouldReplicateTo = replicate.getValue();
        boolean shouldPersistToMaster = persist.getValue() > 0;
        boolean toMaster = persist.getValue() > 0;
        boolean toReplica = replicate.getValue() > 0 || persist.getValue() > 1;
        // Responses that count as "replicated" / "persisted" for this kind of mutation.
        ObserveResponse replicatedState = isDelete ? ObserveResponse.NOT_FOUND_NOT_PERSISTED : ObserveResponse.FOUND_NOT_PERSISTED;
        ObserveResponse persistedState = isDelete ? ObserveResponse.NOT_FOUND_PERSISTED : ObserveResponse.FOUND_PERSISTED;
        int donePolls = 0;
        int alreadyPersistedTo = 0;
        int alreadyReplicatedTo = 0;
        boolean alreadyPersistedToMaster = false;
        // NOTE(review): shouldPersistTo already excludes the master, yet the
        // comparisons here and at the end of the loop subtract one more - confirm
        // whether the extra "- 1" is intended before changing it.
        while (shouldReplicateTo > alreadyReplicatedTo || shouldPersistTo - 1 > alreadyPersistedTo || !alreadyPersistedToMaster && shouldPersistToMaster) {
            this.checkObserveReplica(key, shouldPersistTo, shouldReplicateTo);
            if (++donePolls >= maxPolls) {
                long timeTried = (long)maxPolls * pollInterval;
                throw new ObservedTimeoutException("Observe Timeout - Polled Unsuccessfully for at least " + TimeUnit.MILLISECONDS.toSeconds(timeTried) + " seconds.");
            }
            Map<MemcachedNode, ObserveResponse> response = this.observe(key, cas, toMaster, toReplica);
            MemcachedNode master = locator.getPrimary(key);
            // Counters reflect only the most recent round of responses.
            alreadyPersistedTo = 0;
            alreadyReplicatedTo = 0;
            alreadyPersistedToMaster = false;
            for (Map.Entry<MemcachedNode, ObserveResponse> r : response.entrySet()) {
                MemcachedNode node = r.getKey();
                ObserveResponse observeResponse = r.getValue();
                boolean isMaster = node == master;
                if (isMaster && observeResponse == ObserveResponse.MODIFIED) {
                    throw new ObservedModifiedException("Key was modified");
                }
                if (!isMaster && observeResponse == replicatedState) {
                    ++alreadyReplicatedTo;
                }
                if (observeResponse != persistedState) continue;
                if (isMaster) {
                    alreadyPersistedToMaster = true;
                    continue;
                }
                // A replica that has persisted the change has also received it.
                ++alreadyReplicatedTo;
                ++alreadyPersistedTo;
            }
            try {
                if (shouldReplicateTo <= alreadyReplicatedTo && shouldPersistTo - 1 <= alreadyPersistedTo && (alreadyPersistedToMaster || !shouldPersistToMaster)) continue;
                Thread.sleep(pollInterval);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                this.getLogger().error((Object)"Interrupted while in observe loop.", (Throwable)e);
                throw new ObservedException("Observe was Interrupted ");
            }
        }
    }

    /**
     * Asynchronously fetches the statistics of a single key.
     *
     * <p>Each stat reported by the operation is collected into a map; that map
     * becomes the future's value when the status arrives.
     *
     * @param key the key whose stats are requested
     * @return a future holding the stat name/value pairs
     */
    @Override
    public OperationFuture<Map<String, String>> getKeyStats(String key) {
        final CountDownLatch done = new CountDownLatch(1);
        final OperationFuture statsFuture = new OperationFuture(key, done, this.operationTimeout, this.executorService);
        StatsOperation.Callback collector = new StatsOperation.Callback(){
            private final Map<String, String> collected = new HashMap<String, String>();

            public void gotStat(String name, String val) {
                this.collected.put(name, val);
            }

            public void receivedStatus(OperationStatus status) {
                statsFuture.set(this.collected, status);
            }

            public void complete() {
                done.countDown();
                statsFuture.signalComplete();
            }
        };
        StatsOperation statsOp = this.opFact.keyStats(key, collector);
        statsFuture.setOperation((Operation)statsOp);
        this.mconn.enqueueOperation(key, (Operation)statsOp);
        return statsFuture;
    }

    /**
     * Flushes the bucket, delegating to {@code flush(int)} with a delay
     * argument of -1.
     *
     * @return the future of the flush
     */
    public OperationFuture<Boolean> flush() {
        final int delay = -1;
        return flush(delay);
    }

    /**
     * Starts a bucket flush on a temporary daemon thread and returns a future
     * for its outcome.
     *
     * <p>The {@code delay} argument is not used by this implementation. The
     * returned future only supports {@code isDone()} and the two {@code get}
     * variants; cancellation, CAS, key and status accessors throw
     * {@link UnsupportedOperationException}.
     *
     * @param delay unused
     * @return a future whose value is the status reported by the flush runner
     * @throws IllegalStateException if the connection is already shut down
     */
    public OperationFuture<Boolean> flush(int delay) {
        if (this.connectionShutDown()) {
            throw new IllegalStateException("Flush can not be used after shutdown.");
        }
        final CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<OperationFuture<Boolean>> rv = new AtomicReference<OperationFuture<Boolean>>();
        final FlushRunner flushRunner = new FlushRunner(latch, rv);
        rv.set(new OperationFuture<Boolean>(PROD_PREFIX, latch, this.operationTimeout, this.executorService){
            // Constructor arguments reach the superclass automatically for an
            // anonymous class, so only the extra field needs initialising here.
            private final CouchbaseConnectionFactory factory = (CouchbaseConnectionFactory)CouchbaseClient.this.connFactory;

            public boolean cancel() {
                throw new UnsupportedOperationException("Flush cannot be canceled");
            }

            public boolean isDone() {
                return flushRunner.status();
            }

            public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException {
                if (!latch.await(duration, units)) {
                    throw new TimeoutException("Flush not completed within timeout.");
                }
                return flushRunner.status();
            }

            public Boolean get() throws InterruptedException, ExecutionException {
                try {
                    // The untimed get is bounded by the factory's view timeout.
                    return this.get(this.factory.getViewTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e) {
                    throw new RuntimeException("Timed out waiting for operation", e);
                }
            }

            public Long getCas() {
                throw new UnsupportedOperationException("Flush has no CAS value.");
            }

            public String getKey() {
                throw new UnsupportedOperationException("Flush has no associated key.");
            }

            public OperationStatus getStatus() {
                throw new UnsupportedOperationException("Flush has no OperationStatus.");
            }

            public boolean isCancelled() {
                throw new UnsupportedOperationException("Flush cannot be canceled.");
            }
        });
        Thread flusher = new Thread((Runnable)flushRunner, "Temporary Flusher");
        flusher.setDaemon(true);
        flusher.start();
        return rv.get();
    }

    /**
     * Asks the cluster manager obtained from {@code cbConnFactory} to flush the
     * bucket whose name the same factory reports.
     *
     * @return {@code true} when the response equals {@code FlushResponse.OK}
     */
    private boolean flushBucket() {
        final FlushResponse outcome = this.cbConnFactory
            .getClusterManager()
            .flushBucket(this.cbConnFactory.getBucketName());
        final boolean flushed = outcome.equals((Object)FlushResponse.OK);
        return flushed;
    }

    /**
     * Tells whether the underlying connection has been shut down.
     *
     * <p>Only the two known connection implementations are supported; each is
     * asked via its own {@code isShutDown()} method.
     *
     * @return the value reported by the connection's {@code isShutDown()}
     * @throws IllegalStateException if the connection is neither a
     *         {@code CouchbaseConnection} nor a {@code CouchbaseMemcachedConnection}
     */
    protected boolean connectionShutDown() {
        final Object conn = this.mconn;
        final boolean shutDown;
        if (conn instanceof CouchbaseConnection) {
            shutDown = ((CouchbaseConnection)conn).isShutDown();
        } else if (conn instanceof CouchbaseMemcachedConnection) {
            shutDown = ((CouchbaseMemcachedConnection)conn).isShutDown();
        } else {
            final String typeName = conn.getClass().getCanonicalName();
            throw new IllegalStateException("Unknown connection type: " + typeName);
        }
        return shutDown;
    }

    /*
     * Class initialisation: points CouchbaseProperties at "cbclient.properties",
     * reads the "viewmode" property and derives MODE_ERROR (a descriptive
     * message) and MODE_PREFIX from it. MODE_PREFIX becomes DEV_PREFIX only
     * when the property equals MODE_DEVELOPMENT; every other case, including a
     * missing or unrecognised value, uses PROD_PREFIX.
     */
    static {
        CouchbaseProperties.setPropertyFile("cbclient.properties");
        String configuredMode = CouchbaseProperties.getProperty("viewmode");
        if (configuredMode == null) {
            // Second lookup with the boolean flag set.
            // NOTE(review): presumably consults a different property source - confirm in CouchbaseProperties.
            configuredMode = CouchbaseProperties.getProperty("viewmode", true);
        }

        final String message;
        String prefix = PROD_PREFIX;
        if (configuredMode == null) {
            message = "viewmode property isn't defined. Setting viewmode to production mode";
        } else if (configuredMode.equals(MODE_PRODUCTION)) {
            message = "viewmode set to production mode";
        } else if (configuredMode.equals(MODE_DEVELOPMENT)) {
            message = "viewmode set to development mode";
            prefix = DEV_PREFIX;
        } else {
            message = "unknown value \"" + configuredMode + "\" for property viewmode Setting to production mode";
        }

        // Each static is assigned exactly once, after the decision has been made.
        MODE_ERROR = message;
        MODE_PREFIX = prefix;
    }

    /**
     * Runnable that performs the bucket flush and publishes the outcome.
     *
     * <p>When the flush attempt ends, successfully or not, the runner counts
     * down the latch it was given and, if the shared reference already holds a
     * future, signals that future as complete.
     */
    private class FlushRunner
    implements Runnable {
        private final CountDownLatch flatch;
        // Written on the flusher thread and read from other threads through status(),
        // so the write must be visible without relying on additional synchronisation.
        private volatile boolean flushStatus = false;
        private final AtomicReference<OperationFuture<Boolean>> future;

        /**
         * @param latch latch released once the flush attempt has finished
         * @param rv reference through which the future to signal is looked up at completion time
         */
        public FlushRunner(CountDownLatch latch, AtomicReference<OperationFuture<Boolean>> rv) {
            this.flatch = latch;
            this.future = rv;
        }

        /**
         * Performs the flush. The latch release and completion signal are in a
         * {@code finally} block so that waiters are not left blocked until their
         * timeout when the flush throws; in that case the status stays
         * {@code false} and the exception continues to propagate.
         */
        @Override
        public void run() {
            try {
                this.flushStatus = CouchbaseClient.this.flushBucket();
            } finally {
                this.flatch.countDown();
                final OperationFuture<Boolean> pending = this.future.get();
                if (pending != null) {
                    pending.signalComplete();
                }
            }
        }

        /**
         * @return {@code true} only after a flush that reported success;
         *         {@code false} before completion or after a failed flush
         */
        private boolean status() {
            return this.flushStatus;
        }
    }
}

