package org.apache.flink.test.typeserializerupgrade;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.class */
public class PojoSerializerUpgradeTest extends TestLogger {

    /** Class-level temporary folder; used for the checkpoint directory and for the per-run source/class directories. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** State backend under test, loaded in the constructor from the parameterized backend name. */
    private StateBackend stateBackend;
    /** Name of the dynamically compiled class; the mapper loads it by this name from the user-code class loader. */
    private static final String POJO_NAME = "Pojo";
    /** Baseline Pojo source: fields {@code long a} and {@code String b}, declared in that order. */
    private static final String SOURCE_A = "import java.util.Objects;public class Pojo { private long a; private String b; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
    /** Same fields as {@link #SOURCE_A} but declared in the opposite order ({@code String b} before {@code long a}). */
    private static final String SOURCE_B = "import java.util.Objects;public class Pojo { private String b; private long a; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
    /** Variant of {@link #SOURCE_A} in which field {@code a} is a {@code double} instead of a {@code long}. */
    private static final String SOURCE_C = "import java.util.Objects;public class Pojo { private double a; private String b; public double getA() { return a;} public void setA(double value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b); } @Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
    /** Variant of {@link #SOURCE_A} with an additional field {@code double c}. */
    private static final String SOURCE_D = "import java.util.Objects;public class Pojo { private long a; private String b; private double c; public long getA() { return a;} public void setA(long value) { a = value; }public String getB() { return b; }public void setB(String value) { b = value; }public double getC() { return c; } public void setC(double value) { c = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b) && c == other.c;} else { return false; }}@Override public int hashCode() { return Objects.hash(a, b, c); } @Override public String toString() {return \"(\" + a + \", \" + b + \", \" + c + \")\";}}";
    /** Variant of {@link #SOURCE_A} without field {@code b}; only {@code long a} remains. */
    private static final String SOURCE_E = "import java.util.Objects;public class Pojo { private long a; public long getA() { return a;} public void setA(long value) { a = value; }@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a;} else { return false; }}@Override public int hashCode() { return Objects.hash(a); } @Override public String toString() {return \"(\" + a + \")\";}}";

    /* loaded from: input_file:org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest$FirstValueReducer.class */
    private static final class FirstValueReducer<T> implements ReduceFunction<T> {
        private static final long serialVersionUID = -9222976423336835926L;

        private FirstValueReducer() {
        }

