/*
 * Decompiled with CFR 0.152.
 */
package kafka.etl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import kafka.common.KafkaException;
import kafka.etl.KafkaETLContext;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLUtils;
import kafka.etl.Props;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class KafkaETLRecordReader
extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> {
    protected Props _props;
    protected JobConf _job;
    protected Reporter _reporter;
    protected MultipleOutputs _mos;
    protected List<KafkaETLContext> _contextList;
    protected int _contextIndex;
    protected long _totalBytes;
    protected long _readBytes;
    protected long _readCounts;
    protected String _attemptId = null;
    private static long _limit = 100L;

    public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        super((Configuration)job, (FileSplit)split);
        this._props = KafkaETLUtils.getPropsFromJob((Configuration)job);
        this._contextList = new ArrayList<KafkaETLContext>();
        this._job = job;
        this._reporter = reporter;
        this._contextIndex = -1;
        this._mos = new MultipleOutputs(job);
        try {
            _limit = this._props.getInt("kafka.request.limit", -1).intValue();
            String taskId = this._job.get("mapred.task.id");
            if (taskId == null) {
                throw new KafkaException("Configuration does not contain the property mapred.task.id");
            }
            String[] parts = taskId.split("_");
            if (parts.length != 6 || !parts[0].equals("attempt") || !"m".equals(parts[3]) && !"r".equals(parts[3])) {
                throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed");
            }
            this._attemptId = parts[4] + parts[3];
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    public synchronized void close() throws IOException {
        super.close();
        for (KafkaETLContext context : this._contextList) {
            context.output(this._attemptId);
            context.close();
        }
        this._mos.close();
    }

    public KafkaETLKey createKey() {
        return (KafkaETLKey)super.createKey();
    }

    public BytesWritable createValue() {
        return (BytesWritable)super.createValue();
    }

    public float getProgress() throws IOException {
        if (this._totalBytes == 0L) {
            return 0.0f;
        }
        if (this._contextIndex >= this._contextList.size()) {
            return 1.0f;
        }
        if (_limit < 0L) {
            double p = (double)(this._readBytes + this.getContext().getReadBytes()) / (double)this._totalBytes;
            return (float)p;
        }
        double p = (double)(this._readCounts + this.getContext().getCount()) / ((double)_limit * (double)this._contextList.size());
        return (float)p;
    }

    public synchronized boolean next(KafkaETLKey key, BytesWritable value) throws IOException {
        try {
            if (this._contextIndex < 0) {
                System.out.println("RecordReader.next init()");
                this._totalBytes = 0L;
                while (super.next((Object)key, (Object)value)) {
                    String input = new String(value.getBytes(), "UTF-8");
                    int index = this._contextList.size();
                    KafkaETLContext context = new KafkaETLContext(this._job, this._props, this._reporter, this._mos, index, input);
                    this._contextList.add(context);
                    this._totalBytes += context.getTotalBytes();
                }
                System.out.println("Number of requests=" + this._contextList.size());
                this._readBytes = 0L;
                this._readCounts = 0L;
                this._contextIndex = 0;
            }
            while (this._contextIndex < this._contextList.size()) {
                KafkaETLContext currContext = this.getContext();
                while (currContext.hasMore() && (_limit < 0L || currContext.getCount() < _limit)) {
                    if (currContext.getNext(key, value)) {
                        return true;
                    }
                    currContext.fetchMore();
                }
                this._readBytes += currContext.getReadBytes();
                this._readCounts += currContext.getCount();
                ++this._contextIndex;
                System.out.println("RecordReader.next will get from request " + this._contextIndex);
            }
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        return false;
    }

    protected KafkaETLContext getContext() throws IOException {
        if (this._contextIndex >= this._contextList.size()) {
            throw new IOException("context index " + this._contextIndex + " is out of bound " + this._contextList.size());
        }
        return this._contextList.get(this._contextIndex);
    }
}

