/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.StateHandle;
import org.apache.flink.migration.runtime.state.memory.MemValueState;
import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.migration.util.MigrationInstantiationUtil;
import org.apache.flink.migration.util.SerializedValue;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Round-trip test for loading a version-0 savepoint through {@link SavepointStore}.
 *
 * <p>The test builds a {@link SavepointV0} from synthetic old-format task states, writes it
 * to a file in a temporary folder, loads it back with
 * {@link SavepointStore#loadSavepoint(String, ClassLoader)} and checks that the legacy
 * operator state and the keyed state of every subtask carry the values that were written.
 */
public class MigrationV0ToV1Test {

    /**
     * First int written to the savepoint file, ahead of the version (0x4960672d == 1231054637).
     * NOTE(review): presumably the magic number {@code SavepointStore} checks when loading;
     * confirm and reference the production constant if it is accessible.
     */
    private static final int SAVEPOINT_MAGIC_NUMBER = 0x4960672d;

    /** Number of random file names tried before giving up on creating the savepoint file. */
    private static final int MAX_CREATE_ATTEMPTS = 10;

    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();

    /**
     * Writes a V0 savepoint to disk, reloads it via {@link SavepointStore} and verifies the
     * reloaded task states against the values produced by {@link #createTaskStatesOld}.
     *
     * @throws Exception if writing, loading or deserializing the savepoint fails
     */
    @Test
    public void testSavepointMigrationV0ToV1() throws Exception {
        String target = tmp.getRoot().getAbsolutePath();
        Assert.assertEquals(0, tmp.getRoot().listFiles().length);

        long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
        int numTaskStates = 4;
        int numSubtaskStates = 16;

        Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
                createTaskStatesOld(numTaskStates, numSubtaskStates);
        SavepointV0 savepoint = new SavepointV0(checkpointId, expected);

        Assert.assertEquals(0, savepoint.getVersion());
        Assert.assertEquals(checkpointId, savepoint.getCheckpointId());
        Assert.assertEquals(expected, savepoint.getOldTaskStates());
        Assert.assertFalse(savepoint.getOldTaskStates().isEmpty());

        Path path = null;
        try {
            FileSystem fs = null;
            FSDataOutputStream fdos = null;
            Exception latestException = null;

            // Try a handful of random file names; creation fails if the file already exists
            // because overwrite is disabled.
            for (int attempt = 0; attempt < MAX_CREATE_ATTEMPTS; ++attempt) {
                path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
                if (fs == null) {
                    fs = FileSystem.get(path.toUri());
                }
                try {
                    fdos = fs.create(path, false);
                    break;
                } catch (Exception e) {
                    // Remember the failure so it can be reported as the cause below.
                    latestException = e;
                }
            }
            if (fdos == null) {
                throw new IOException("Failed to create file output stream at " + path, latestException);
            }

            // Closing the DataOutputStream also closes the wrapped file stream.
            try (DataOutputStream dos = new DataOutputStream(fdos)) {
                dos.writeInt(SAVEPOINT_MAGIC_NUMBER);
                dos.writeInt(savepoint.getVersion());
                SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
            }

            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            Savepoint loaded = SavepointStore.loadSavepoint(path.toString(), cl);

            // The expected values are derived from the iteration position, so the loaded
            // task states must come back in the order they were created.
            int taskIndex = 0;
            for (TaskState taskState : loaded.getTaskStates()) {
                for (int subtaskIndex = 0; subtaskIndex < taskState.getParallelism(); ++subtaskIndex) {
                    SubtaskState subtaskState = taskState.getState(subtaskIndex);
                    assertLegacyOperatorState(subtaskState, taskIndex, subtaskIndex, cl);
                    assertKeyedState(subtaskState, taskIndex, subtaskIndex, cl);
                }
                ++taskIndex;
            }

            savepoint.dispose();
        } finally {
            // path is still null if the very first Path could not be built; skipping the
            // cleanup then keeps a NullPointerException from masking the real failure.
            if (path != null) {
                SavepointStore.removeSavepointFile(path.toString());
            }
        }
    }

    /**
     * Checks every entry of the subtask's legacy operator state chain.
     *
     * <p>Each stream starts with a flag byte: 1 means a function-state tuple
     * {@code (0, task, subtask, chainIndex)} follows, 0 means it is absent. Function state is
     * only written for subtasks whose index is not a multiple of 4. The operator-state tuple
     * {@code (1, task, subtask, chainIndex)} always comes last.
     *
     * @param subtaskState the migrated subtask state to inspect
     * @param taskIndex    position of the owning task state
     * @param subtaskIndex index of the subtask within its task
     * @param cl           class loader used for deserialization
     * @throws Exception if a stream cannot be opened or deserialized
     */
    private static void assertLegacyOperatorState(
            SubtaskState subtaskState, int taskIndex, int subtaskIndex, ClassLoader cl) throws Exception {
        ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
        for (int chainIndex = 0; chainIndex < legacyOperatorState.getLength(); ++chainIndex) {
            StreamStateHandle stateHandle = legacyOperatorState.get(chainIndex);
            try (FSDataInputStream is = stateHandle.openInputStream()) {
                if (subtaskIndex % 4 != 0) {
                    Assert.assertEquals(1, is.read());
                    Tuple4<Integer, Integer, Integer, Integer> expectedFunctionState =
                            new Tuple4<>(0, taskIndex, subtaskIndex, chainIndex);
                    Tuple4<Integer, Integer, Integer, Integer> actualFunctionState =
                            InstantiationUtil.deserializeObject(is, cl);
                    Assert.assertEquals(expectedFunctionState, actualFunctionState);
                } else {
                    Assert.assertEquals(0, is.read());
                }

                Tuple4<Integer, Integer, Integer, Integer> expectedOperatorState =
                        new Tuple4<>(1, taskIndex, subtaskIndex, chainIndex);
                Tuple4<Integer, Integer, Integer, Integer> actualOperatorState =
                        InstantiationUtil.deserializeObject(is, cl);
                Assert.assertEquals(expectedOperatorState, actualOperatorState);
            }
        }
    }

    /**
     * Checks the managed keyed state of a subtask.
     *
     * <p>Tasks whose index is a multiple of 3 were created without keyed state and must have
     * none. All others must expose a single key group starting at the subtask index, holding
     * two snapshots whose first two data bytes are the task and subtask index.
     *
     * @param subtaskState the migrated subtask state to inspect
     * @param taskIndex    position of the owning task state
     * @param subtaskIndex index of the subtask within its task
     * @param cl           class loader used for deserialization
     * @throws Exception if the keyed state cannot be deserialized
     */
    private static void assertKeyedState(
            SubtaskState subtaskState, int taskIndex, int subtaskIndex, ClassLoader cl) throws Exception {
        KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState();
        if (taskIndex % 3 == 0) {
            Assert.assertNull(keyedStateHandle);
            return;
        }

        Assert.assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
        KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
        Assert.assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
        Assert.assertEquals(
                subtaskIndex,
                keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());

        ByteStreamStateHandle stateHandle = (ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
        HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
                MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
        Assert.assertEquals(2, testKeyedState.size());

        for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
            MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
            byte[] data = castedSnapshot.getData();
            Assert.assertEquals(taskIndex, data[0]);
            Assert.assertEquals(subtaskIndex, data[1]);
        }
    }

    /**
     * Builds old-format task states with predictable contents.
     *
     * <p>Every subtask gets a chain of two {@link StreamTaskState}s. For task {@code i},
     * subtask {@code j} and chain position {@code k}:
     * <ul>
     *   <li>function state {@code (0, i, j, k)} is set only when {@code j % 4 != 0};</li>
     *   <li>operator state {@code (1, i, j, k)} is always set;</li>
     *   <li>keyed state (two snapshots named {@code keyed-0} and {@code keyed-1}, each with
     *       data bytes {@code {i, j}}) is set only for {@code k == 0} and {@code i % 3 != 0}.</li>
     * </ul>
     *
     * @param numTaskStates    number of task states to create
     * @param numSubtaskStates parallelism of each task state
     * @return the created task states, in creation order
     * @throws Exception if a state object cannot be serialized
     */
    private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(int numTaskStates, int numSubtaskStates) throws Exception {
        ArrayList<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates =
                new ArrayList<>(numTaskStates);

        for (int i = 0; i < numTaskStates; ++i) {
            org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
                    new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);

            for (int j = 0; j < numSubtaskStates; ++j) {
                StreamTaskState[] streamTaskStates = new StreamTaskState[2];

                for (int k = 0; k < streamTaskStates.length; ++k) {
                    StreamTaskState state = new StreamTaskState();

                    if (j % 4 != 0) {
                        Tuple4<Integer, Integer, Integer, Integer> functionState = new Tuple4<>(0, i, j, k);
                        state.setFunctionState(new SerializedStateHandle<Serializable>(functionState));
                    }

                    Tuple4<Integer, Integer, Integer, Integer> operatorState = new Tuple4<>(1, i, j, k);
                    state.setOperatorState(new SerializedStateHandle<Serializable>(operatorState));

                    // Keyed state lives only on the head of the chain, and not on every task.
                    if (k == 0 && i % 3 != 0) {
                        HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
                        for (int l = 0; l < 2; ++l) {
                            String name = "keyed-" + l;
                            MemValueState.Snapshot<?, ?, ?> testKeyedSnapshot = new MemValueState.Snapshot<>(
                                    IntSerializer.INSTANCE,
                                    VoidNamespaceSerializer.INSTANCE,
                                    IntSerializer.INSTANCE,
                                    new ValueStateDescriptor<>(name, Integer.class, 0),
                                    new byte[]{(byte) i, (byte) j});
                            testKeyedState.put(name, testKeyedSnapshot);
                        }
                        state.setKvStates(testKeyedState);
                    }

                    streamTaskStates[k] = state;
                }

                StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
                SerializedValue<StateHandle<?>> handle = new SerializedValue<StateHandle<?>>(streamTaskStateList);
                taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0L, 0L));
            }

            taskStates.add(taskState);
        }

        return taskStates;
    }
}

