package org.apache.flink.runtime.source.coordinator;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.class */
public class SourceCoordinatorProviderTest {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234, 5678);
    private static final int NUM_SPLITS = 10;
    private SourceCoordinatorProvider<MockSourceSplit> provider;

    @Before
    public void setup() {
        this.provider = new SourceCoordinatorProvider<>("SourceCoordinatorProviderTest", OPERATOR_ID, new MockSource(Boundedness.BOUNDED, 10), 1, WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, (String) null);
    }

    @Test
    public void testCreate() throws Exception {
        Assert.assertTrue(this.provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, 10)) instanceof RecreateOnResetOperatorCoordinator);
    }

    @Test
    public void testCheckpointAndReset() throws Exception {
        RecreateOnResetOperatorCoordinator create = this.provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, 10));
        SourceCoordinator internalCoordinator = create.getInternalCoordinator();
        create.start();
        create.handleEventFromOperator(0, 0, new ReaderRegistrationEvent(0, "location"));
        CompletableFuture completableFuture = new CompletableFuture();
        create.checkpointCoordinator(0L, completableFuture);
        byte[] bArr = (byte[]) completableFuture.get();
        create.handleEventFromOperator(1, 0, new ReaderRegistrationEvent(1, "location"));
        while (internalCoordinator.getContext().registeredReaders().size() < 2) {
            Thread.sleep(1L);
        }
        create.resetToCheckpoint(0L, bArr);
        Assert.assertNotEquals("The restored source coordinator should be a different instance", create.getInternalCoordinator(), internalCoordinator);
        Assert.assertEquals("There should be no registered reader.", 0L, r0.getContext().registeredReaders().size());
    }

    @Test
    public void testCallAsyncExceptionFailsJob() throws Exception {
        MockOperatorCoordinatorContext mockOperatorCoordinatorContext = new MockOperatorCoordinatorContext(OPERATOR_ID, 10);
        this.provider.create(mockOperatorCoordinatorContext).getInternalCoordinator().getContext().callAsync(() -> {
            return null;
        }, (obj, th) -> {
            throw new RuntimeException();
        });
        mockOperatorCoordinatorContext.getClass();
        CommonTestUtils.waitUtil(mockOperatorCoordinatorContext::isJobFailed, Duration.ofSeconds(10L), "The job did not fail before timeout.");
    }
}
