package org.apache.flink.test.runtime;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({AlsoRunWithLegacyScheduler.class})
/* loaded from: input_file:org/apache/flink/test/runtime/NetworkStackThroughputITCase.class */
public class NetworkStackThroughputITCase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
    private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
    private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
    private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
    private static final int IS_SLOW_SLEEP_MS = 10;
    private static final int IS_SLOW_EVERY_NUM_RECORDS = 512;

    /* loaded from: input_file:org/apache/flink/test/runtime/NetworkStackThroughputITCase$SpeedTestConsumer.class */
    public static class SpeedTestConsumer extends AbstractInvokable {
        public SpeedTestConsumer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), SpeedTestRecord.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            try {
                boolean z = getTaskConfiguration().getBoolean(NetworkStackThroughputITCase.IS_SLOW_RECEIVER_CONFIG_KEY, false);
                int i = 0;
                while (recordReader.next() != null) {
                    if (z) {
                        int i2 = i;
                        i++;
                        if (i2 % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                            Thread.sleep(10L);
                        }
                    }
                }
            } finally {
                recordReader.clearBuffers();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/runtime/NetworkStackThroughputITCase$SpeedTestForwarder.class */
    public static class SpeedTestForwarder extends AbstractInvokable {
        public SpeedTestForwarder(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), SpeedTestRecord.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            while (true) {
                try {
                    SpeedTestRecord speedTestRecord = (SpeedTestRecord) recordReader.next();
                    if (speedTestRecord == null) {
                        return;
                    } else {
                        build.emit(speedTestRecord);
                    }
                } finally {
                    recordReader.clearBuffers();
                    build.clearBuffers();
                    build.flushAll();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/runtime/NetworkStackThroughputITCase$SpeedTestProducer.class */
    public static class SpeedTestProducer extends AbstractInvokable {
        public SpeedTestProducer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            try {
                long integer = (getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1) * NetworkStackThroughputITCase.IS_SLOW_SLEEP_MS) / getCurrentNumberOfSubtasks();
                long j = ((integer * 1024) * 1024) / 128;
                NetworkStackThroughputITCase.LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)", Integer.valueOf(getIndexInSubtaskGroup() + 1), Integer.valueOf(getCurrentNumberOfSubtasks()), Long.valueOf(j), 128, Double.valueOf(integer / 1024.0d)));
                boolean z = getTaskConfiguration().getBoolean(NetworkStackThroughputITCase.IS_SLOW_SENDER_CONFIG_KEY, false);
                int i = 0;
                SpeedTestRecord speedTestRecord = new SpeedTestRecord();
                for (long j2 = 0; j2 < j; j2++) {
                    if (z) {
                        int i2 = i;
                        i++;
                        if (i2 % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                            Thread.sleep(10L);
                        }
                    }
                    build.emit(speedTestRecord);
                }
            } finally {
                build.clearBuffers();
                build.flushAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/runtime/NetworkStackThroughputITCase$SpeedTestRecord.class */
    public static class SpeedTestRecord implements IOReadableWritable {
        private static final int RECORD_SIZE = 128;
        private final byte[] buf = new byte[RECORD_SIZE];

        public SpeedTestRecord() {
            for (int i = 0; i < RECORD_SIZE; i++) {
                this.buf[i] = (byte) (i % RECORD_SIZE);
            }
        }

        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(this.buf);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readFully(this.buf);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testThroughput() throws Exception {
        for (Object[] objArr : new Object[]{new Object[]{1, false, false, false, 4, 2}, new Object[]{1, true, false, false, 4, 2}, new Object[]{1, true, true, false, 4, 2}, new Object[]{1, true, false, true, 4, 2}, new Object[]{2, true, false, false, 4, 2}, new Object[]{4, true, false, false, 4, 2}, new Object[]{4, true, false, false, 8, 4}}) {
            int intValue = ((Integer) objArr[0]).intValue();
            boolean booleanValue = ((Boolean) objArr[1]).booleanValue();
            boolean booleanValue2 = ((Boolean) objArr[2]).booleanValue();
            boolean booleanValue3 = ((Boolean) objArr[3]).booleanValue();
            int intValue2 = ((Integer) objArr[4]).intValue();
            int intValue3 = ((Integer) objArr[5]).intValue();
            if (intValue2 % intValue3 != 0) {
                throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
            }
            MiniClusterWithClientResource miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(intValue2 / intValue3).setNumberSlotsPerTaskManager(intValue3).build());
            miniClusterWithClientResource.before();
            try {
                System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s", Integer.valueOf(intValue), Boolean.valueOf(booleanValue), Boolean.valueOf(booleanValue2), Boolean.valueOf(booleanValue3), Integer.valueOf(intValue2), Integer.valueOf(intValue3)));
                testProgram(miniClusterWithClientResource, intValue, booleanValue, booleanValue2, booleanValue3, intValue2);
                miniClusterWithClientResource.after();
            } catch (Throwable th) {
                miniClusterWithClientResource.after();
                throw th;
            }
        }
    }

    private void testProgram(MiniClusterWithClientResource miniClusterWithClientResource, int i, boolean z, boolean z2, boolean z3, int i2) throws Exception {
        long j = i * 8192;
        long netRuntime = ClientUtils.submitJobAndWaitForResult(miniClusterWithClientResource.getClusterClient(), createJobGraph(i, z, z2, z3, i2), getClass().getClassLoader()).getNetRuntime(TimeUnit.SECONDS);
        LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, data volume [gb/mbits]: %d/%d)", Integer.valueOf((int) (j / netRuntime)), Long.valueOf(netRuntime), Integer.valueOf(i), Long.valueOf(j)));
    }

    private JobGraph createJobGraph(int i, boolean z, boolean z2, boolean z3, int i2) {
        JobGraph jobGraph = new JobGraph("Speed Test");
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JobVertex jobVertex = new JobVertex("Speed Test Producer");
        jobGraph.addVertex(jobVertex);
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex.setInvokableClass(SpeedTestProducer.class);
        jobVertex.setParallelism(i2);
        jobVertex.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, i);
        jobVertex.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, z2);
        JobVertex jobVertex2 = null;
        if (z) {
            jobVertex2 = new JobVertex("Speed Test Forwarder");
            jobGraph.addVertex(jobVertex2);
            jobVertex2.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.setInvokableClass(SpeedTestForwarder.class);
            jobVertex2.setParallelism(i2);
        }
        JobVertex jobVertex3 = new JobVertex("Speed Test Consumer");
        jobGraph.addVertex(jobVertex3);
        jobVertex3.setSlotSharingGroup(slotSharingGroup);
        jobVertex3.setInvokableClass(SpeedTestConsumer.class);
        jobVertex3.setParallelism(i2);
        jobVertex3.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, z3);
        if (z) {
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        } else {
            jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        }
        return jobGraph;
    }

    public static void main(String[] strArr) throws Exception {
        new NetworkStackThroughputITCase().testThroughput();
        System.out.println("Done.");
    }
}
