package org.apache.flink.runtime.state.ttl;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/state/ttl/TtlStateFactory.class */
/**
 * Creates keyed state for a {@link StateDescriptor} and, when the descriptor's TTL configuration
 * is enabled, wraps it into the matching TTL-aware state implementation.
 *
 * <p>The only public entry point is {@link #createStateAndWrapWithTtlIfEnabled}; instances of the
 * factory are created internally for a single descriptor.
 *
 * @param <K> key type of the {@link KeyedStateBackend}
 * @param <N> namespace type handled by the namespace serializer
 * @param <SV> value type of the user state descriptor
 * @param <TTLSV> state value type used by {@link TtlIncrementalCleanup}
 * @param <S> state type declared by the user state descriptor
 * @param <IS> type of the state object produced by this factory
 */
public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {

    @Nonnull
    private final TypeSerializer<N> namespaceSerializer;

    @Nonnull
    private final StateDescriptor<S, SV> stateDesc;

    @Nonnull
    private final KeyedStateBackend<K> stateBackend;

    @Nonnull
    private final StateTtlConfig ttlConfig;

    @Nonnull
    private final TtlTimeProvider timeProvider;

    /** TTL taken from {@link #ttlConfig}, in milliseconds. */
    private final long ttl;

    /** Dispatch table from descriptor type to the method that builds the TTL state for it. */
    private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories;

    /** Incremental cleanup helper, or {@code null} when no incremental cleanup strategy is configured. */
    @Nullable
    private final TtlIncrementalCleanup<K, N, TTLSV> incrementalCleanup;

    /**
     * Serializer for {@link TtlValue}: a composite of a timestamp serializer (field 0) and the
     * user value serializer (field 1).
     *
     * @param <T> type of the user value
     */
    public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> implements TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<TtlValue<T>> {
        private static final long serialVersionUID = 131020282727167064L;

        /**
         * Creates a serializer from its two field serializers.
         *
         * @param timestampSerializer serializer for the last-access timestamp
         * @param userValueSerializer serializer for the user value; must not itself be a {@code TtlSerializer}
         * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, when
         *     {@code userValueSerializer} is a {@code TtlSerializer}
         */
        @SuppressWarnings("rawtypes")
        public TtlSerializer(TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
            super(true, new TypeSerializer[]{timestampSerializer, userValueSerializer});
            // Guard against wrapping an already TTL-wrapped serializer a second time.
            Preconditions.checkArgument(!(userValueSerializer instanceof TtlSerializer));
        }

        /**
         * Creates a serializer from precomputed parameters and the field serializers.
         *
         * @param precomputedParameters parameters forwarded to the superclass
         * @param fieldSerializers field serializers forwarded to the superclass
         */
        public TtlSerializer(CompositeSerializer.PrecomputedParameters precomputedParameters, TypeSerializer<?>... fieldSerializers) {
            super(precomputedParameters, fieldSerializers);
        }

        /**
         * Builds a {@link TtlValue} from its two fields.
         *
         * <p>NOTE(review): the decompiler reported this method as renamed from {@code createInstance}
         * (merged with a bridge method). Confirm against the superclass and restore the original
         * name so that it overrides the superclass method.
         *
         * @param values exactly two elements: the {@code Long} timestamp at index 0 and the user value at index 1
         * @return a new {@link TtlValue} holding the user value and timestamp
         */
        @SuppressWarnings("unchecked")
        public TtlValue<T> m713createInstance(@Nonnull Object... values) {
            Preconditions.checkArgument(values.length == 2);
            long lastAccessTimestamp = (Long) values[0];
            return new TtlValue<>((T) values[1], lastAccessTimestamp);
        }

        /**
         * Always fails: fields of a {@link TtlValue} cannot be modified.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void setField(@Nonnull TtlValue<T> ttlValue, int index, Object fieldValue) {
            throw new UnsupportedOperationException("TtlValue is immutable");
        }

        /**
         * Reads one field of a {@link TtlValue}.
         *
         * @param ttlValue value to read from
         * @param index field index; 0 selects the timestamp, any other index selects the user value
         * @return the boxed last-access timestamp or the user value
         */
        @Override
        public Object getField(@Nonnull TtlValue<T> ttlValue, int index) {
            if (index == 0) {
                return ttlValue.getLastAccessTimestamp();
            }
            return ttlValue.getUserValue();
        }

        /**
         * Creates a new {@code TtlSerializer} from precomputed parameters and exactly two field serializers.
         */
        @Override
        protected CompositeSerializer<TtlValue<T>> createSerializerInstance(CompositeSerializer.PrecomputedParameters precomputedParameters, TypeSerializer<?>... fieldSerializers) {
            Preconditions.checkNotNull(fieldSerializers);
            Preconditions.checkArgument(fieldSerializers.length == 2);
            return new TtlSerializer<>(precomputedParameters, fieldSerializers);
        }

        /** Returns the serializer stored as field 0 (the timestamp serializer). */
        @SuppressWarnings("unchecked")
        TypeSerializer<Long> getTimestampSerializer() {
            return (TypeSerializer<Long>) (TypeSerializer<?>) this.fieldSerializers[0];
        }

        /** Returns the serializer stored as field 1 (the user value serializer). */
        @SuppressWarnings("unchecked")
        TypeSerializer<T> getValueSerializer() {
            return (TypeSerializer<T>) (TypeSerializer<?>) this.fieldSerializers[1];
        }

        /** Returns a {@link TtlSerializerSnapshot} describing this serializer. */
        @Override
        public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
            return new TtlSerializerSnapshot<>(this);
        }

        /**
         * Resolves compatibility against a legacy config snapshot.
         *
         * @param deprecatedConfigSnapshot the legacy snapshot
         * @return the result of delegating to a new {@link TtlSerializerSnapshot} when the legacy
         *     snapshot is a {@code CompositeSerializer.ConfigSnapshot}; incompatible otherwise
         */
        @Override
        public TypeSerializerSchemaCompatibility<TtlValue<T>> resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(TypeSerializerConfigSnapshot<TtlValue<T>> deprecatedConfigSnapshot) {
            if (!(deprecatedConfigSnapshot instanceof CompositeSerializer.ConfigSnapshot)) {
                return TypeSerializerSchemaCompatibility.incompatible();
            }
            CompositeSerializer.ConfigSnapshot legacySnapshot = (CompositeSerializer.ConfigSnapshot) deprecatedConfigSnapshot;
            return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(this, new TtlSerializerSnapshot<>(), legacySnapshot.getNestedSerializerSnapshots());
        }

        /**
         * Tells whether a serializer is TTL-wrapped.
         *
         * @param typeSerializer serializer to inspect
         * @return {@code true} if it is a {@code TtlSerializer}, a {@link ListSerializer} whose element
         *     serializer is one, or a {@link MapSerializer} whose value serializer is one
         */
        public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
            if (typeSerializer instanceof TtlSerializer) {
                return true;
            }
            if (typeSerializer instanceof ListSerializer
                    && ((ListSerializer<?>) typeSerializer).getElementSerializer() instanceof TtlSerializer) {
                return true;
            }
            return typeSerializer instanceof MapSerializer
                    && ((MapSerializer<?, ?>) typeSerializer).getValueSerializer() instanceof TtlSerializer;
        }
    }

    /**
     * Snapshot of a {@link TtlSerializer}; its nested serializers are the timestamp serializer
     * followed by the user value serializer.
     *
     * @param <T> type of the user value
     */
    public static final class TtlSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {
        private static final int VERSION = 2;

        /** Creates an empty snapshot bound to the {@link TtlSerializer} class. */
        public TtlSerializerSnapshot() {
            super(TtlSerializer.class);
        }

        /** Creates a snapshot of the given serializer instance. */
        TtlSerializerSnapshot(TtlSerializer<T> ttlSerializer) {
            super(ttlSerializer);
        }

        /** Returns the current outer snapshot version, {@link #VERSION}. */
        @Override
        protected int getCurrentOuterSnapshotVersion() {
            return VERSION;
        }

        /** Returns the timestamp serializer and the user value serializer, in that order. */
        @Override
        public TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> ttlSerializer) {
            return new TypeSerializer<?>[]{ttlSerializer.getTimestampSerializer(), ttlSerializer.getValueSerializer()};
        }

        /**
         * Rebuilds a {@link TtlSerializer} from nested serializers laid out as produced by
         * {@link #getNestedSerializers}: timestamp serializer first, user value serializer second.
         */
        @Override
        @SuppressWarnings("unchecked")
        protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
            TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0];
            TypeSerializer<T> userValueSerializer = (TypeSerializer<T>) nestedSerializers[1];
            return new TtlSerializer<>(timestampSerializer, userValueSerializer);
        }

        /*
         * NOTE(review): the decompiler marked this as a synthetic bridge method that collided with
         * createOuterSerializerWithNestedSerializers above. It only delegates to that method;
         * confirm whether it can be dropped once the class is compiled from source again.
         */
        /* renamed from: createOuterSerializerWithNestedSerializers, reason: collision with other method in class */
        protected /* bridge */ /* synthetic */ TypeSerializer m714createOuterSerializerWithNestedSerializers(TypeSerializer[] typeSerializerArr) {
            return createOuterSerializerWithNestedSerializers((TypeSerializer<?>[]) typeSerializerArr);
        }
    }

    /**
     * Creates the state for a descriptor, wrapping it with TTL handling when the descriptor's
     * TTL configuration is enabled; otherwise the backend's state is returned as is.
     *
     * @param typeSerializer namespace serializer; must not be {@code null}
     * @param stateDescriptor descriptor of the state to create; must not be {@code null}
     * @param keyedStateBackend backend that creates the underlying state; must not be {@code null}
     * @param ttlTimeProvider time provider passed on to the TTL state; must not be {@code null}
     * @return the TTL-wrapped state, or the backend's plain state when TTL is disabled
     * @throws FlinkRuntimeException if TTL is enabled and the descriptor type has no TTL implementation
     * @throws Exception if state creation fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static State createStateAndWrapWithTtlIfEnabled(TypeSerializer typeSerializer, StateDescriptor stateDescriptor, KeyedStateBackend keyedStateBackend, TtlTimeProvider ttlTimeProvider) throws Exception {
        Preconditions.checkNotNull(typeSerializer);
        Preconditions.checkNotNull(stateDescriptor);
        Preconditions.checkNotNull(keyedStateBackend);
        Preconditions.checkNotNull(ttlTimeProvider);
        if (!stateDescriptor.getTtlConfig().isEnabled()) {
            return keyedStateBackend.createOrUpdateInternalState(typeSerializer, stateDescriptor);
        }
        return new TtlStateFactory(typeSerializer, stateDescriptor, keyedStateBackend, ttlTimeProvider).createState();
    }

    private TtlStateFactory(@Nonnull TypeSerializer<N> typeSerializer, @Nonnull StateDescriptor<S, SV> stateDescriptor, @Nonnull KeyedStateBackend<K> keyedStateBackend, @Nonnull TtlTimeProvider ttlTimeProvider) {
        this.namespaceSerializer = typeSerializer;
        this.stateDesc = stateDescriptor;
        this.stateBackend = keyedStateBackend;
        this.ttlConfig = stateDescriptor.getTtlConfig();
        this.timeProvider = ttlTimeProvider;
        this.ttl = this.ttlConfig.getTtl().toMilliseconds();
        // These two must be initialised here, after ttlConfig is assigned: as field initialisers
        // they would run before this constructor body and getTtlIncrementalCleanup() would
        // dereference a still-null ttlConfig.
        this.stateFactories = createStateFactories();
        this.incrementalCleanup = getTtlIncrementalCleanup();
    }

    /** Builds the dispatch table from descriptor type to the matching {@code create*State} method. */
    private Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
                        Tuple2.of(StateDescriptor.Type.VALUE, (SupplierWithException<IS, Exception>) this::createValueState),
                        Tuple2.of(StateDescriptor.Type.LIST, (SupplierWithException<IS, Exception>) this::createListState),
                        Tuple2.of(StateDescriptor.Type.MAP, (SupplierWithException<IS, Exception>) this::createMapState),
                        Tuple2.of(StateDescriptor.Type.REDUCING, (SupplierWithException<IS, Exception>) this::createReducingState),
                        Tuple2.of(StateDescriptor.Type.AGGREGATING, (SupplierWithException<IS, Exception>) this::createAggregatingState))
                .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
    }

    /**
     * Creates the TTL state for {@link #stateDesc} and hands it to the incremental cleanup, if any.
     *
     * @throws FlinkRuntimeException if the descriptor type has no registered factory
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = this.stateFactories.get(this.stateDesc.getType());
        if (stateFactory == null) {
            throw new FlinkRuntimeException(String.format("State type: %s is not supported by %s", this.stateDesc.getType(), TtlStateFactory.class));
        }
        IS state = stateFactory.get();
        if (this.incrementalCleanup != null) {
            this.incrementalCleanup.setTtlState((AbstractTtlState) state);
        }
        return state;
    }

    /** Creates TTL value state; the user descriptor is reused when its serializer is already a {@link TtlSerializer}. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createValueState() throws Exception {
        ValueStateDescriptor ttlDescriptor;
        if (this.stateDesc.getSerializer() instanceof TtlSerializer) {
            ttlDescriptor = (ValueStateDescriptor) this.stateDesc;
        } else {
            ttlDescriptor = new ValueStateDescriptor(this.stateDesc.getName(), new TtlSerializer(LongSerializer.INSTANCE, this.stateDesc.getSerializer()));
        }
        return (IS) new TtlValueState(createTtlStateContext(ttlDescriptor));
    }

    /** Creates TTL list state; the user descriptor is reused when its element serializer is already a {@link TtlSerializer}. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createListState() throws Exception {
        ListStateDescriptor listStateDesc = (ListStateDescriptor) this.stateDesc;
        ListStateDescriptor ttlDescriptor;
        if (listStateDesc.getElementSerializer() instanceof TtlSerializer) {
            ttlDescriptor = listStateDesc;
        } else {
            ttlDescriptor = new ListStateDescriptor(this.stateDesc.getName(), new TtlSerializer(LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
        }
        return (IS) new TtlListState(createTtlStateContext(ttlDescriptor));
    }

    /** Creates TTL map state; the user descriptor is reused when its value serializer is already a {@link TtlSerializer}. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createMapState() throws Exception {
        MapStateDescriptor mapStateDesc = (MapStateDescriptor) this.stateDesc;
        MapStateDescriptor ttlDescriptor;
        if (mapStateDesc.getValueSerializer() instanceof TtlSerializer) {
            ttlDescriptor = mapStateDesc;
        } else {
            ttlDescriptor = new MapStateDescriptor(this.stateDesc.getName(), mapStateDesc.getKeySerializer(), new TtlSerializer(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
        }
        return (IS) new TtlMapState(createTtlStateContext(ttlDescriptor));
    }

    /** Creates TTL reducing state around a {@link TtlReduceFunction} that wraps the user reduce function. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor reducingStateDesc = (ReducingStateDescriptor) this.stateDesc;
        TtlReduceFunction ttlReduceFunction = new TtlReduceFunction(reducingStateDesc.getReduceFunction(), this.ttlConfig, this.timeProvider);
        ReducingStateDescriptor ttlDescriptor = new ReducingStateDescriptor(this.stateDesc.getName(), ttlReduceFunction, (TypeSerializer) ttlSerializerFor(this.stateDesc.getSerializer()));
        return (IS) new TtlReducingState(createTtlStateContext(ttlDescriptor));
    }

    /** Creates TTL aggregating state around a {@link TtlAggregateFunction} that wraps the user aggregate function. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor aggregatingStateDesc = (AggregatingStateDescriptor) this.stateDesc;
        TtlAggregateFunction ttlAggregateFunction = new TtlAggregateFunction(aggregatingStateDesc.getAggregateFunction(), this.ttlConfig, this.timeProvider);
        AggregatingStateDescriptor ttlDescriptor = new AggregatingStateDescriptor(this.stateDesc.getName(), ttlAggregateFunction, (TypeSerializer) ttlSerializerFor(this.stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState(createTtlStateContext(ttlDescriptor), ttlAggregateFunction);
    }

    /** Returns the serializer unchanged if it is already a {@link TtlSerializer}, otherwise wraps it in one. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private TypeSerializer<?> ttlSerializerFor(TypeSerializer<?> serializer) {
        if (serializer instanceof TtlSerializer) {
            return serializer;
        }
        return new TtlSerializer(LongSerializer.INSTANCE, serializer);
    }

    /**
     * Enables TTL on the given descriptor, creates the underlying state in the backend and bundles
     * it with the TTL settings and the access callback into a {@link TtlStateContext}.
     *
     * @param stateDescriptor descriptor used to create the underlying (TTL-valued) state
     */
    @SuppressWarnings("unchecked")
    private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLS, TTLV> stateDescriptor) throws Exception {
        stateDescriptor.enableTimeToLive(this.stateDesc.getTtlConfig());
        OIS originalState = (OIS) this.stateBackend.createOrUpdateInternalState(this.namespaceSerializer, stateDescriptor, getSnapshotTransformFactory());
        Runnable accessCallback = registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState);
        return new TtlStateContext<>(originalState, this.ttlConfig, this.timeProvider, (TypeSerializer<V>) this.stateDesc.getSerializer(), accessCallback);
    }

    /**
     * Creates the incremental cleanup helper from the configured strategy.
     *
     * @return a new {@link TtlIncrementalCleanup} sized by the strategy's cleanup size, or
     *     {@code null} when no incremental cleanup strategy is configured
     */
    @Nullable
    private TtlIncrementalCleanup<K, N, TTLSV> getTtlIncrementalCleanup() {
        StateTtlConfig.IncrementalCleanupStrategy strategy = this.ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
        return strategy == null ? null : new TtlIncrementalCleanup<>(strategy.getCleanupSize());
    }

    /**
     * Builds the callback to run on state access. When incremental cleanup is configured and the
     * state supports incremental visiting, the callback triggers the cleanup; otherwise it is a
     * no-op. If the strategy asks for cleanup on every record, the callback is additionally
     * registered as a key selection listener on the backend.
     *
     * @param originalState underlying state that the cleanup would iterate over
     * @return the access callback (never {@code null})
     */
    private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
        StateTtlConfig.IncrementalCleanupStrategy strategy = this.ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
        boolean cleanupConfigured = strategy != null && this.incrementalCleanup != null;
        boolean cleanupActive = cleanupConfigured && isStateIteratorSupported(originalState, this.incrementalCleanup.getCleanupSize());
        Runnable callback;
        if (cleanupActive) {
            callback = this.incrementalCleanup::stateAccessed;
        } else {
            callback = () -> { };
        }
        if (cleanupActive && strategy.runCleanupForEveryRecord()) {
            this.stateBackend.registerKeySelectionListener(selectedKey -> callback.run());
        }
        return callback;
    }

    /**
     * Probes whether the state can provide an incremental visitor.
     *
     * @return {@code true} only if requesting a visitor succeeds and yields a non-null result
     */
    private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
        try {
            return originalState.getStateIncrementalVisitor(size) != null;
        } catch (Throwable ignored) {
            // Deliberate best-effort probe: any failure to obtain a visitor is treated as
            // "not supported" so that incremental cleanup is simply disabled for this state.
            return false;
        }
    }

    /**
     * Chooses the snapshot transformer factory: a TTL-filtering factory when cleanup in full
     * snapshots is configured, the no-op factory otherwise.
     */
    @SuppressWarnings("rawtypes")
    private StateSnapshotTransformer.StateSnapshotTransformFactory<?> getSnapshotTransformFactory() {
        if (this.ttlConfig.getCleanupStrategies().inFullSnapshot()) {
            return new TtlStateSnapshotTransformer.Factory(this.timeProvider, this.ttl);
        }
        return StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform();
    }
}
