/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
import com.datatorrent.lib.db.AbstractTransactionableStoreOutputOperator;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.google.common.collect.Lists;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class CassandraPOJOOutputOperator
extends AbstractCassandraTransactionableOutputOperator<Object> {
    private List<FieldInfo> fieldInfos;
    private String tablename;
    private String query;
    protected final transient ArrayList<DataType> columnDataTypes;
    protected final transient ArrayList<Object> getters;
    protected transient Class<?> pojoClass;
    @AutoMetric
    private long successfulRecords;
    @AutoMetric
    private long errorRecords;
    @InputPortFieldAnnotation(optional=true)
    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>(){

        public void setup(Context.PortContext context) {
            CassandraPOJOOutputOperator.this.pojoClass = (Class)context.getValue(Context.PortContext.TUPLE_CLASS);
        }

        public void process(Object tuple) {
            ((AbstractTransactionableStoreOutputOperator)CassandraPOJOOutputOperator.this).input.process(tuple);
        }
    };
    @OutputPortFieldAnnotation(error=true)
    public final transient DefaultOutputPort<Object> error = new DefaultOutputPort();
    private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);

    public CassandraPOJOOutputOperator() {
        this.columnDataTypes = new ArrayList();
        this.getters = new ArrayList();
    }

    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.successfulRecords = 0L;
        this.errorRecords = 0L;
    }

    @Override
    public void activate(Context.OperatorContext context) {
        ResultSet rs = ((CassandraTransactionalStore)this.store).getSession().execute("select * from " + ((CassandraTransactionalStore)this.store).keyspace + "." + this.tablename);
        ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
        if (this.fieldInfos == null) {
            this.populateFieldInfosFromPojo(rsMetaData);
        }
        for (FieldInfo fieldInfo : this.getFieldInfos()) {
            PojoUtils.GetterBoolean getter;
            DataType type = rsMetaData.getType(fieldInfo.getColumnName());
            this.columnDataTypes.add(type);
            String getterExpr = fieldInfo.getPojoFieldExpression();
            switch (type.getName()) {
                case ASCII: 
                case TEXT: 
                case VARCHAR: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, String.class);
                    break;
                }
                case BOOLEAN: {
                    getter = PojoUtils.createGetterBoolean(this.pojoClass, (String)getterExpr);
                    break;
                }
                case INT: {
                    getter = PojoUtils.createGetterInt(this.pojoClass, (String)getterExpr);
                    break;
                }
                case BIGINT: 
                case COUNTER: {
                    getter = PojoUtils.createGetterLong(this.pojoClass, (String)getterExpr);
                    break;
                }
                case FLOAT: {
                    getter = PojoUtils.createGetterFloat(this.pojoClass, (String)getterExpr);
                    break;
                }
                case DOUBLE: {
                    getter = PojoUtils.createGetterDouble(this.pojoClass, (String)getterExpr);
                    break;
                }
                case DECIMAL: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, BigDecimal.class);
                    break;
                }
                case SET: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, Set.class);
                    break;
                }
                case MAP: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, Map.class);
                    break;
                }
                case LIST: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, List.class);
                    break;
                }
                case TIMESTAMP: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, Date.class);
                    break;
                }
                case UUID: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, UUID.class);
                    break;
                }
                default: {
                    getter = PojoUtils.createGetter(this.pojoClass, (String)getterExpr, Object.class);
                }
            }
            this.getters.add(getter);
        }
        super.activate(context);
    }

    private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData) {
        this.fieldInfos = Lists.newArrayList();
        Field[] fields = this.pojoClass.getDeclaredFields();
        for (int i = 0; i < rsMetaData.size(); ++i) {
            String columnName = rsMetaData.getName(i);
            String pojoField = this.getMatchingField(fields, columnName);
            if (pojoField != null && pojoField.length() != 0) {
                this.fieldInfos.add(new FieldInfo(columnName, pojoField, null));
                continue;
            }
            LOG.warn("Couldn't find corrosponding pojo field for column: " + columnName);
        }
    }

    private String getMatchingField(Field[] fields, String columnName) {
        for (Field f : fields) {
            if (!f.getName().equalsIgnoreCase(columnName)) continue;
            return f.getName();
        }
        return null;
    }

    @Override
    protected PreparedStatement getUpdateCommand() {
        PreparedStatement statement = this.query == null ? this.prepareStatementFromFieldsAndTableName() : ((CassandraTransactionalStore)this.store).getSession().prepare(this.query);
        LOG.debug("Statement is: " + statement.getQueryString());
        return statement;
    }

    private PreparedStatement prepareStatementFromFieldsAndTableName() {
        if (this.tablename == null || this.tablename.length() == 0) {
            throw new RuntimeException("Please sepcify query or table name.");
        }
        StringBuilder queryfields = new StringBuilder();
        StringBuilder values = new StringBuilder();
        for (FieldInfo fieldInfo : this.fieldInfos) {
            if (queryfields.length() == 0) {
                queryfields.append(fieldInfo.getColumnName());
                values.append("?");
                continue;
            }
            queryfields.append(",").append(fieldInfo.getColumnName());
            values.append(",").append("?");
        }
        String statement = "INSERT INTO " + ((CassandraTransactionalStore)this.store).keyspace + "." + this.tablename + " (" + queryfields.toString() + ") " + "VALUES (" + values.toString() + ");";
        LOG.debug("statement is {}", (Object)statement);
        return ((CassandraTransactionalStore)this.store).getSession().prepare(statement);
    }

    @Override
    protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException {
        BoundStatement boundStmnt = new BoundStatement(updateCommand);
        int size = this.columnDataTypes.size();
        block14: for (int i = 0; i < size; ++i) {
            DataType type = this.columnDataTypes.get(i);
            switch (type.getName()) {
                case UUID: {
                    UUID id = (UUID)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setUUID(i, id);
                    continue block14;
                }
                case ASCII: 
                case TEXT: 
                case VARCHAR: {
                    String ascii = (String)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setString(i, ascii);
                    continue block14;
                }
                case BOOLEAN: {
                    boolean bool = ((PojoUtils.GetterBoolean)this.getters.get(i)).get(tuple);
                    boundStmnt.setBool(i, bool);
                    continue block14;
                }
                case INT: {
                    int intValue = ((PojoUtils.GetterInt)this.getters.get(i)).get(tuple);
                    boundStmnt.setInt(i, intValue);
                    continue block14;
                }
                case BIGINT: 
                case COUNTER: {
                    long longValue = ((PojoUtils.GetterLong)this.getters.get(i)).get(tuple);
                    boundStmnt.setLong(i, longValue);
                    continue block14;
                }
                case FLOAT: {
                    float floatValue = ((PojoUtils.GetterFloat)this.getters.get(i)).get(tuple);
                    boundStmnt.setFloat(i, floatValue);
                    continue block14;
                }
                case DOUBLE: {
                    double doubleValue = ((PojoUtils.GetterDouble)this.getters.get(i)).get(tuple);
                    boundStmnt.setDouble(i, doubleValue);
                    continue block14;
                }
                case DECIMAL: {
                    BigDecimal decimal = (BigDecimal)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setDecimal(i, decimal);
                    continue block14;
                }
                case SET: {
                    Set set = (Set)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setSet(i, set);
                    continue block14;
                }
                case MAP: {
                    Map map = (Map)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setMap(i, map);
                    continue block14;
                }
                case LIST: {
                    List list = (List)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setList(i, list);
                    continue block14;
                }
                case TIMESTAMP: {
                    Date date = (Date)((PojoUtils.Getter)this.getters.get(i)).get(tuple);
                    boundStmnt.setDate(i, LocalDate.fromMillisSinceEpoch((long)date.getTime()));
                    continue block14;
                }
                default: {
                    throw new RuntimeException("unsupported data type " + type.getName());
                }
            }
        }
        return boundStmnt;
    }

    public void processTuple(Object tuple) {
        try {
            super.processTuple(tuple);
            ++this.successfulRecords;
        }
        catch (RuntimeException e) {
            LOG.error(e.getMessage());
            this.error.emit(tuple);
            ++this.errorRecords;
        }
    }

    public List<FieldInfo> getFieldInfos() {
        return this.fieldInfos;
    }

    public void setFieldInfos(List<FieldInfo> fieldInfos) {
        this.fieldInfos = fieldInfos;
    }

    public String getTablename() {
        return this.tablename;
    }

    public void setTablename(String tablename) {
        this.tablename = tablename;
    }

    public String getQuery() {
        return this.query;
    }

    public void setQuery(String query) {
        this.query = query;
    }
}

