/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.avro;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.avro.AvroRecordHelper;
import com.datatorrent.lib.util.PojoUtils;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class PojoToAvro
extends BaseOperator {
    private List<Schema.Field> columnNames;
    private Class<?> cls;
    private List<PojoUtils.Getter> keyMethodMap;
    private transient String schemaString;
    private transient Schema schema;
    @AutoMetric
    @VisibleForTesting
    int recordCount = 0;
    @AutoMetric
    @VisibleForTesting
    int errorCount = 0;
    @AutoMetric
    @VisibleForTesting
    int fieldErrorCount = 0;
    public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort();
    public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort();
    @InputPortFieldAnnotation(schemaRequired=true)
    public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>(){

        public void setup(Context.PortContext context) {
            PojoToAvro.this.cls = (Class)context.getValue(Context.PortContext.TUPLE_CLASS);
            try {
                PojoToAvro.this.parseSchema();
                PojoToAvro.this.initializeColumnMap(PojoToAvro.this.getSchema());
            }
            catch (IOException e) {
                LOG.error("Exception in parsing schema", (Throwable)e);
            }
        }

        public void process(Object tuple) {
            PojoToAvro.this.processTuple(tuple);
        }
    };
    private static final Logger LOG = LoggerFactory.getLogger(PojoToAvro.class);

    private void parseSchema() throws IOException {
        this.setSchema(new Schema.Parser().parse(this.getSchemaString()));
    }

    public String getSchemaString() {
        return this.schemaString;
    }

    public void setSchemaString(String schemaString) {
        this.schemaString = schemaString;
    }

    private Schema getSchema() {
        return this.schema;
    }

    private void setSchema(Schema schema) {
        this.schema = schema;
    }

    private List<Schema.Field> getColumnNames() {
        return this.columnNames;
    }

    private void setColumnNames(List<Schema.Field> columnNames) {
        this.columnNames = columnNames;
    }

    private PojoUtils.Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName) throws NoSuchFieldException, SecurityException {
        Field f = cls.getDeclaredField(inputFieldName);
        Class c = ClassUtils.primitiveToWrapper(f.getType());
        PojoUtils.Getter classGetter = PojoUtils.createGetter(cls, (String)inputFieldName, (Class)c);
        return classGetter;
    }

    private void initializeColumnMap(Schema schema) {
        this.setColumnNames(schema.getFields());
        this.keyMethodMap = new ArrayList<PojoUtils.Getter>();
        for (int i = 0; i < this.getColumnNames().size(); ++i) {
            try {
                this.keyMethodMap.add(this.generateGettersForField(this.cls, this.getColumnNames().get(i).name()));
                continue;
            }
            catch (NoSuchFieldException | SecurityException e) {
                throw new RuntimeException("Failed to initialize pojo class getters for field - ", e);
            }
        }
    }

    protected void processTuple(Object tuple) {
        GenericRecord record = null;
        try {
            record = this.getGenericRecord(tuple);
        }
        catch (Exception e) {
            LOG.error("Exception in creating record");
            ++this.errorCount;
        }
        if (record != null) {
            this.output.emit((Object)record);
            ++this.recordCount;
        } else if (this.errorPort.isConnected()) {
            this.errorPort.emit(tuple);
            ++this.errorCount;
        }
    }

    private GenericRecord getGenericRecord(Object tuple) throws Exception {
        int writeErrorCount = 0;
        GenericData.Record rec = new GenericData.Record(this.getSchema());
        for (int i = 0; i < this.columnNames.size(); ++i) {
            try {
                rec.put(this.columnNames.get(i).name(), AvroRecordHelper.convertValueStringToAvroKeyType(this.getSchema(), this.columnNames.get(i).name(), this.keyMethodMap.get(i).get(tuple).toString()));
                continue;
            }
            catch (AvroRuntimeException e) {
                LOG.error("Could not set Field [" + this.columnNames.get(i).name() + "] in the generic record", (Throwable)e);
                ++this.fieldErrorCount;
                continue;
            }
            catch (Exception e) {
                LOG.error("Parse Exception", (Throwable)e);
                ++this.fieldErrorCount;
                ++writeErrorCount;
            }
        }
        if (this.columnNames.size() == writeErrorCount) {
            ++this.errorCount;
            return null;
        }
        return rec;
    }

    public void beginWindow(long windowId) {
        this.recordCount = 0;
        this.errorCount = 0;
        this.fieldErrorCount = 0;
    }
}

