package org.apache.flink.streaming.api.operators.collect.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/utils/CollectSinkFunctionTestWrapper.class */
/**
 * Test harness that wires a {@link CollectSinkFunction} to a {@link CollectSinkOperatorCoordinator}
 * through a {@link MockOperatorEventGateway}, backed by a mock streaming runtime context.
 *
 * <p>The coordinator is started in the constructor; call {@link #closeWrapper()} when done to
 * release the coordinator and the I/O manager.
 *
 * @param <IN> type of the records passed to the sink function
 */
public class CollectSinkFunctionTestWrapper<IN> {
    public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
    private static final int FUTURE_TIMEOUT_MILLIS = 10000;
    private static final int MAX_RETRIES = 100;

    private final TypeSerializer<IN> serializer;
    private final int maxBytesPerBatch;

    // Must be declared before runtimeContext: the mock environment below is built with it.
    private final IOManager ioManager = new IOManagerAsync();
    private final StreamingRuntimeContext runtimeContext =
            new MockStreamingRuntimeContext(
                    false,
                    1,
                    0,
                    new MockEnvironmentBuilder()
                            .setTaskName("mockTask")
                            .setManagedMemorySize(131072)
                            .setIOManager(this.ioManager)
                            .build());
    private final MockOperatorEventGateway gateway = new MockOperatorEventGateway();
    private final CollectSinkOperatorCoordinator coordinator =
            new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
    private final MockFunctionInitializationContext functionInitializationContext;
    private CollectSinkFunction<IN> function;

    /**
     * Creates the wrapper and starts the coordinator.
     *
     * @param typeSerializer serializer handed to every {@link CollectSinkFunction} created by
     *     this wrapper
     * @param i maximum bytes per batch, handed to every {@link CollectSinkFunction} created by
     *     this wrapper
     * @throws Exception if the coordinator fails to start
     */
    public CollectSinkFunctionTestWrapper(TypeSerializer<IN> typeSerializer, int i) throws Exception {
        this.serializer = typeSerializer;
        this.maxBytesPerBatch = i;
        this.coordinator.start();
        this.functionInitializationContext = new MockFunctionInitializationContext();
    }

    /**
     * Closes the coordinator and then the I/O manager.
     *
     * <p>The I/O manager is closed even when closing the coordinator throws, so a failing
     * coordinator shutdown does not leak the I/O manager's resources.
     *
     * @throws Exception if closing either component fails
     */
    public void closeWrapper() throws Exception {
        try {
            this.coordinator.close();
        } finally {
            this.ioManager.close();
        }
    }

    /** Returns the coordinator owned by this wrapper. */
    public CollectSinkOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    /**
     * Creates a fresh sink function without restoring state, opens it, and forwards the next
     * event queued in the mock gateway to the coordinator.
     *
     * @throws Exception if opening the function or delivering the event fails
     */
    public void openFunction() throws Exception {
        this.function = new CollectSinkFunction<>(this.serializer, this.maxBytesPerBatch, ACCUMULATOR_NAME);
        this.function.setRuntimeContext(this.runtimeContext);
        this.function.setOperatorEventGateway(this.gateway);
        this.function.open(new Configuration());
        // The first argument is presumably the subtask index (always 0 here) -- TODO confirm.
        this.coordinator.handleEventFromOperator(0, this.gateway.getNextEvent());
    }

    /**
     * Reverts the mock state store to its last successful checkpoint, then creates a fresh sink
     * function, initializes its state from that store, opens it, and forwards the next event
     * queued in the mock gateway to the coordinator.
     *
     * @throws Exception if state initialization, opening, or delivering the event fails
     */
    public void openFunctionWithState() throws Exception {
        // NOTE(review): "m68getOperatorStateStore" looks like a decompiler-renamed accessor;
        // it is declared outside this file, so the reference is kept as-is.
        this.functionInitializationContext.m68getOperatorStateStore().revertToLastSuccessCheckpoint();
        this.function = new CollectSinkFunction<>(this.serializer, this.maxBytesPerBatch, ACCUMULATOR_NAME);
        this.function.setRuntimeContext(this.runtimeContext);
        this.function.setOperatorEventGateway(this.gateway);
        this.function.initializeState(this.functionInitializationContext);
        this.function.open(new Configuration());
        this.coordinator.handleEventFromOperator(0, this.gateway.getNextEvent());
    }

