/*
 * Decompiled with CFR 0.152.
 */
package kafka.etl.impl;

import kafka.etl.KafkaETLInputFormat;
import kafka.etl.KafkaETLJob;
import kafka.etl.Props;
import kafka.etl.impl.SimpleKafkaETLMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SimpleKafkaETLJob {
    protected String _name;
    protected Props _props;
    protected String _input;
    protected String _output;
    protected String _topic;

    public SimpleKafkaETLJob(String name, Props props) throws Exception {
        this._name = name;
        this._props = props;
        this._input = this._props.getProperty("input");
        this._output = this._props.getProperty("output");
        this._topic = props.getProperty("kafka.etl.topic");
    }

    protected JobConf createJobConf() throws Exception {
        JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafakETL", this._topic, this._props, this.getClass());
        jobConf.setMapperClass(SimpleKafkaETLMapper.class);
        KafkaETLInputFormat.setInputPaths((JobConf)jobConf, (Path[])new Path[]{new Path(this._input)});
        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setCompressOutput((JobConf)jobConf, (boolean)false);
        Path output = new Path(this._output);
        FileSystem fs = output.getFileSystem((Configuration)jobConf);
        if (fs.exists(output)) {
            fs.delete(output);
        }
        TextOutputFormat.setOutputPath((JobConf)jobConf, (Path)output);
        jobConf.setNumReduceTasks(0);
        return jobConf;
    }

    public void execute() throws Exception {
        JobConf conf = this.createJobConf();
        RunningJob runningJob = new JobClient(conf).submitJob(conf);
        String id = runningJob.getJobID();
        System.out.println("Hadoop job id=" + id);
        runningJob.waitForCompletion();
        if (!runningJob.isSuccessful()) {
            throw new Exception("Hadoop ETL job failed! Please check status on http://" + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Usage: - config_file");
        }
        Props props = new Props(args[0]);
        SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob", props);
        job.execute();
    }
}

