package org.apache.flink.runtime.source.coordinator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.class */
/**
 * Base class for {@link SourceCoordinator} tests.
 *
 * <p>Before each test it builds a {@link SourceCoordinator} backed by a {@link
 * MockOperatorCoordinatorContext}, a {@link SplitAssignmentTracker} and an {@link
 * EventReceivingTasks} instance. Every {@link SourceCoordinatorContext} created through {@link
 * #getNewSourceCoordinatorContext()} is registered with an {@link AutoCloseableRegistry} that is
 * closed after each test.
 */
abstract class SourceCoordinatorTestBase {
    /** Operator name passed to every coordinator created by this base class. */
    protected static final String OPERATOR_NAME = "TestOperator";

    /** Operator ID used for the mock coordinator context and for the coordinator thread name. */
    protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234, 5678);

    /** Number of subtasks the mock operator coordinator context is created with. */
    protected static final int NUM_SUBTASKS = 3;

    /** Flag forwarded to each newly created {@link SourceCoordinatorContext}. */
    protected boolean supportsConcurrentExecutionAttempts = false;

    /** Collects the coordinator contexts that must be closed after each test. */
    private AutoCloseableRegistry closeableRegistry;

    protected EventReceivingTasks receivingTasks;
    protected MockOperatorCoordinatorContext operatorCoordinatorContext;
    protected String coordinatorThreadName;
    protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
    protected SourceCoordinatorContext<MockSourceSplit> context;
    protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> sourceCoordinator;

    /** Lazily resolved enumerator of {@link #sourceCoordinator}; see {@link #getEnumerator()}. */
    private TestingSplitEnumerator<MockSourceSplit> enumerator;

    /**
     * Creates fresh fixtures (registry, receiving tasks, mock context, tracker, coordinator and its
     * context) for the next test.
     *
     * @throws Exception if the coordinator or its context cannot be created
     */
    @BeforeEach
    public void setup() throws Exception {
        this.closeableRegistry = new AutoCloseableRegistry();
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.operatorCoordinatorContext =
                new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
        this.splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
        this.coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
        // A cached enumerator would belong to the previous coordinator; drop it so that
        // getEnumerator() resolves it again from the coordinator created below.
        this.enumerator = null;
        this.sourceCoordinator = getNewSourceCoordinator();
        this.context = this.sourceCoordinator.getContext();
    }

    /**
     * Closes everything that was registered with the closeable registry during the test.
     *
     * @throws Exception if closing a registered resource fails
     */
    @AfterEach
    void cleanUp() throws Exception {
        this.closeableRegistry.close();
    }

    /**
     * Returns the enumerator of {@link #sourceCoordinator}, caching it after the first lookup.
     *
     * <p>Fails with an assertion error ("source was not started") when the coordinator has no
     * enumerator yet.
     *
     * @return the coordinator's enumerator, cast to {@link TestingSplitEnumerator}
     */
    public TestingSplitEnumerator<MockSourceSplit> getEnumerator() {
        if (this.enumerator == null) {
            this.enumerator =
                    (TestingSplitEnumerator<MockSourceSplit>)
                            this.sourceCoordinator.getEnumerator();
            Assertions.assertThat(this.enumerator).as("source was not started").isNotNull();
        }
        return this.enumerator;
    }

    /**
     * Starts {@link #sourceCoordinator} and marks all reader subtasks as ready.
     *
     * @throws Exception if starting the coordinator fails
     */
    public void sourceReady() throws Exception {
        this.sourceCoordinator.start();
        setAllReaderTasksReady(this.sourceCoordinator);
    }

    /**
     * Marks attempt 0 of every subtask index in {@code [0, NUM_SUBTASKS)} as ready on the given
     * coordinator.
     *
     * @param sourceCoordinator the coordinator to notify
     */
    public void setAllReaderTasksReady(SourceCoordinator<?, ?> sourceCoordinator) {
        for (int subtask = 0; subtask < NUM_SUBTASKS; subtask++) {
            setReaderTaskReady(sourceCoordinator, subtask, 0);
        }
    }

    /**
     * Marks a single execution attempt as ready, handing the coordinator a gateway created by
     * {@link #receivingTasks} for that subtask and attempt.
     *
     * @param sourceCoordinator the coordinator to notify
     * @param i the subtask index
     * @param i2 the attempt number
     */
    public void setReaderTaskReady(SourceCoordinator<?, ?> sourceCoordinator, int i, int i2) {
        sourceCoordinator.executionAttemptReady(
                i, i2, this.receivingTasks.createGatewayForSubtask(i, i2));
    }

    /**
     * Adds {@code i} new {@link MockSourceSplit}s, constructed from the values {@code 0..i-1}, to
     * the enumerator.
     *
     * @param i the number of splits to add; no splits are created when it is not positive
     */
    public void addTestingSplitSet(int i) {
        List<MockSourceSplit> splits = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < i; splitIndex++) {
            splits.add(new MockSourceSplit(splitIndex));
        }
        getEnumerator().addNewSplits(splits);
    }

    /**
     * Registers a reader for attempt 0 of the given subtask.
     *
     * @param i the subtask index
     */
    public void registerReader(int i) {
        registerReader(i, 0);
    }

    /**
     * Sends a {@link ReaderRegistrationEvent} for the given subtask and attempt to {@link
     * #sourceCoordinator}, using {@link #createLocationFor(int, int)} as the reader location.
     *
     * @param i the subtask index
     * @param i2 the attempt number
     */
    public void registerReader(int i, int i2) {
        this.sourceCoordinator.handleEventFromOperator(
                i, i2, new ReaderRegistrationEvent(i, createLocationFor(i, i2)));
    }

    /**
     * Builds the location string used when registering a reader.
     *
     * @param i the subtask index
     * @param i2 the attempt number
     * @return a string of the form {@code location_<i>_<i2>}
     */
    public static String createLocationFor(int i, int i2) {
        return String.format("location_%d_%d", i, i2);
    }

    /** Waits on the coordinator thread of {@link #context}; see the overload taking a context. */
    public void waitForCoordinatorToProcessActions() {
        waitForCoordinatorToProcessActions(this.context);
    }

    /**
     * Blocks until a no-op action submitted to the given context's coordinator thread has run.
     *
     * <p>NOTE(review): this presumably implies that previously submitted actions have been
     * processed as well, which relies on the coordinator executor running actions in submission
     * order — confirm for contexts not created by this class.
     *
     * @param sourceCoordinatorContext the context whose coordinator thread is waited on
     * @throws AssertionError if the waiting thread is interrupted
     */
    public void waitForCoordinatorToProcessActions(
            SourceCoordinatorContext<?> sourceCoordinatorContext) {
        CompletableFuture<Void> processed = new CompletableFuture<>();
        sourceCoordinatorContext.runInCoordinatorThread(() -> processed.complete(null));
        try {
            processed.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it, and
            // keep the original exception as the cause for diagnosis.
            Thread.currentThread().interrupt();
            throw new AssertionError("test interrupted", e);
        } catch (ExecutionException e) {
            ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e));
        }
    }

    /**
     * Waits until {@link #receivingTasks} reports exactly {@code i} sent events.
     *
     * @param i the expected number of sent events
     * @throws Exception if waiting fails or the expected number is not reached in time
     */
    public void waitForSentEvents(int i) throws Exception {
        // A lambda (not a method reference) so the receivingTasks field is re-read on every poll.
        waitUtilNumberReached(() -> this.receivingTasks.getNumberOfSentEvents(), i);
    }

    /**
     * Waits, via {@link CommonTestUtils#waitUtil}, until the supplier yields exactly {@code i}.
     *
     * <p>The timeout handed to {@code waitUtil} is one day, so in practice the surrounding test
     * timeout is what bounds the wait.
     *
     * @param supplier provides the current value
     * @param i the value to wait for
     * @throws Exception if waiting fails or the value is not reached within the timeout
     */
    public static void waitUtilNumberReached(Supplier<Integer> supplier, int i) throws Exception {
        CommonTestUtils.waitUtil(
                () -> supplier.get() == i,
                Duration.ofDays(1L),
                "Not reach expected number within timeout.");
    }

    /**
     * Asserts that the event is an {@link AddSplitEvent} whose splits, deserialized with a {@link
     * MockSourceSplitSerializer}, equal the expected list.
     *
     * @param operatorEvent the event to check
     * @param list the expected splits
     * @param <SplitT> the split type of the expected list
     * @throws Exception if deserializing the splits fails
     */
    public static <SplitT extends SourceSplit> void assertAddSplitEvent(
            OperatorEvent operatorEvent, List<SplitT> list) throws Exception {
        Assertions.assertThat(operatorEvent).isInstanceOf(AddSplitEvent.class);
        // The raw cast is kept on purpose: the event's split type is not known statically here.
        Assertions.assertThat(
                        ((AddSplitEvent) operatorEvent).splits(new MockSourceSplitSerializer()))
                .isEqualTo(list);
    }

    /**
     * Creates a coordinator with watermark alignment disabled.
     *
     * @return a new coordinator backed by a freshly created context
     * @throws Exception if the coordinator context cannot be created
     */
    public SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator()
            throws Exception {
        return getNewSourceCoordinator(WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
    }

    /**
     * Creates a coordinator for the mock source with the given watermark alignment settings, a new
     * coordinator context and a new {@link CoordinatorStoreImpl}.
     *
     * @param watermarkAlignmentParams the watermark alignment settings to use
     * @return a new coordinator
     * @throws Exception if the coordinator context cannot be created
     */
    public SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator(
            WatermarkAlignmentParams watermarkAlignmentParams) throws Exception {
        return new SourceCoordinator<>(
                OPERATOR_NAME,
                createMockSource(),
                getNewSourceCoordinatorContext(),
                new CoordinatorStoreImpl(),
                watermarkAlignmentParams,
                (String) null);
    }

    /**
     * Creates the source handed to new coordinators.
     *
     * @return the source produced by {@link TestingSplitEnumerator#factorySource} with the mock
     *     split and checkpoint serializers
     */
    public Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
        return TestingSplitEnumerator.factorySource(
                new MockSourceSplitSerializer(), new MockSplitEnumeratorCheckpointSerializer());
    }

    /**
     * Creates a coordinator context with a single-threaded coordinator executor and a
     * single-threaded worker executor, and registers it for closing after the test.
     *
     * @return the new context
     * @throws Exception if the context cannot be registered with the closeable registry
     */
    public SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorContext()
            throws Exception {
        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                        this.coordinatorThreadName, this.operatorCoordinatorContext);
        SourceCoordinatorContext<MockSourceSplit> newContext =
                new SourceCoordinatorContext<>(
                        Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                        Executors.newScheduledThreadPool(
                                1,
                                new ExecutorThreadFactory(this.coordinatorThreadName + "-worker")),
                        coordinatorThreadFactory,
                        this.operatorCoordinatorContext,
                        new MockSourceSplitSerializer(),
                        this.splitSplitAssignmentTracker,
                        this.supportsConcurrentExecutionAttempts);
        this.closeableRegistry.registerCloseable(newContext);
        return newContext;
    }
}
