/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.avro;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class AvroFileInputOperator
extends AbstractFileInputOperator<GenericRecord> {
    private transient long offset = 0L;
    @AutoMetric
    @VisibleForTesting
    int recordCount = 0;
    @AutoMetric
    @VisibleForTesting
    int errorCount = 0;
    private transient DataFileStream<GenericRecord> avroDataStream;
    public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort();
    public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort();
    public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort();
    private static final Logger LOG = LoggerFactory.getLogger(AvroFileInputOperator.class);

    protected InputStream openFile(Path path) throws IOException {
        InputStream is = super.openFile(path);
        if (is != null) {
            GenericDatumReader datumReader = new GenericDatumReader();
            this.avroDataStream = new DataFileStream(is, (DatumReader)datumReader);
            datumReader.setSchema(this.avroDataStream.getSchema());
        }
        return is;
    }

    protected GenericRecord readEntity() throws IOException {
        GenericRecord record = null;
        record = null;
        try {
            if (this.avroDataStream != null && this.avroDataStream.hasNext()) {
                ++this.offset;
                record = (GenericRecord)this.avroDataStream.next();
                ++this.recordCount;
                return record;
            }
        }
        catch (AvroRuntimeException are) {
            LOG.error("Exception in parsing record for file - " + this.currentFile + " at offset - " + this.offset, (Throwable)are);
            if (this.errorRecordsPort.isConnected()) {
                this.errorRecordsPort.emit((Object)("FileName:" + this.currentFile + ", Offset:" + this.offset));
            }
            ++this.errorCount;
            throw new AvroRuntimeException((Throwable)are);
        }
        return record;
    }

    protected void closeFile(InputStream is) throws IOException {
        String fileName = this.currentFile;
        if (this.avroDataStream != null) {
            this.avroDataStream.close();
        }
        super.closeFile(is);
        if (this.completedFilesPort.isConnected()) {
            this.completedFilesPort.emit((Object)fileName);
        }
        this.offset = 0L;
    }

    protected void emit(GenericRecord tuple) {
        if (tuple != null) {
            this.output.emit((Object)tuple);
        }
    }

    public void beginWindow(long windowId) {
        this.errorCount = 0;
        this.recordCount = 0;
    }
}

