package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.ChangelogTestUtils;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/SharedStateRegistryTest.class */
public class SharedStateRegistryTest {
    private static final String RESTORED_STATE_ID = "restored-state";

    /* loaded from: input_file:org/apache/flink/runtime/state/SharedStateRegistryTest$TestSharedState.class */
    private static class TestSharedState implements TestStreamStateHandle {
        private static final long serialVersionUID = 4468635881465159780L;
        private SharedStateRegistryKey key;
        private boolean discarded = false;

        TestSharedState(String str) {
            this.key = new SharedStateRegistryKey(str);
        }

        public SharedStateRegistryKey getRegistrationKey() {
            return this.key;
        }

        public void discardState() throws Exception {
            this.discarded = true;
        }

        public long getStateSize() {
            return this.key.toString().length();
        }

        public int hashCode() {
            return this.key.hashCode();
        }

        public FSDataInputStream openInputStream() throws IOException {
            throw new UnsupportedOperationException();
        }

        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        public boolean isDiscarded() {
            return this.discarded;
        }
    }

    @Test
    public void testRegistryNormal() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        TestSharedState testSharedState = new TestSharedState("first");
        Assert.assertTrue(testSharedState == sharedStateRegistryImpl.registerReference(testSharedState.getRegistrationKey(), testSharedState, 0L));
        TestCase.assertFalse(testSharedState.isDiscarded());
        TestSharedState testSharedState2 = new TestSharedState("second");
        Assert.assertTrue(testSharedState2 == sharedStateRegistryImpl.registerReference(testSharedState2.getRegistrationKey(), testSharedState2, 0L));
        TestCase.assertFalse(testSharedState.isDiscarded());
        TestCase.assertFalse(testSharedState2.isDiscarded());
        TestSharedState testSharedState3 = new TestSharedState(testSharedState.getRegistrationKey().getKeyString());
        StreamStateHandle registerReference = sharedStateRegistryImpl.registerReference(testSharedState.getRegistrationKey(), testSharedState3, 0L);
        Assert.assertTrue(testSharedState3 == registerReference);
        TestCase.assertFalse(testSharedState3.isDiscarded());
        TestCase.assertFalse(testSharedState == registerReference);
        Assert.assertTrue(testSharedState.isDiscarded());
        sharedStateRegistryImpl.checkpointCompleted(0L);
        TestSharedState testSharedState4 = new TestSharedState(testSharedState.getRegistrationKey().getKeyString());
        StreamStateHandle registerReference2 = sharedStateRegistryImpl.registerReference(testSharedState.getRegistrationKey(), testSharedState4, 0L);
        TestCase.assertFalse(testSharedState4 == registerReference2);
        Assert.assertTrue(testSharedState4.isDiscarded());
        Assert.assertTrue(testSharedState3 == registerReference2);
        TestCase.assertFalse(testSharedState3.isDiscarded());
        Assert.assertTrue(testSharedState3 == sharedStateRegistryImpl.registerReference(testSharedState.getRegistrationKey(), testSharedState, 0L));
        TestCase.assertFalse(testSharedState3.isDiscarded());
        sharedStateRegistryImpl.unregisterUnusedState(1L);
        Assert.assertTrue(testSharedState2.isDiscarded());
        Assert.assertTrue(testSharedState.isDiscarded());
    }

    @Test
    public void testUnregisterWithUnexistedKey() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        sharedStateRegistryImpl.unregisterUnusedState(-1L);
        sharedStateRegistryImpl.unregisterUnusedState(Long.MAX_VALUE);
    }

    @Test
    public void testRegisterChangelogStateBackendHandles() throws InterruptedException {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        ChangelogTestUtils.IncrementalStateHandleWrapper createDummyIncrementalStateHandle = ChangelogTestUtils.createDummyIncrementalStateHandle(1L);
        ChangelogTestUtils.IncrementalStateHandleWrapper deserialize = createDummyIncrementalStateHandle.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper createDummyChangelogStateHandle = ChangelogTestUtils.createDummyChangelogStateHandle(1L, 2L);
        new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(deserialize), Collections.singletonList(createDummyChangelogStateHandle), createDummyIncrementalStateHandle.getKeyGroupRange(), 41L, 1L, createDummyChangelogStateHandle.getStateSize()).registerSharedStates(sharedStateRegistryImpl, 41L);
        sharedStateRegistryImpl.checkpointCompleted(41L);
        sharedStateRegistryImpl.unregisterUnusedState(41L);
        ChangelogTestUtils.IncrementalStateHandleWrapper deserialize2 = createDummyIncrementalStateHandle.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper createDummyChangelogStateHandle2 = ChangelogTestUtils.createDummyChangelogStateHandle(2L, 3L);
        new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(deserialize2), Collections.singletonList(createDummyChangelogStateHandle2), createDummyIncrementalStateHandle.getKeyGroupRange(), 42L, 1L, createDummyChangelogStateHandle2.getStateSize()).registerSharedStates(sharedStateRegistryImpl, 42L);
        sharedStateRegistryImpl.checkpointCompleted(42L);
        sharedStateRegistryImpl.unregisterUnusedState(42L);
        TestCase.assertFalse(deserialize.isDiscarded());
        TestCase.assertFalse(deserialize2.isDiscarded());
        Assert.assertTrue(createDummyChangelogStateHandle.isDiscarded());
        ChangelogTestUtils.IncrementalStateHandleWrapper deserialize3 = ChangelogTestUtils.createDummyIncrementalStateHandle(2L).deserialize();
        new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(deserialize3), Collections.singletonList(createDummyChangelogStateHandle2), deserialize3.getKeyGroupRange(), 43L, 2L, 0L).registerSharedStates(sharedStateRegistryImpl, 43L);
        sharedStateRegistryImpl.checkpointCompleted(43L);
        sharedStateRegistryImpl.unregisterUnusedState(43L);
        Assert.assertTrue(deserialize.isDiscarded());
        TestCase.assertFalse(createDummyChangelogStateHandle2.isDiscarded());
    }

    @Test
    public void testUnregisterUnusedSavepointState() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        TestingStreamStateHandle testingStreamStateHandle = new TestingStreamStateHandle();
        registerInitialCheckpoint(sharedStateRegistryImpl, RESTORED_STATE_ID, CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE));
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), testingStreamStateHandle, 2L);
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), testingStreamStateHandle, 3L);
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);
        Assert.assertEquals("Only the initial checkpoint should be retained because its state is in use", Collections.singleton(1L), sharedStateRegistryImpl.unregisterUnusedState(3L));
        Assert.assertTrue("The initial checkpoint state is unused so it could be discarded", sharedStateRegistryImpl.unregisterUnusedState(4L).isEmpty());
    }

    @Test
    public void testUnregisterNonInitialCheckpoint() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey("stateId"), new TestingStreamStateHandle(), 1L);
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey("stateId"), new TestingStreamStateHandle(), 2L);
        Assert.assertTrue("First (non-initial) checkpoint could be discarded", sharedStateRegistryImpl.unregisterUnusedState(2L).isEmpty());
    }

    @Test
    public void testUnregisterInitialCheckpoint() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        TestingStreamStateHandle testingStreamStateHandle = new TestingStreamStateHandle();
        registerInitialCheckpoint(sharedStateRegistryImpl, RESTORED_STATE_ID, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION));
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), testingStreamStateHandle, 2L);
        Assert.assertTrue("(retained) checkpoint - should NOT be considered in use even if its state is in use", sharedStateRegistryImpl.unregisterUnusedState(2L).isEmpty());
    }

    @Test
    public void testUnregisterInitialCheckpointUsedInChangelog() {
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        TestingStreamStateHandle testingStreamStateHandle = new TestingStreamStateHandle();
        registerInitialCheckpoint(sharedStateRegistryImpl, RESTORED_STATE_ID, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION));
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), testingStreamStateHandle, 2L, true);
        sharedStateRegistryImpl.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), testingStreamStateHandle, 3L, false);
        Assert.assertEquals("(retained) checkpoint - should be considered in use as long as its state is in use by changelog", Collections.singleton(1L), sharedStateRegistryImpl.unregisterUnusedState(3L));
    }

    private void registerInitialCheckpoint(SharedStateRegistry sharedStateRegistry, String str, CheckpointProperties checkpointProperties) {
        IncrementalRemoteKeyedStateHandle restore = IncrementalRemoteKeyedStateHandle.restore(UUID.randomUUID(), KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 1L, Collections.emptyMap(), Collections.emptyMap(), new ByteStreamStateHandle("meta", new byte[1]), 1024L, new StateHandleID(str));
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 1, 1);
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedKeyedState(restore).build());
        sharedStateRegistry.registerAllAfterRestored(new CompletedCheckpoint(new JobID(), 1L, 1L, 1L, Collections.singletonMap(operatorID, operatorState), Collections.emptyList(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation(), (CompletedCheckpointStats) null, checkpointProperties), RestoreMode.DEFAULT);
    }
}
