package org.apache.flink.runtime.state.changelog.inmemory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.class */
public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
    private final Random random = new Random();

    @TempDir
    public File temporaryFolder;

    public static Stream<Boolean> parameters() {
        return Stream.of(true);
    }

    @Disabled("FLINK-30729")
    @MethodSource({"parameters"})
    @ParameterizedTest(name = "compression = {0}")
    public void testNoAppendAfterClose(boolean z) throws IOException {
        Assertions.assertThatThrownBy(() -> {
            StateChangelogWriter createWriter = getFactory(z, this.temporaryFolder).createWriter(new OperatorID().toString(), KeyGroupRange.of(0, 0), new SyncMailboxExecutor());
            createWriter.close();
            createWriter.append(0, new byte[0]);
        }).isInstanceOf(IllegalStateException.class);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @MethodSource({"parameters"})
    @ParameterizedTest(name = "compression = {0}")
    public void testWriteAndRead(boolean z) throws Exception {
        KeyGroupRange of = KeyGroupRange.of(0, 5);
        Map<Integer, List<byte[]>> generateAppends = generateAppends(of, 405, 20);
        StateChangelogStorage factory = getFactory(z, this.temporaryFolder);
        Throwable th = null;
        try {
            StateChangelogWriter createWriter = factory.createWriter(new OperatorID().toString(), of, new SyncMailboxExecutor());
            Throwable th2 = null;
            try {
                SequenceNumber initialSequenceNumber = createWriter.initialSequenceNumber();
                for (Map.Entry<Integer, List<byte[]>> entry : generateAppends.entrySet()) {
                    Integer key = entry.getKey();
                    Iterator<byte[]> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        createWriter.append(key.intValue(), it.next());
                    }
                    createWriter.nextSequenceNumber();
                }
                assertByteMapsEqual(generateAppends, extract(((SnapshotResult) createWriter.persist(initialSequenceNumber).get()).getJobManagerOwnedSnapshot(), factory.createReader()));
                if (createWriter != null) {
                    if (0 != 0) {
                        try {
                            createWriter.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                if (factory != null) {
                    if (0 == 0) {
                        factory.close();
                        return;
                    }
                    try {
                        factory.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (createWriter != null) {
                    if (0 != 0) {
                        try {
                            createWriter.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (factory != null) {
                if (0 != 0) {
                    try {
                        factory.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    factory.close();
                }
            }
            throw th7;
        }
    }

    private void assertByteMapsEqual(Map<Integer, List<byte[]>> map, Map<Integer, List<byte[]>> map2) {
        Assertions.assertThat(map2).hasSameSizeAs(map);
        for (Map.Entry<Integer, List<byte[]>> entry : map.entrySet()) {
            List<byte[]> value = entry.getValue();
            List<byte[]> list = map2.get(entry.getKey());
            Iterator<byte[]> it = value.iterator();
            Iterator<byte[]> it2 = list.iterator();
            while (it.hasNext() && it2.hasNext()) {
                Assertions.assertThat(it2.next()).isEqualTo(it.next());
            }
            Assertions.assertThat(it.hasNext()).isFalse();
            Assertions.assertThat(it2.hasNext()).isFalse();
        }
    }

    private Map<Integer, List<byte[]>> extract(T t, StateChangelogHandleReader<T> stateChangelogHandleReader) throws Exception {
        HashMap hashMap = new HashMap();
        CloseableIterator changes = stateChangelogHandleReader.getChanges(t);
        Throwable th = null;
        while (changes.hasNext()) {
            try {
                try {
                    StateChange stateChange = (StateChange) changes.next();
                    ((List) hashMap.computeIfAbsent(Integer.valueOf(stateChange.getKeyGroup()), num -> {
                        return new ArrayList();
                    })).add(stateChange.getChange());
                } catch (Throwable th2) {
                    if (changes != null) {
                        if (th != null) {
                            try {
                                changes.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            changes.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (changes != null) {
            if (0 != 0) {
                try {
                    changes.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                changes.close();
            }
        }
        return hashMap;
    }

    private Map<Integer, List<byte[]>> generateAppends(KeyGroupRange keyGroupRange, int i, int i2) {
        return (Map) StreamSupport.stream(keyGroupRange.spliterator(), false).collect(Collectors.toMap(Function.identity(), num -> {
            return generateData(i2, i);
        }));
    }

    private List<byte[]> generateData(int i, int i2) {
        return (List) Stream.generate(() -> {
            return randomBytes(i2);
        }).limit(i).collect(Collectors.toList());
    }

    private byte[] randomBytes(int i) {
        byte[] bArr = new byte[i];
        this.random.nextBytes(bArr);
        return bArr;
    }

    protected StateChangelogStorage<T> getFactory(boolean z, File file) throws IOException {
        return new InMemoryStateChangelogStorage();
    }
}
