package org.apache.flink.api.java.record.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.StringTokenizer;
import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;

/* loaded from: input_file:org/apache/flink/api/java/record/io/ExternalProcessInputFormat.class */
/**
 * Base class for input formats that obtain their data from an external process.
 *
 * <p>{@link #open(GenericInputSplit)} starts the command carried by an
 * {@link ExternalProcessInputSplit} and exposes the process' standard output and standard error
 * through {@link #extProcOutStream} and {@link #extProcErrStream}. {@link #close()} verifies that
 * the process ended with one of the configured exit codes (or destroys it if it is still running)
 * and releases both streams.
 *
 * @param <T> the split type; it is not referenced by the code of this class itself
 */
public abstract class ExternalProcessInputFormat<T extends ExternalProcessInputSplit> extends GenericInputFormat {
    private static final long serialVersionUID = 1;

    /**
     * Configuration key holding a comma-separated list of exit codes that are accepted by
     * {@link #close()}. When the key is absent, {@code "0"} is used.
     */
    public static final String ALLOWEDEXITCODES_PARAMETER_KEY = "pact.input.externalProcess.allowedExitCodes";

    /** The external process started by {@link #open(GenericInputSplit)}; {@code null} before that. */
    private Process extProc;

    /** Standard output of the external process, assigned in {@link #open(GenericInputSplit)}. */
    protected InputStream extProcOutStream;

    /** Standard error of the external process, assigned in {@link #open(GenericInputSplit)}. */
    protected InputStream extProcErrStream;

    /** Exit codes accepted by {@link #close()}, parsed in {@link #configure(Configuration)}. */
    protected int[] allowedExitCodes;

    /**
     * Parses the allowed exit codes from the configuration.
     *
     * <p>The value stored under {@link #ALLOWEDEXITCODES_PARAMETER_KEY} is split at commas; each
     * token is trimmed and parsed as a decimal integer. Empty tokens (e.g. in {@code "1,,2"}) are
     * skipped by the tokenizer.
     *
     * @param configuration the configuration to read the exit-code list from
     * @throws NumberFormatException if a token is not a valid integer
     */
    public void configure(Configuration configuration) {
        String exitCodeList = configuration.getString(ALLOWEDEXITCODES_PARAMETER_KEY, "0");
        StringTokenizer tokens = new StringTokenizer(exitCodeList, ",");
        int[] parsedCodes = new int[tokens.countTokens()];
        for (int i = 0; i < parsedCodes.length; i++) {
            parsedCodes[i] = Integer.parseInt(tokens.nextToken().trim());
        }
        this.allowedExitCodes = parsedCodes;
    }

    /**
     * Checks how the external process ended and releases both process streams.
     *
     * <p>The streams are closed on every path, including when the exit-code check fails. If no
     * process was ever started (for example because {@code open} failed), only the streams that
     * happen to be set are closed and no exit-code check is performed.
     *
     * @throws IOException if closing a stream or querying {@code reachedEnd()} fails
     * @throws RuntimeException if the process finished with an exit code that is not allowed, or
     *     if it was still running and had to be destroyed before the input was fully read
     */
    public void close() throws IOException {
        try {
            if (this.extProc != null) {
                verifyProcessTermination();
            }
        } finally {
            closeProcessStreams();
        }
    }

    /**
     * Starts the external process described by the given split.
     *
     * @param genericInputSplit the split; must be an {@link ExternalProcessInputSplit}
     * @throws IOException if the split has the wrong type or the process cannot be started
     * @throws IllegalArgumentException if the split carries no (or an empty) command
     */
    @Override
    public void open(GenericInputSplit genericInputSplit) throws IOException {
        if (!(genericInputSplit instanceof ExternalProcessInputSplit)) {
            throw new IOException("Invalid InputSplit type.");
        }
        String command = ((ExternalProcessInputSplit) genericInputSplit).getExternalProcessCommand();
        if (command == null || command.equals("")) {
            throw new IllegalArgumentException("External Process Command not set");
        }
        try {
            this.extProc = Runtime.getRuntime().exec(command);
        } catch (IOException e) {
            // Keep the original exception as cause so the underlying reason is not lost.
            throw new IOException("IO Exception when starting external process: " + command, e);
        }
        this.extProcOutStream = this.extProc.getInputStream();
        this.extProcErrStream = this.extProc.getErrorStream();
    }

    /**
     * Blocks until the external process has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitForProcessToFinish() throws InterruptedException {
        this.extProc.waitFor();
    }

    /** Validates the exit code of a finished process, or destroys a process that is still running. */
    private void verifyProcessTermination() throws IOException {
        int exitValue;
        try {
            exitValue = this.extProc.exitValue();
        } catch (IllegalThreadStateException stillRunning) {
            // exitValue() signals a still-running process with this exception; shut it down.
            this.extProc.destroy();
            if (!reachedEnd()) {
                throw new RuntimeException("External process was destroyed although stream was not fully read.");
            }
            return;
        }
        for (int allowedCode : this.allowedExitCodes) {
            if (allowedCode == exitValue) {
                return;
            }
        }
        throw new RuntimeException("External process did not finish with an allowed exit code: " + exitValue);
    }

    /** Closes both process streams; the output stream is closed even if closing the error stream fails. */
    private void closeProcessStreams() throws IOException {
        try {
            if (this.extProcErrStream != null) {
                this.extProcErrStream.close();
            }
        } finally {
            if (this.extProcOutStream != null) {
                this.extProcOutStream.close();
            }
        }
    }
}
