package com.datatorrent.contrib.apachelog.zmq;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.hbase.HBaseScanOperator;
import com.datatorrent.contrib.zmq.SimpleSinglePortZeroMQPullStringInputOperator;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.logs.ApacheLogParseOperator;
import com.datatorrent.lib.math.Sum;
import org.apache.hadoop.conf.Configuration;

@ApplicationAnnotation(name = "ApacheLog")
/* loaded from: input_file:com/datatorrent/contrib/apachelog/zmq/Application.class */
public class Application implements StreamingApplication {
    private DAG.Locality locality = null;
    private final String addr = "tcp://127.0.0.1:5555";

    public void populateDAG(DAG dag, Configuration configuration) {
        this.locality = DAG.Locality.CONTAINER_LOCAL;
        dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, Integer.valueOf(HBaseScanOperator.DEF_QUEUE_SIZE));
        SimpleSinglePortZeroMQPullStringInputOperator addOperator = dag.addOperator("input", new SimpleSinglePortZeroMQPullStringInputOperator("tcp://127.0.0.1:5555"));
        ApacheLogParseOperator addOperator2 = dag.addOperator("parse", new ApacheLogParseOperator());
        UniqueCounter addOperator3 = dag.addOperator("ipAddrCount", new UniqueCounter());
        UniqueCounter addOperator4 = dag.addOperator("urlCount", new UniqueCounter());
        UniqueCounter addOperator5 = dag.addOperator("httpStatusCount", new UniqueCounter());
        Sum addOperator6 = dag.addOperator("numOfBytesSum", new Sum());
        dag.getMeta(addOperator6).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 3);
        dag.addStream("input-parse", addOperator.outputPort, addOperator2.data).setLocality(this.locality);
        dag.addStream("parse-ipAddrCount", addOperator2.outputIPAddress, addOperator3.data).setLocality(this.locality);
        dag.addStream("parse-urlCount", addOperator2.outputUrl, addOperator4.data).setLocality(this.locality);
        dag.addStream("parse-httpStatusCount", addOperator2.outputStatusCode, addOperator5.data).setLocality(this.locality);
        dag.addStream("parse-numOfBytesSum", addOperator2.outputBytes, addOperator6.data).setLocality(this.locality);
        ConsoleOutputOperator addOperator7 = dag.addOperator("console1", new ConsoleOutputOperator());
        ConsoleOutputOperator addOperator8 = dag.addOperator("console2", new ConsoleOutputOperator());
        ConsoleOutputOperator addOperator9 = dag.addOperator("console3", new ConsoleOutputOperator());
        ConsoleOutputOperator addOperator10 = dag.addOperator("console4", new ConsoleOutputOperator());
        dag.addStream("ipAddrCount-console", addOperator3.count, addOperator7.input);
        dag.addStream("urlCount-console", addOperator4.count, addOperator8.input);
        dag.addStream("httpStatusCount-console", addOperator5.count, addOperator9.input);
        dag.addStream("numOfBytesSum-console", addOperator6.sumLong, addOperator10.input);
    }
}
