package org.apache.flink.table.runtime.operators.python.aggregate;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.functions.CleanupState;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.class */
public abstract class AbstractPythonStreamGroupAggregateOperator extends AbstractPythonStreamAggregateOperator implements Triggerable<RowData, VoidNamespace>, CleanupState {
    private static final long serialVersionUID = 1;
    private final long minRetentionTime;
    private final long maxRetentionTime;
    private final boolean stateCleaningEnabled;
    private transient TimerService timerService;
    private transient ValueState<Long> cleanupTimeState;
    private transient UpdatableRowData reuseRowData;
    private transient UpdatableRowData reuseTimerRowData;

    public AbstractPythonStreamGroupAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z, long j, long j2) {
        super(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, iArr, i, z);
        this.minRetentionTime = j;
        this.maxRetentionTime = j2;
        this.stateCleaningEnabled = j > 1;
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator, org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.reuseRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 0, null, null, null}), 4);
        this.reuseTimerRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 1, null, null, null}), 4);
        this.timerService = new SimpleTimerService(getInternalTimerService("state-clean-timer", VoidNamespaceSerializer.INSTANCE, this));
        initCleanupTimeState();
        super.open();
    }

    public void onEventTime(InternalTimer<RowData, VoidNamespace> internalTimer) {
    }

    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> internalTimer) throws Exception {
        if (this.stateCleaningEnabled) {
            RowData rowData = (RowData) internalTimer.getKey();
            this.reuseTimerRowData.setLong(2, internalTimer.getTimestamp());
            this.reuseTimerRowData.setField(3, rowData);
            this.udfInputTypeSerializer.serialize(this.reuseTimerRowData, this.baosWrapper);
            this.pythonFunctionRunner.process(this.baos.toByteArray());
            this.baos.reset();
            this.elementCount++;
        }
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public void processElementInternal(RowData rowData) throws Exception {
        registerProcessingCleanupTimer(this.timerService.currentProcessingTime());
        this.reuseRowData.setField(1, rowData);
        this.udfInputTypeSerializer.serialize(this.reuseRowData, this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator
    public void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception {
        this.bais.setBuffer((byte[]) tuple3.f1, 0, ((Integer) tuple3.f2).intValue());
        this.rowDataWrapper.collect((RowData) this.udfOutputTypeSerializer.deserialize(this.baisWrapper));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public RowType createUserDefinedFunctionInputType() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new RowType.RowField("record_type", new TinyIntType()));
        arrayList.add(new RowType.RowField("row", this.inputType));
        arrayList.add(new RowType.RowField(ByteBuddyDoFnInvokerFactory.TIMESTAMP_PARAMETER_METHOD, new BigIntType()));
        arrayList.add(new RowType.RowField("key", getKeyType()));
        return new RowType(arrayList);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public RowType createUserDefinedFunctionOutputType() {
        return this.outputType;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedAggregateFunctions.Builder builder = super.getUserDefinedFunctionsProto().toBuilder();
        builder.setStateCleaningEnabled(this.stateCleaningEnabled);
        return builder.build();
    }

    private void initCleanupTimeState() {
        if (this.stateCleaningEnabled) {
            this.cleanupTimeState = getRuntimeContext().getState(new ValueStateDescriptor("PythonAggregateCleanupTime", Types.LONG));
        }
    }

    private void registerProcessingCleanupTimer(long j) throws Exception {
        if (this.stateCleaningEnabled) {
            synchronized (getKeyedStateBackend()) {
                getKeyedStateBackend().setCurrentKey(getCurrentKey());
                registerProcessingCleanupTimer(this.cleanupTimeState, j, this.minRetentionTime, this.maxRetentionTime, this.timerService);
            }
        }
    }
}