        public T reduce(T t, T t2) throws Exception {
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest$IdentityKeySelector.class */
    public static final class IdentityKeySelector<T> implements KeySelector<T, T> {
        private static final long serialVersionUID = -3263628393881929147L;

        private IdentityKeySelector() {
        }

        public T getKey(T t) throws Exception {
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest$StatefulMapper.class */
    public static final class StatefulMapper extends RichMapFunction<Long, Long> implements CheckpointedFunction {
        private static final long serialVersionUID = -520490739059396832L;
        private final boolean keyed;
        private final boolean verify;
        private final boolean hasBField;
        private transient ValueState<Object> keyedValueState;
        private transient ListState<Object> keyedListState;
        private transient ReducingState<Object> keyedReducingState;
        private transient ListState<Object> partitionableListState;
        private transient ListState<Object> unionListState;
        private transient Class<?> pojoClass;
        private transient Field fieldA;
        private transient Field fieldB;

        StatefulMapper(boolean z, boolean z2, boolean z3) {
            this.keyed = z;
            this.verify = z2;
            this.hasBField = z3;
        }

        public Long map(Long l) throws Exception {
            boolean z;
            Object newInstance = this.pojoClass.newInstance();
            this.fieldA.set(newInstance, l);
            if (this.hasBField) {
                this.fieldB.set(newInstance, l + "");
            }
            if (this.verify) {
                if (this.keyed) {
                    Assert.assertEquals(newInstance, this.keyedValueState.value());
                    Iterator it = ((Iterable) this.keyedListState.get()).iterator();
                    boolean z2 = false;
                    while (true) {
                        z = z2;
                        if (!it.hasNext()) {
                            break;
                        }
                        z2 = z | newInstance.equals(it.next());
                    }
                    Assert.assertTrue(z);
                    Assert.assertEquals(newInstance, this.keyedReducingState.get());
                } else {
                    boolean z3 = false;
                    Iterator it2 = ((Iterable) this.partitionableListState.get()).iterator();
                    while (it2.hasNext()) {
                        z3 |= newInstance.equals(it2.next());
                    }
                    Assert.assertTrue(z3);
                    boolean z4 = false;
                    Iterator it3 = ((Iterable) this.unionListState.get()).iterator();
                    while (it3.hasNext()) {
                        z4 |= newInstance.equals(it3.next());
                    }
                    Assert.assertTrue(z4);
                }
            } else if (this.keyed) {
                this.keyedValueState.update(newInstance);
                this.keyedListState.add(newInstance);
                this.keyedReducingState.add(newInstance);
            } else {
                this.partitionableListState.add(newInstance);
                this.unionListState.add(newInstance);
            }
            return l;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(PojoSerializerUpgradeTest.POJO_NAME);
            this.fieldA = this.pojoClass.getDeclaredField("a");
            this.fieldA.setAccessible(true);
            if (this.hasBField) {
                this.fieldB = this.pojoClass.getDeclaredField("b");
                this.fieldB.setAccessible(true);
            }
            if (!this.keyed) {
                this.partitionableListState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("partitionableListState", this.pojoClass));
                this.unionListState = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("unionListState", this.pojoClass));
            } else {
                this.keyedValueState = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("keyedValueState", this.pojoClass));
                this.keyedListState = functionInitializationContext.getKeyedStateStore().getListState(new ListStateDescriptor("keyedListState", this.pojoClass));
                this.keyedReducingState = functionInitializationContext.getKeyedStateStore().getReducingState(new ReducingStateDescriptor("keyedReducingState", new FirstValueReducer(), this.pojoClass));
            }
        }
    }

    /**
     * Supplies the state backend names the test class is parameterized over.
     *
     * @return the names {@code jobmanager}, {@code filesystem} and {@code rocksdb}
     */
    @Parameterized.Parameters(name = "StateBackend: {0}")
    public static Collection<String> parameters() {
        final String[] backendNames = {"jobmanager", "filesystem", "rocksdb"};
        return Arrays.asList(backendNames);
    }

