/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.elasticsearch;

import com.datatorrent.api.Context;
import com.datatorrent.contrib.elasticsearch.ElasticSearchConnectable;
import com.datatorrent.lib.db.AbstractStoreOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;

/**
 * Base output operator that buffers incoming tuples and writes them to
 * Elasticsearch as bulk index requests.
 *
 * <p>Tuples are collected in {@link #tupleBatch}. The buffer is flushed when
 * it reaches {@link #batchSize} entries and again at the end of every window.
 * Subclasses describe how a tuple maps to an index request by implementing
 * {@link #getIndexName(Object)}, {@link #getType(Object)},
 * {@link #getId(Object)} and {@link #setSource(IndexRequestBuilder, Object)}.
 *
 * @param <T> type of the tuples received by this operator
 * @param <S> store type that exposes the Elasticsearch client
 */
public abstract class AbstractElasticSearchOutputOperator<T, S extends ElasticSearchConnectable>
extends AbstractStoreOutputOperator<T, S> {
    /** Default number of tuples sent in one bulk request. */
    protected static final int DEFAULT_BATCH_SIZE = 1000;

    /** Number of buffered tuples that triggers a bulk write; must be at least 1. */
    @Min(value=1L)
    protected int batchSize = DEFAULT_BATCH_SIZE;

    /** Tuples waiting to be written; created in {@link #setup} and not serialized. */
    protected transient Queue<T> tupleBatch;

    /**
     * Sets up the parent operator and allocates the tuple buffer with a
     * capacity of {@link #batchSize}.
     *
     * @param context operator context passed through to the superclass
     */
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        this.tupleBatch = new ArrayBlockingQueue<T>(this.batchSize);
    }

    /**
     * Buffers a tuple and flushes the buffer once it holds at least
     * {@link #batchSize} tuples.
     *
     * @param tuple tuple to be indexed
     */
    public void processTuple(T tuple) {
        // The buffer capacity is fixed in setup(); if batchSize has been raised
        // since then the buffer can fill up before the threshold below is hit.
        // Flush and retry instead of failing with "Queue full".
        if (!this.tupleBatch.offer(tuple)) {
            this.processBatch();
            this.tupleBatch.add(tuple);
        }
        if (this.tupleBatch.size() >= this.batchSize) {
            this.processBatch();
        }
    }

    /**
     * Ends the window in the superclass and writes out any tuples that are
     * still buffered.
     */
    public void endWindow() {
        super.endWindow();
        this.processBatch();
    }

    /**
     * Drains the buffer into a single bulk request, executes it and waits for
     * the response. Does nothing when the buffer is empty. If the response
     * reports failures, an exception carrying the failure message is passed to
     * {@link DTThrowable#rethrow}.
     */
    private void processBatch() {
        // Elasticsearch rejects a bulk request without any actions
        // ("no requests added"), so an idle window must not send one.
        if (this.tupleBatch.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder((Client)((ElasticSearchConnectable)this.store).client);
        while (!this.tupleBatch.isEmpty()) {
            T tuple = this.tupleBatch.remove();
            IndexRequestBuilder indexRequestBuilder = this.getIndexRequestBuilder(tuple);
            bulkRequestBuilder.add(indexRequestBuilder);
        }
        BulkResponse bulkResponse = (BulkResponse)bulkRequestBuilder.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            DTThrowable.rethrow((Exception)new Exception(bulkResponse.buildFailureMessage()));
        }
    }

    /**
     * Builds the index request for one tuple from the index name, optional id,
     * type and source supplied by the subclass.
     *
     * @param tuple tuple to be indexed
     * @return the request builder returned by {@link #setSource}
     */
    protected IndexRequestBuilder getIndexRequestBuilder(T tuple) {
        IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder((Client)((ElasticSearchConnectable)this.store).client, this.getIndexName(tuple));
        String id = this.getId(tuple);
        // A null id is simply not set on the request.
        if (id != null) {
            indexRequestBuilder.setId(id);
        }
        indexRequestBuilder.setType(this.getType(tuple));
        return this.setSource(indexRequestBuilder, tuple);
    }

    /**
     * Populates the document source of the request from the tuple.
     *
     * @param var1 request builder with index, type and (optionally) id already set
     * @param var2 tuple to be indexed
     * @return the request builder to add to the bulk request
     */
    protected abstract IndexRequestBuilder setSource(IndexRequestBuilder var1, T var2);

    /**
     * Returns the document id for the tuple.
     *
     * @param var1 tuple to be indexed
     * @return the id, or {@code null} to leave the id unset on the request
     */
    protected abstract String getId(T var1);

    /**
     * Returns the name of the index the tuple is written to.
     *
     * @param var1 tuple to be indexed
     * @return index name
     */
    protected abstract String getIndexName(T var1);

    /**
     * Returns the document type set on the index request for the tuple.
     *
     * @param var1 tuple to be indexed
     * @return type name
     */
    protected abstract String getType(T var1);

    /**
     * @return number of buffered tuples that triggers a bulk write
     */
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * Sets the number of buffered tuples that triggers a bulk write. The
     * buffer itself is sized in {@link #setup}; a value set afterwards changes
     * the flush threshold but not the buffer capacity.
     *
     * @param batchSize new batch size
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }
}

