package com.datatorrent.contrib.elasticsearch;

import com.datatorrent.api.Context;
import com.datatorrent.contrib.elasticsearch.ElasticSearchConnectable;
import com.datatorrent.lib.db.AbstractStoreOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;

/* loaded from: input_file:com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.class */
public abstract class AbstractElasticSearchOutputOperator<T, S extends ElasticSearchConnectable> extends AbstractStoreOutputOperator<T, S> {
    protected static final int DEFAULT_BATCH_SIZE = 1000;

    @Min(1)
    protected int batchSize = 1000;
    protected transient Queue<T> tupleBatch;

    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        this.tupleBatch = new ArrayBlockingQueue(this.batchSize);
    }

    public void processTuple(T t) {
        this.tupleBatch.add(t);
        if (this.tupleBatch.size() >= this.batchSize) {
            processBatch();
        }
    }

    public void endWindow() {
        super.endWindow();
        processBatch();
    }

    private void processBatch() {
        BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(((ElasticSearchConnectable) this.store).client);
        while (!this.tupleBatch.isEmpty()) {
            bulkRequestBuilder.add(getIndexRequestBuilder(this.tupleBatch.remove()));
        }
        BulkResponse bulkResponse = (BulkResponse) bulkRequestBuilder.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            DTThrowable.rethrow(new Exception(bulkResponse.buildFailureMessage()));
        }
    }

    protected IndexRequestBuilder getIndexRequestBuilder(T t) {
        IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(((ElasticSearchConnectable) this.store).client, getIndexName(t));
        String id = getId(t);
        if (id != null) {
            indexRequestBuilder.setId(id);
        }
        indexRequestBuilder.setType(getType(t));
        return setSource(indexRequestBuilder, t);
    }

    protected abstract IndexRequestBuilder setSource(IndexRequestBuilder indexRequestBuilder, T t);

    protected abstract String getId(T t);

    protected abstract String getIndexName(T t);

    protected abstract String getType(T t);

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }
}
