/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
import com.datatorrent.lib.util.PojoUtils;
import java.lang.reflect.Field;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang3.ClassUtils;

public class POJOKafkaOutputOperator
extends AbstractKafkaOutputOperator<Object, Object> {
    @AutoMetric
    private long outputMessagesPerSec;
    @AutoMetric
    private long outputBytesPerSec;
    protected final String BROKER_KEY = "metadata.broker.list";
    protected final String BATCH_NUM_KEY = "batch.num.messages";
    protected final String PRODUCER_KEY = "producer.type";
    protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    protected final String ASYNC_PRODUCER_TYPE = "async";
    private long messageCount;
    private long byteCount;
    private String brokerList;
    private double windowTimeSec;
    private String keyField = "";
    protected boolean isBatchProcessing = true;
    @Min(value=2L)
    protected int batchSize;
    protected transient PojoUtils.Getter keyMethod;
    protected transient Class<?> pojoClass;
    public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>(){

        public void setup(Context.PortContext context) {
            if (context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
                POJOKafkaOutputOperator.this.pojoClass = (Class)context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
            }
        }

        public void process(Object tuple) {
            POJOKafkaOutputOperator.this.processTuple(tuple);
        }
    };

    @Override
    protected ProducerConfig createKafkaProducerConfig() {
        if (this.brokerList != null) {
            this.getConfigProperties().setProperty("metadata.broker.list", this.brokerList);
        }
        if (this.isBatchProcessing) {
            if (this.batchSize != 0) {
                this.getConfigProperties().setProperty("batch.num.messages", String.valueOf(this.batchSize));
            }
            this.getConfigProperties().setProperty("producer.type", "async");
        }
        return super.createKafkaProducerConfig();
    }

    @Override
    public void setup(Context.OperatorContext context) {
        if (this.isBatchProcessing) {
            this.getConfigProperties().setProperty("queue.buffering.max.ms", String.valueOf(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
        }
        super.setup(context);
        this.windowTimeSec = (double)((Integer)context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * (Integer)context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)) * 1.0 / 1000.0;
        if (this.pojoClass != null && this.keyField != "") {
            try {
                this.keyMethod = this.generateGetterForKeyField();
            }
            catch (NoSuchFieldException e) {
                throw new RuntimeException("Field " + this.keyField + " is invalid: " + e);
            }
        }
    }

    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.outputMessagesPerSec = 0L;
        this.outputBytesPerSec = 0L;
        this.messageCount = 0L;
        this.byteCount = 0L;
    }

    protected void processTuple(Object tuple) {
        if (this.keyMethod == null && this.keyField != "") {
            this.pojoClass = tuple.getClass();
            try {
                this.keyMethod = this.generateGetterForKeyField();
            }
            catch (NoSuchFieldException e) {
                throw new RuntimeException("Field " + this.keyField + " is invalid: " + e);
            }
        }
        KeyedMessage msg = this.keyMethod != null ? new KeyedMessage(this.getTopic(), this.keyMethod.get(tuple), tuple) : new KeyedMessage(this.getTopic(), tuple, tuple);
        this.getProducer().send(msg);
        ++this.messageCount;
        if (tuple instanceof byte[]) {
            this.byteCount += (long)((byte[])tuple).length;
        }
    }

    @Override
    public void endWindow() {
        super.endWindow();
        this.outputBytesPerSec = (long)((double)this.byteCount / this.windowTimeSec);
        this.outputMessagesPerSec = (long)((double)this.messageCount / this.windowTimeSec);
    }

    private PojoUtils.Getter generateGetterForKeyField() throws NoSuchFieldException, SecurityException {
        Field f = this.pojoClass.getDeclaredField(this.keyField);
        Class c = ClassUtils.primitiveToWrapper(f.getType());
        PojoUtils.Getter classGetter = PojoUtils.createGetter(this.pojoClass, (String)this.keyField, (Class)c);
        return classGetter;
    }

    public String getBrokerList() {
        return this.brokerList;
    }

    public void setBrokerList(@NotNull String brokerList) {
        this.brokerList = brokerList;
    }

    public boolean isBatchProcessing() {
        return this.isBatchProcessing;
    }

    public void setBatchProcessing(boolean batchProcessing) {
        this.isBatchProcessing = batchProcessing;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public String getKeyField() {
        return this.keyField;
    }

    public void setKeyField(String keyField) {
        this.keyField = keyField;
    }
}

