/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.splunk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Base class that accepts one TCP connection, reads it line by line, converts every
 * line with {@link #getMessage(String)} and publishes the result to a Kafka topic.
 *
 * <p>Typical call order: configure via the setters, {@link #startServer()},
 * {@link #process()}, then {@link #stopServer()}.
 *
 * <p>NOTE(review): the class name suggests the peer is a Splunk forwarder, but nothing
 * in this class depends on that; any line-oriented TCP sender would work.
 *
 * @param <T> type of the Kafka message value produced from each input line
 */
public abstract class SplunkInputFromForwarder<T> {
    /** Port used when {@link #setPort(int)} has not been called. */
    private static final int DEFAULT_PORT = 6789;

    private int port = DEFAULT_PORT;
    protected Producer<String, T> producer;
    private String topic;
    private Properties configProperties = new Properties();
    protected ServerSocket serverSocket;
    protected Socket connectionSocket;

    /**
     * @return the TCP port the server socket is (or will be) bound to
     */
    public int getPort() {
        return this.port;
    }

    /**
     * @param port the TCP port to listen on; only takes effect on the next {@link #startServer()}
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * @return the Kafka topic messages are published to
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * @param topic the Kafka topic messages are published to
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * @return the properties handed to the Kafka {@code ProducerConfig} in {@link #startServer()}
     */
    public Properties getConfigProperties() {
        return this.configProperties;
    }

    /**
     * @param configProperties the properties handed to the Kafka {@code ProducerConfig}
     *                         in {@link #startServer()}
     */
    public void setConfigProperties(Properties configProperties) {
        this.configProperties = configProperties;
    }

    /**
     * Converts one line of input into the message value to publish.
     *
     * @param var1 a line read from the connection, never {@code null}
     * @return the message to publish, or {@code null} to drop the line
     */
    public abstract T getMessage(String var1);

    /**
     * Converts {@code line} with {@link #getMessage(String)} and sends the result to the
     * configured topic. Nothing is sent when {@code line} is {@code null} or when the
     * conversion yields {@code null}.
     *
     * @param line the raw input line, may be {@code null}
     */
    public void writeToKafka(String line) {
        if (line == null) {
            return;
        }
        T message = this.getMessage(line);
        if (message != null) {
            this.producer.send(new KeyedMessage<String, T>(this.getTopic(), message));
        }
    }

    /**
     * Creates the Kafka producer from the configured properties and binds the server
     * socket to the configured port.
     *
     * @throws IOException if the server socket cannot be opened
     */
    public void startServer() throws IOException {
        ProducerConfig producerConfig = new ProducerConfig(this.configProperties);
        Producer<String, T> newProducer = new Producer<String, T>(producerConfig);
        boolean bound = false;
        try {
            this.serverSocket = new ServerSocket(this.port);
            bound = true;
        } finally {
            // Do not leak the producer when the port cannot be bound.
            if (!bound) {
                newProducer.close();
            }
        }
        this.producer = newProducer;
    }

    /**
     * Blocks until a client connects, then forwards every line it sends to Kafka.
     * Returns once the client closes its side of the connection; the accepted socket
     * stays referenced by {@link #connectionSocket} until {@link #stopServer()}.
     *
     * @throws IOException if accepting the connection or reading from it fails
     */
    public void process() throws IOException {
        this.connectionSocket = this.serverSocket.accept();
        // NOTE(review): decodes with the platform default charset; confirm whether the
        // sender's encoding should be specified explicitly.
        BufferedReader reader = new BufferedReader(new InputStreamReader(this.connectionSocket.getInputStream()));
        String line;
        // readLine() returns null at end of stream; stop there instead of spinning forever.
        while ((line = reader.readLine()) != null) {
            this.writeToKafka(line);
        }
    }

    /**
     * Closes the server socket, the accepted connection (if any) and the Kafka producer.
     * Resources that were never created are skipped, and every resource is attempted
     * even if closing an earlier one fails.
     *
     * @throws IOException if closing one of the sockets fails
     */
    public void stopServer() throws IOException {
        try {
            if (this.serverSocket != null) {
                this.serverSocket.close();
            }
        } finally {
            try {
                if (this.connectionSocket != null) {
                    this.connectionSocket.close();
                }
            } finally {
                if (this.producer != null) {
                    this.producer.close();
                }
            }
        }
    }
}