    public PojoSerializerUpgradeTest(String str) throws IOException, DynamicCodeLoadingException {
        Configuration configuration = new Configuration();
        configuration.setString(CheckpointingOptions.STATE_BACKEND, str);
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
        this.stateBackend = StateBackendLoader.loadStateBackendFromConfig(configuration, Thread.currentThread().getContextClassLoader(), (Logger) null);
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_B} (same fields, different declaration order) using keyed state. */
    @Test
    public void testChangedFieldOrderWithKeyedState() throws Exception {
        final boolean hasBField = true;
        final boolean keyedState = true;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, hasBField, keyedState);
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_B} (same fields, different declaration order) using operator state. */
    @Test
    public void testChangedFieldOrderWithOperatorState() throws Exception {
        final boolean hasBField = true;
        final boolean keyedState = false;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, hasBField, keyedState);
    }

    /**
     * Upgrades from {@code SOURCE_A} to {@code SOURCE_C} (field {@code a} changes from long to double)
     * using keyed state and expects the run to fail with a {@link StateMigrationException} in the cause chain.
     */
    @Test
    public void testChangedFieldTypesWithKeyedState() throws Exception {
        boolean migrationRejected = false;
        try {
            testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, true);
        } catch (Exception e) {
            // Any other failure is unexpected and must surface unchanged.
            if (!CommonTestUtils.containsCause(e, StateMigrationException.class)) {
                throw e;
            }
            migrationRejected = true;
        }
        if (!migrationRejected) {
            Assert.fail("Expected a state migration exception.");
        }
    }

    /**
     * Upgrades from {@code SOURCE_A} to {@code SOURCE_C} (field {@code a} changes from long to double)
     * using operator state and expects the run to fail with a {@link StateMigrationException} in the cause chain.
     */
    @Test
    public void testChangedFieldTypesWithOperatorState() throws Exception {
        boolean migrationRejected = false;
        try {
            testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, false);
        } catch (Exception e) {
            // Any other failure is unexpected and must surface unchanged.
            if (!CommonTestUtils.containsCause(e, StateMigrationException.class)) {
                throw e;
            }
            migrationRejected = true;
        }
        if (!migrationRejected) {
            Assert.fail("Expected a state migration exception.");
        }
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_D} (extra field {@code c}) using keyed state. */
    @Test
    public void testAdditionalFieldWithKeyedState() throws Exception {
        final boolean hasBField = true;
        final boolean keyedState = true;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, hasBField, keyedState);
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_D} (extra field {@code c}) using operator state. */
    @Test
    public void testAdditionalFieldWithOperatorState() throws Exception {
        final boolean hasBField = true;
        final boolean keyedState = false;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, hasBField, keyedState);
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_E} (field {@code b} removed) using keyed state. */
    @Test
    public void testMissingFieldWithKeyedState() throws Exception {
        // Field b does not exist in SOURCE_E, so the mapper must not touch it.
        final boolean hasBField = false;
        final boolean keyedState = true;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, hasBField, keyedState);
    }

    /** Upgrades from {@code SOURCE_A} to {@code SOURCE_E} (field {@code b} removed) using operator state. */
    @Test
    public void testMissingFieldWithOperatorState() throws Exception {
        // Field b does not exist in SOURCE_E, so the mapper must not touch it.
        final boolean hasBField = false;
        final boolean keyedState = false;
        testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, hasBField, keyedState);
    }

    /**
     * Runs one upgrade scenario in two phases.
     *
     * <ol>
     *   <li>Compile the first Pojo source, run a {@link StatefulMapper} in write mode over a fixed
     *       set of inputs and take a snapshot.</li>
     *   <li>Compile the second Pojo source into a separate class loader, restore the snapshot and
     *       run a {@link StatefulMapper} in verify mode over the same inputs.</li>
     * </ol>
     *
     * @param str Java source of the Pojo class used to write the state
     * @param str2 Java source of the Pojo class used to restore and verify the state
     * @param z whether the mapper should populate field {@code b} of the Pojo
     * @param z2 {@code true} to exercise keyed state, {@code false} for operator state
     * @throws Exception if compiling, running or restoring fails
     */
    private void testPojoSerializerUpgrade(String str, String str2, boolean z, boolean z2) throws Exception {
        final Configuration taskConfiguration = new Configuration();
        final ExecutionConfig executionConfig = new ExecutionConfig();
        final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
        final List<Long> inputs = Arrays.asList(1L, 2L, 45L, 67L, 1337L);

        // Phase 1: write state with the first version of the Pojo.
        final ClassLoader writerClassLoader = compilePojoIntoNewClassLoader(str);
        final OperatorSubtaskState snapshot = runOperator(
                taskConfiguration,
                executionConfig,
                new StreamMap<>(new StatefulMapper(z2, false, z)),
                keySelector,
                z2,
                this.stateBackend,
                writerClassLoader,
                null,
                inputs);

        // Phase 2: restore the snapshot with the second version of the Pojo and verify it.
        final ClassLoader readerClassLoader = compilePojoIntoNewClassLoader(str2);
        runOperator(
                taskConfiguration,
                executionConfig,
                new StreamMap<>(new StatefulMapper(z2, true, z)),
                keySelector,
                z2,
                this.stateBackend,
                readerClassLoader,
                snapshot,
                inputs);
    }

    /**
     * Writes {@code pojoSource} into a fresh temporary directory, compiles it there and returns a
     * class loader rooted at that directory (parent: the current context class loader).
     *
     * <p>A non-zero compiler exit code fails the test right away instead of surfacing later as a
     * class-loading problem.
     */
    private static ClassLoader compilePojoIntoNewClassLoader(String pojoSource) throws IOException {
        final File classDirectory = temporaryFolder.newFolder();
        final File sourceFile = writeSourceFile(classDirectory, POJO_NAME + ".java", pojoSource);
        Assert.assertEquals("Compiling " + sourceFile + " failed.", 0, compileClass(sourceFile));
        return URLClassLoader.newInstance(
                new URL[] {classDirectory.toURI().toURL()},
                Thread.currentThread().getContextClassLoader());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r4v2 */
    /* JADX WARN: Type inference failed for: r4v3 */
    /* JADX WARN: Type inference failed for: r4v7 */
    private OperatorSubtaskState runOperator(Configuration configuration, ExecutionConfig executionConfig, OneInputStreamOperator<Long, Long> oneInputStreamOperator, KeySelector<Long, Long> keySelector, boolean z, StateBackend stateBackend, ClassLoader classLoader, OperatorSubtaskState operatorSubtaskState, Iterable<Long> iterable) throws Exception {
        BasicTypeInfo basicTypeInfo;
        BasicTypeInfo build = new MockEnvironmentBuilder().setTaskName("test task").setManagedMemorySize(32768L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(256).setTaskConfiguration(configuration).setExecutionConfig(executionConfig).setMaxParallelism(16).setUserCodeClassLoader(classLoader).build();
        Throwable th = null;
        try {
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = null;
            try {
                if (z) {
                    BasicTypeInfo basicTypeInfo2 = BasicTypeInfo.LONG_TYPE_INFO;
                    keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(oneInputStreamOperator, keySelector, basicTypeInfo2, build);
                    basicTypeInfo = basicTypeInfo2;
                } else {
                    BasicTypeInfo basicTypeInfo3 = build;
                    keyedOneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(oneInputStreamOperator, LongSerializer.INSTANCE, basicTypeInfo3);
                    basicTypeInfo = basicTypeInfo3;
                }
                keyedOneInputStreamOperatorTestHarness.setStateBackend(stateBackend);
                keyedOneInputStreamOperatorTestHarness.setup();
                keyedOneInputStreamOperatorTestHarness.initializeState(operatorSubtaskState);
                keyedOneInputStreamOperatorTestHarness.open();
                long j = 0;
                Iterator<Long> it = iterable.iterator();
                ?? r4 = basicTypeInfo;
                while (it.hasNext()) {
                    long j2 = j;
                    long j3 = r4;
                    r4 = 1;
                    j = j3 + 1;
                    keyedOneInputStreamOperatorTestHarness.processElement(it.next(), j2);
                }
                OperatorSubtaskState snapshot = keyedOneInputStreamOperatorTestHarness.snapshot(1L, j + 1);
                IOUtils.closeQuietly(keyedOneInputStreamOperatorTestHarness);
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
                return snapshot;
            } catch (Throwable th3) {
                IOUtils.closeQuietly(keyedOneInputStreamOperatorTestHarness);
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    /**
     * Writes {@code str2} to the file named {@code str} inside directory {@code file}, creating
     * missing parent directories first.
     *
     * @param file directory the source file is placed in
     * @param str file name (relative to {@code file})
     * @param str2 text to write
     * @return the written file
     * @throws IOException if the file cannot be opened, written or closed
     */
    private static File writeSourceFile(File file, String str, String str2) throws IOException {
        final File sourceFile = new File(file, str);
        // The result of mkdirs() is not checked: if the directory is missing, opening the
        // writer below fails with an IOException anyway.
        sourceFile.getParentFile().mkdirs();

        try (FileWriter writer = new FileWriter(sourceFile)) {
            writer.write(str2);
        }
        return sourceFile;
    }

    /**
     * Compiles {@code file} with the system Java compiler, with annotation processing disabled
     * ({@code -proc:none}). Class files are emitted by the compiler's defaults.
     *
     * @param file the Java source file to compile
     * @return the compiler's exit code; 0 means success
     * @throws IllegalStateException if the platform provides no Java compiler
     */
    private static int compileClass(File file) {
        final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            // getSystemJavaCompiler() returns null when no compiler is provided with the platform.
            throw new IllegalStateException(
                    "No system Java compiler available; cannot compile " + file.getPath());
        }
        return compiler.run(null, null, null, "-proc:none", file.getPath());
    }
}
