/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.apachelog.zmq;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.zmq.SimpleSinglePortZeroMQPullStringInputOperator;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.logs.ApacheLogParseOperator;
import com.datatorrent.lib.math.Sum;
import org.apache.hadoop.conf.Configuration;

@ApplicationAnnotation(name="ApacheLog")
public class Application
implements StreamingApplication {
    private DAG.Locality locality = null;
    private final String addr = "tcp://127.0.0.1:5555";

    public void populateDAG(DAG dag, Configuration conf) {
        this.locality = DAG.Locality.CONTAINER_LOCAL;
        dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, (Object)1000);
        SimpleSinglePortZeroMQPullStringInputOperator input = (SimpleSinglePortZeroMQPullStringInputOperator)dag.addOperator("input", (Operator)new SimpleSinglePortZeroMQPullStringInputOperator("tcp://127.0.0.1:5555"));
        ApacheLogParseOperator parse = (ApacheLogParseOperator)dag.addOperator("parse", (Operator)new ApacheLogParseOperator());
        UniqueCounter ipAddrCount = (UniqueCounter)dag.addOperator("ipAddrCount", (Operator)new UniqueCounter());
        UniqueCounter urlCount = (UniqueCounter)dag.addOperator("urlCount", (Operator)new UniqueCounter());
        UniqueCounter httpStatusCount = (UniqueCounter)dag.addOperator("httpStatusCount", (Operator)new UniqueCounter());
        Sum numOfBytesSum = (Sum)dag.addOperator("numOfBytesSum", (Operator)new Sum());
        dag.getMeta((Operator)numOfBytesSum).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, (Object)3);
        dag.addStream("input-parse", (Operator.OutputPort)input.outputPort, (Operator.InputPort)parse.data).setLocality(this.locality);
        dag.addStream("parse-ipAddrCount", (Operator.OutputPort)parse.outputIPAddress, (Operator.InputPort)ipAddrCount.data).setLocality(this.locality);
        dag.addStream("parse-urlCount", (Operator.OutputPort)parse.outputUrl, (Operator.InputPort)urlCount.data).setLocality(this.locality);
        dag.addStream("parse-httpStatusCount", (Operator.OutputPort)parse.outputStatusCode, (Operator.InputPort)httpStatusCount.data).setLocality(this.locality);
        dag.addStream("parse-numOfBytesSum", (Operator.OutputPort)parse.outputBytes, (Operator.InputPort)numOfBytesSum.data).setLocality(this.locality);
        ConsoleOutputOperator consoleOperator1 = (ConsoleOutputOperator)dag.addOperator("console1", (Operator)new ConsoleOutputOperator());
        ConsoleOutputOperator consoleOperator2 = (ConsoleOutputOperator)dag.addOperator("console2", (Operator)new ConsoleOutputOperator());
        ConsoleOutputOperator consoleOperator3 = (ConsoleOutputOperator)dag.addOperator("console3", (Operator)new ConsoleOutputOperator());
        ConsoleOutputOperator consoleOperator4 = (ConsoleOutputOperator)dag.addOperator("console4", (Operator)new ConsoleOutputOperator());
        dag.addStream("ipAddrCount-console", (Operator.OutputPort)ipAddrCount.count, (Operator.InputPort)consoleOperator1.input);
        dag.addStream("urlCount-console", (Operator.OutputPort)urlCount.count, (Operator.InputPort)consoleOperator2.input);
        dag.addStream("httpStatusCount-console", (Operator.OutputPort)httpStatusCount.count, (Operator.InputPort)consoleOperator3.input);
        dag.addStream("numOfBytesSum-console", (Operator.OutputPort)numOfBytesSum.sumLong, (Operator.InputPort)consoleOperator4.input);
    }
}

