package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.types.logical.RowType;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRangeOperator.class */
public abstract class AbstractStreamArrowPythonBoundedRangeOperator<K> extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
    private static final long serialVersionUID = 1;
    private transient LinkedList<List<RowData>> inputData;

    public AbstractStreamArrowPythonBoundedRangeOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, int i, long j, GeneratedProjection generatedProjection) {
        super(configuration, pythonFunctionInfoArr, rowType, rowType2, rowType3, i, j, generatedProjection);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.AbstractStreamArrowPythonOverWindowAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.inputData = new LinkedList<>();
    }

    public void onEventTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        long timestamp = internalTimer.getTimestamp();
        Long l = (Long) this.cleanupTsState.value();
        if (l == null || l.longValue() > timestamp) {
            triggerWindowProcess(timestamp, (List) this.inputState.get(Long.valueOf(timestamp)));
            this.lastTriggeringTsState.update(Long.valueOf(timestamp));
        } else {
            this.inputState.clear();
            this.lastTriggeringTsState.clear();
            this.cleanupTsState.clear();
        }
    }

    public void onProcessingTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        long timestamp = internalTimer.getTimestamp();
        Long l = (Long) this.cleanupTsState.value();
        if (l == null || l.longValue() > timestamp) {
            triggerWindowProcess(timestamp, (List) this.inputState.get(Long.valueOf(timestamp - 1)));
        } else {
            this.inputState.clear();
            this.cleanupTsState.clear();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator
    public void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception {
        this.bais.setBuffer((byte[]) tuple3.f1, 0, ((Integer) tuple3.f2).intValue());
        int load = this.arrowSerializer.load();
        for (int i = 0; i < load; i++) {
            RowData read = this.arrowSerializer.read(i);
            for (RowData rowData : this.inputData.poll()) {
                this.reuseJoinedRow.setRowKind(rowData.getRowKind());
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(rowData, read));
            }
        }
        this.arrowSerializer.resetReader();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerCleanupTimer(long j, TimeDomain timeDomain) throws Exception {
        long j2 = j + this.lowerBoundary + 1;
        long j3 = j + ((long) (this.lowerBoundary * 1.5d)) + 1;
        Long l = (Long) this.cleanupTsState.value();
        if (l == null || l.longValue() < j2) {
            if (timeDomain == TimeDomain.EVENT_TIME) {
                this.timerService.registerEventTimeTimer(j3);
            } else {
                this.timerService.registerProcessingTimeTimer(j3);
            }
            this.cleanupTsState.update(Long.valueOf(j3));
        }
    }

    private void triggerWindowProcess(long j, List<RowData> list) throws Exception {
        long j2 = j - this.lowerBoundary;
        if (list != null) {
            Iterator it = this.inputState.iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                long longValue = ((Long) entry.getKey()).longValue();
                if (longValue < j2) {
                    it.remove();
                } else if (longValue <= j) {
                    Iterator it2 = ((List) entry.getValue()).iterator();
                    while (it2.hasNext()) {
                        this.arrowSerializer.write(getFunctionInput((RowData) it2.next()));
                        this.currentBatchCount++;
                    }
                }
            }
            this.inputData.add(list);
            invokeCurrentBatch();
        }
    }
}
