package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.class */
public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, CheckpointMarkT> {
    private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
    private static final long serialVersionUID = 1;
    private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
    private static final int CONNECTION_TIMEOUT_TIME = 0;
    private final String hostname;
    private final int port;
    private final char delimiter;
    private final long maxNumRetries;
    private final long delayBetweenRetries;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource$UnboundedSocketReader.class */
    public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> {
        private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
        private final UnboundedSocketSource source;
        private Socket socket;
        private BufferedReader reader;
        private boolean isRunning;
        private String currentRecord;

        public UnboundedSocketReader(UnboundedSocketSource unboundedSocketSource) {
            this.source = unboundedSocketSource;
        }

        private void openConnection() throws IOException {
            this.socket = new Socket();
            this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), UnboundedSocketSource.CONNECTION_TIMEOUT_TIME);
            this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            this.isRunning = true;
        }

        public boolean start() throws IOException {
            int i = UnboundedSocketSource.CONNECTION_TIMEOUT_TIME;
            while (true) {
                if (this.isRunning) {
                    break;
                }
                try {
                    openConnection();
                    LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
                    return advance();
                } catch (IOException e) {
                    LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
                    if (this.source.getMaxNumRetries() != -1) {
                        int i2 = i;
                        i++;
                        if (i2 >= this.source.getMaxNumRetries()) {
                            this.isRunning = false;
                            LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
                            return false;
                        }
                    }
                    try {
                        Thread.sleep(this.source.getDelayBetweenRetries());
                    } catch (InterruptedException e2) {
                        e2.printStackTrace();
                    }
                }
            }
        }

        public boolean advance() throws IOException {
            int read;
            StringBuilder sb = new StringBuilder();
            while (this.isRunning && (read = this.reader.read()) != -1) {
                if (read == this.source.getDelimiter()) {
                    if (sb.length() > 0 && sb.charAt(sb.length() - 1) == '\r') {
                        sb.setLength(sb.length() - 1);
                    }
                    this.currentRecord = sb.toString();
                    sb.setLength(UnboundedSocketSource.CONNECTION_TIMEOUT_TIME);
                    return true;
                }
                sb.append((char) read);
            }
            return false;
        }

        public byte[] getCurrentRecordId() throws NoSuchElementException {
            return new byte[UnboundedSocketSource.CONNECTION_TIMEOUT_TIME];
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public String m28getCurrent() throws NoSuchElementException {
            return this.currentRecord;
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return Instant.now();
        }

        public void close() throws IOException {
            this.reader.close();
            this.socket.close();
            this.isRunning = false;
            LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
        }

        public Instant getWatermark() {
            return Instant.now();
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return null;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public UnboundedSource<String, ?> m27getCurrentSource() {
            return this.source;
        }
    }

    public UnboundedSocketSource(String str, int i, char c, long j) {
        this(str, i, c, j, 500L);
    }

    public UnboundedSocketSource(String str, int i, char c, long j, long j2) {
        this.hostname = str;
        this.port = i;
        this.delimiter = c;
        this.maxNumRetries = j;
        this.delayBetweenRetries = j2;
    }

    public String getHostname() {
        return this.hostname;
    }

    public int getPort() {
        return this.port;
    }

    public char getDelimiter() {
        return this.delimiter;
    }

    public long getMaxNumRetries() {
        return this.maxNumRetries;
    }

    public long getDelayBetweenRetries() {
        return this.delayBetweenRetries;
    }

    public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
        return Collections.singletonList(this);
    }

    public UnboundedSource.UnboundedReader<String> createReader(PipelineOptions pipelineOptions, @Nullable CheckpointMarkT checkpointmarkt) {
        return new UnboundedSocketReader(this);
    }

    @Nullable
    public Coder getCheckpointMarkCoder() {
        return null;
    }

    public void validate() {
        Preconditions.checkArgument(this.port > 0 && this.port < 65536, "port is out of range");
        Preconditions.checkArgument(this.maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
        Preconditions.checkArgument(this.delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
    }

    public Coder getDefaultOutputCoder() {
        return DEFAULT_SOCKET_CODER;
    }
}
