/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.parser;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.parser.Parser;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.netlet.util.DTThrowable;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jsonschema.exceptions.ProcessingException;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
import com.github.fge.jsonschema.report.ProcessingMessage;
import com.github.fge.jsonschema.report.ProcessingReport;
import com.github.fge.jsonschema.util.JsonLoader;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class JsonParser
extends Parser<byte[], KeyValPair<String, String>> {
    private String jsonSchema;
    private transient JsonSchema schema;
    private transient ObjectMapper objMapper;
    public transient DefaultOutputPort<JSONObject> parsedOutput = new DefaultOutputPort();
    @AutoMetric
    long parsedOutputCount;
    private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);

    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.parsedOutputCount = 0L;
    }

    public void setup(Context.OperatorContext context) {
        try {
            if (this.jsonSchema != null) {
                JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
                JsonNode schemaNode = JsonLoader.fromString((String)this.jsonSchema);
                this.schema = factory.getJsonSchema(schemaNode);
            }
            this.objMapper = new ObjectMapper();
            this.objMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        }
        catch (ProcessingException | IOException e) {
            DTThrowable.wrapIfChecked((Exception)e);
        }
    }

    public void processTuple(byte[] tuple) {
        if (tuple == null) {
            if (this.err.isConnected()) {
                this.err.emit((Object)new KeyValPair(null, (Object)"null tuple"));
            }
            ++this.errorTupleCount;
            return;
        }
        String incomingString = new String(tuple);
        try {
            if (this.schema != null) {
                ProcessingReport report = null;
                JsonNode data = JsonLoader.fromString((String)incomingString);
                report = this.schema.validate(data);
                if (report != null && !report.isSuccess()) {
                    Iterator iter = report.iterator();
                    StringBuilder s = new StringBuilder();
                    while (iter.hasNext()) {
                        ProcessingMessage pm = (ProcessingMessage)iter.next();
                        s.append(pm.asJson().get("instance").findValue("pointer")).append(":").append(pm.asJson().get("message")).append(",");
                    }
                    s.setLength(s.length() - 1);
                    ++this.errorTupleCount;
                    if (this.err.isConnected()) {
                        this.err.emit((Object)new KeyValPair((Object)incomingString, (Object)s.toString()));
                    }
                    return;
                }
            }
            if (this.parsedOutput.isConnected()) {
                this.parsedOutput.emit((Object)new JSONObject(incomingString));
                ++this.parsedOutputCount;
            }
            if (this.out.isConnected()) {
                this.out.emit(this.objMapper.readValue(tuple, this.clazz));
                ++this.emittedObjectCount;
            }
        }
        catch (ProcessingException | IOException | JSONException e) {
            ++this.errorTupleCount;
            if (this.err.isConnected()) {
                this.err.emit((Object)new KeyValPair((Object)incomingString, (Object)e.getMessage()));
            }
            logger.error("Failed to parse json tuple {}, Exception = {} ", (Object)tuple, (Object)e);
        }
    }

    public Object convert(byte[] tuple) {
        throw new UnsupportedOperationException("Not supported");
    }

    public KeyValPair<String, String> processErrorTuple(byte[] input) {
        throw new UnsupportedOperationException("Not supported");
    }

    public String getJsonSchema() {
        return this.jsonSchema;
    }

    public void setJsonSchema(String jsonSchema) {
        this.jsonSchema = jsonSchema;
    }

    @VisibleForTesting
    public long getErrorTupleCount() {
        return this.errorTupleCount;
    }

    @VisibleForTesting
    public long getEmittedObjectCount() {
        return this.emittedObjectCount;
    }

    @VisibleForTesting
    public long getIncomingTuplesCount() {
        return this.incomingTuplesCount;
    }

    @VisibleForTesting
    public void setSchema(JsonSchema schema) {
        this.schema = schema;
    }
}