    /**
     * Passes one record to the currently opened sink function, with a {@code null} sink context.
     *
     * @param in record to hand to the sink function
     * @throws Exception if the sink function fails
     */
    public void invoke(IN in) throws Exception {
        this.function.invoke(in, (SinkFunction.Context) null);
    }

    /**
     * Snapshots the function state for the given checkpoint and then tells the mock state store
     * that this checkpoint has begun.
     *
     * @param j checkpoint id
     * @throws Exception if snapshotting fails
     */
    public void checkpointFunction(long j) throws Exception {
        this.function.snapshotState(new MockFunctionSnapshotContext(j));
        this.functionInitializationContext.m68getOperatorStateStore().checkpointBegin(j);
    }

    /**
     * Notifies the function that the given checkpoint completed and then marks it as successful
     * in the mock state store.
     *
     * @param j checkpoint id
     */
    public void checkpointComplete(long j) {
        this.function.notifyCheckpointComplete(j);
        this.functionInitializationContext.m68getOperatorStateStore().checkpointSuccess(j);
    }

    /**
     * Lets the function accumulate its final results and then closes it.
     *
     * @throws Exception if accumulating or closing fails
     */
    public void closeFunctionNormally() throws Exception {
        this.function.accumulateFinalResults();
        this.function.close();
    }

    /**
     * Closes the function without accumulating final results and then reports subtask 0 as
     * failed to the coordinator, with a {@code null} cause.
     *
     * @throws Exception if closing the function fails
     */
    public void closeFunctionAbnormally() throws Exception {
        this.function.close();
        this.coordinator.subtaskFailed(0, (Throwable) null);
    }

    /**
     * Sends a coordination request repeatedly until a response with a non-negative
     * last-checkpointed offset arrives, and returns that response.
     *
     * @param str first argument of the {@link CollectCoordinationRequest} (presumably the
     *     version string -- TODO confirm against that class)
     * @param j second argument of the {@link CollectCoordinationRequest} (presumably the
     *     requested offset -- TODO confirm against that class)
     * @return the first response whose last-checkpointed offset is {@code >= 0}
     * @throws RuntimeException if no such response arrives within the retry budget
     * @throws Exception if a request fails or its future does not complete in time
     */
    public CollectCoordinationResponse sendRequestAndGetResponse(String str, long j) throws Exception {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            CollectCoordinationResponse response = sendRequest(str, j);
            if (response.getLastCheckpointedOffset() >= 0) {
                return response;
            }
        }
        throw new RuntimeException("Too many retries in sendRequestAndGetValidResponse");
    }

    /** Sends a single coordination request and waits for its response, bounded by the future timeout. */
    private CollectCoordinationResponse sendRequest(String str, long j) throws Exception {
        return (CollectCoordinationResponse)
                this.coordinator
                        .handleCoordinationRequest(new CollectCoordinationRequest(str, j))
                        .get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Deserializes the accumulator's local value, asserts that it holds exactly one entry, and
     * decodes that entry with {@link CollectSinkFunction#deserializeAccumulatorResult}.
     *
     * @return the decoded accumulator result
     * @throws Exception if deserialization fails
     */
    public Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception {
        List<byte[]> serializedResults =
                SerializedListAccumulator.deserializeList(
                        getAccumulatorLocalValue(), BytePrimitiveArraySerializer.INSTANCE);
        Assert.assertEquals(1L, serializedResults.size());
        return CollectSinkFunction.deserializeAccumulatorResult(serializedResults.get(0));
    }

    /**
     * Returns the local value of the accumulator registered under {@link #ACCUMULATOR_NAME} in
     * the mock runtime context.
     */
    public ArrayList<byte[]> getAccumulatorLocalValue() {
        // Explicit type arguments are needed: with the generic call in receiver position there
        // is no target type, so inference would type getLocalValue() as plain Serializable.
        return this.runtimeContext
                .<byte[], ArrayList<byte[]>>getAccumulator(ACCUMULATOR_NAME)
                .getLocalValue();
    }
}
