package org.apache.flink.test.streaming.runtime;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.class */
class UnifiedSinkMigrationITCase {
    private static final String SAVEPOINT_FOLDER_NAME = "unified-sink-migration-test";

    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    private static final int WRITER_STATE = 1;
    private static final int COMMITTER_STATE = 2;
    private static final int GLOBAL_COMMITTER_STATE = 3;
    private static final String SINK_UUID = "1c4ec0f9-2d96-46e9-99ea-45e8c3df5202";
    private static final Logger LOG = LoggerFactory.getLogger(UnifiedSinkMigrationITCase.class);

    @RegisterExtension
    private static final MiniClusterExtension miniClusterExtension = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$IntegerSerializer.class */
    private static class IntegerSerializer implements SimpleVersionedSerializer<Integer> {
        private IntegerSerializer() {
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(Integer num) throws IOException {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(8);
            dataOutputSerializer.writeInt(num.intValue());
            return dataOutputSerializer.getCopyOfBuffer();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m963deserialize(int i, byte[] bArr) throws IOException {
            return Integer.valueOf(new DataInputDeserializer(bArr).readInt());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$StateFulSinkV1.class */
    public static class StateFulSinkV1 implements Sink<Long, Integer, Integer, String> {
        private final boolean recovered;
        private final SharedReference<OneShotLatch> latch;
        private final SharedReference<CountDownLatch> commitLatch;

        StateFulSinkV1(boolean z, SharedReference<OneShotLatch> sharedReference, SharedReference<CountDownLatch> sharedReference2) {
            this.recovered = z;
            this.latch = sharedReference;
            this.commitLatch = sharedReference2;
        }

        public SinkWriter<Long, Integer, Integer> createWriter(Sink.InitContext initContext, List<Integer> list) throws IOException {
            return new TestWriter(this.recovered, list);
        }

        public Optional<SimpleVersionedSerializer<Integer>> getWriterStateSerializer() {
            return Optional.of(new IntegerSerializer());
        }

        public Optional<Committer<Integer>> createCommitter() throws IOException {
            return Optional.of(new TestCommitter(this.recovered, this.commitLatch));
        }

        public Optional<GlobalCommitter<Integer, String>> createGlobalCommitter() throws IOException {
            return Optional.of(new TestGlobalCommitter(this.recovered, this.latch, this.commitLatch));
        }

        public Optional<SimpleVersionedSerializer<Integer>> getCommittableSerializer() {
            return Optional.of(new IntegerSerializer());
        }

        public Optional<SimpleVersionedSerializer<String>> getGlobalCommittableSerializer() {
            return Optional.of(new StringSerializer());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$StringSerializer.class */
    private static class StringSerializer implements SimpleVersionedSerializer<String> {
        private StringSerializer() {
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(String str) throws IOException {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(8);
            dataOutputSerializer.writeUTF(str);
            return dataOutputSerializer.getCopyOfBuffer();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m964deserialize(int i, byte[] bArr) throws IOException {
            return new DataInputDeserializer(bArr).readUTF();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$TestCommitter.class */
    private static class TestCommitter implements Committer<Integer> {
        private final boolean recovered;
        private final SharedReference<CountDownLatch> commitLatch;
        boolean firstCommit = true;

        TestCommitter(boolean z, SharedReference<CountDownLatch> sharedReference) {
            this.recovered = z;
            this.commitLatch = sharedReference;
        }

        public List<Integer> commit(List<Integer> list) throws IOException, InterruptedException {
            if (!this.firstCommit || this.recovered) {
                Assertions.assertThat(list).containsExactly(new Integer[]{Integer.valueOf(UnifiedSinkMigrationITCase.COMMITTER_STATE)});
            } else {
                Assertions.assertThat(list).containsExactly(new Integer[]{Integer.valueOf(UnifiedSinkMigrationITCase.COMMITTER_STATE), Integer.valueOf(UnifiedSinkMigrationITCase.GLOBAL_COMMITTER_STATE)});
            }
            UnifiedSinkMigrationITCase.LOG.info("Committing {}", list);
            ((CountDownLatch) this.commitLatch.get()).countDown();
            this.firstCommit = false;
            return Collections.singletonList(Integer.valueOf(UnifiedSinkMigrationITCase.COMMITTER_STATE));
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$TestGlobalCommitter.class */
    private static class TestGlobalCommitter implements GlobalCommitter<Integer, String> {
        private final boolean recover;
        private final SharedReference<OneShotLatch> latch;
        private final SharedReference<CountDownLatch> commitLatch;
        private boolean firstCommitAfterRecover;

        TestGlobalCommitter(boolean z, SharedReference<OneShotLatch> sharedReference, SharedReference<CountDownLatch> sharedReference2) {
            this.recover = z;
            this.firstCommitAfterRecover = z;
            this.latch = sharedReference;
            this.commitLatch = sharedReference2;
        }

        public List<String> filterRecoveredCommittables(List<String> list) throws IOException {
            if (this.recover) {
                Assertions.assertThat(list).containsExactly(new String[]{String.valueOf(UnifiedSinkMigrationITCase.GLOBAL_COMMITTER_STATE)});
            }
            return list;
        }

        public String combine(List<Integer> list) throws IOException {
            Assertions.assertThat(list).hasSize(UnifiedSinkMigrationITCase.WRITER_STATE);
            return String.valueOf(list.get(0));
        }

        public List<String> commit(List<String> list) throws IOException, InterruptedException {
            UnifiedSinkMigrationITCase.LOG.info("Global committing {}", list);
            UnifiedSinkMigrationITCase.LOG.info("Latch count: {}", Long.valueOf(((CountDownLatch) this.commitLatch.get()).getCount()));
            if (!this.firstCommitAfterRecover && ((CountDownLatch) this.commitLatch.get()).getCount() <= 0) {
                ((OneShotLatch) this.latch.get()).trigger();
                Assertions.assertThat(list).containsExactly(new String[]{String.valueOf(UnifiedSinkMigrationITCase.GLOBAL_COMMITTER_STATE)});
            }
            this.firstCommitAfterRecover = false;
            return list;
        }

        public void endOfInput() throws IOException, InterruptedException {
        }

        public void close() throws Exception {
        }

        /* renamed from: combine, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m965combine(List list) throws IOException {
            return combine((List<Integer>) list);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase$TestWriter.class */
    private static class TestWriter implements SinkWriter<Long, Integer, Integer> {
        private final boolean recovered;
        private boolean emitted = false;

        TestWriter(boolean z, List<Integer> list) {
            this.recovered = z;
            if (z) {
                Assertions.assertThat(list).containsExactly(new Integer[]{Integer.valueOf(UnifiedSinkMigrationITCase.WRITER_STATE)});
            } else {
                Assertions.assertThat(list).isEmpty();
            }
        }

        public void write(Long l, SinkWriter.Context context) throws IOException, InterruptedException {
        }

        public List<Integer> prepareCommit(boolean z) throws IOException, InterruptedException {
            if (this.emitted || this.recovered) {
                return Collections.emptyList();
            }
            this.emitted = true;
            return Arrays.asList(Integer.valueOf(UnifiedSinkMigrationITCase.COMMITTER_STATE), Integer.valueOf(UnifiedSinkMigrationITCase.GLOBAL_COMMITTER_STATE));
        }

        public List<Integer> snapshotState(long j) throws IOException {
            return Collections.singletonList(Integer.valueOf(UnifiedSinkMigrationITCase.WRITER_STATE));
        }

        public void close() throws Exception {
        }
    }

    UnifiedSinkMigrationITCase() {
    }

    @Disabled
    @Test
    void prepareSinkSavepoint() throws Exception {
        LOG.warn("Deleting the previous savepoints.");
        Path resolve = Paths.get("src/test/resources/", new String[0]).resolve(SAVEPOINT_FOLDER_NAME);
        Files.walk(resolve, new FileVisitOption[0]).skip(1L).sorted(Comparator.reverseOrder()).map((v0) -> {
            return v0.toFile();
        }).forEach((v0) -> {
            v0.delete();
        });
        String str = (String) executeJob(false).stopWithSavepoint(false, resolve.toString(), SavepointFormatType.CANONICAL).get();
        LOG.info("Savepoint path: {}", str);
        Assertions.assertThat(str).contains(new CharSequence[]{resolve.toString()});
    }

    @Test
    void testRestoreSinkState() throws Exception {
        executeJob(true).cancel();
    }

    private JobClient executeJob(boolean z) throws Exception {
        Configuration configuration = new Configuration();
        if (z) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, findSavepointPath());
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        SharedReference add = this.sharedObjects.add(new OneShotLatch());
        SharedReference add2 = this.sharedObjects.add(new CountDownLatch(COMMITTER_STATE));
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(WRITER_STATE);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.fromSequence(1L, Long.MAX_VALUE).map(l -> {
            Thread.sleep(10L);
            return l;
        }).sinkTo(new StateFulSinkV1(z, add, add2)).disableChaining().uid(SINK_UUID);
        JobClient executeAsync = executionEnvironment.executeAsync();
        ((OneShotLatch) add.get()).await();
        Assertions.assertThat((Comparable) executeAsync.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
        return executeAsync;
    }

    private File buildSavepointPath() throws URISyntaxException {
        return new File(getClass().getResource("/unified-sink-migration-test").toURI());
    }

    private String findSavepointPath() throws URISyntaxException {
        File buildSavepointPath = buildSavepointPath();
        LOG.info("Base path: {}", buildSavepointPath.getAbsolutePath());
        File[] listFiles = buildSavepointPath.listFiles((v0) -> {
            return v0.isDirectory();
        });
        Assertions.assertThat(listFiles).isNotNull().hasSize(WRITER_STATE);
        File[] listFiles2 = listFiles[0].listFiles();
        Assertions.assertThat(listFiles2).isNotNull().hasSize(WRITER_STATE);
        return listFiles2[0].getAbsolutePath();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 939605231:
                if (implMethodName.equals("lambda$executeJob$70a93ba$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        Thread.sleep(10L);
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
