package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.functions.CleanupState;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.types.logical.RowType;

/**
 * Base class for stream Arrow Python over-window aggregate operators whose window is bounded by a
 * number of preceding rows.
 *
 * <p>When an event-time timer fires, every row stored for the timer's timestamp is processed in
 * order: the row is queued in {@code forwardedInputQueue}, a window holding that row plus up to
 * {@code lowerBoundary} preceding rows is assembled in {@link #windowData}, the window is written
 * to {@code arrowSerializer} and the batch is invoked. Results are joined back with the queued
 * input rows in {@link #emitResult(Tuple3)}.
 *
 * <p>When a processing-time timer fires and state cleaning is enabled, the buffered input state
 * is cleared unless it still contains timestamps newer than the last triggering timestamp.
 *
 * @param <K> type of the key of the timers handled by this operator
 */
@Internal
public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K> extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> implements CleanupState {
    private static final long serialVersionUID = 1;

    /** Minimum retention time handed to the cleanup-timer registration. */
    private final long minRetentionTime;

    /** Maximum retention time handed to the cleanup-timer registration. */
    private final long maxRetentionTime;

    /** {@code true} when the minimum retention time passed to the constructor is greater than 1. */
    private final boolean stateCleaningEnabled;

    /**
     * Timestamps of the input state in ascending order; filled at the start of {@link
     * #onEventTime(InternalTimer)} and cleared at its end.
     */
    transient LinkedList<Long> sortedTimestamps;

    /**
     * Rows of the window currently being assembled; cleared at the end of {@link
     * #onEventTime(InternalTimer)}.
     */
    transient LinkedList<RowData> windowData;

    /**
     * Creates the operator.
     *
     * @param configuration forwarded to the superclass constructor
     * @param j minimum retention time; state cleaning is enabled only when this is greater than 1
     * @param j2 maximum retention time
     * @param pythonFunctionInfoArr forwarded to the superclass constructor
     * @param rowType forwarded to the superclass constructor
     * @param rowType2 forwarded to the superclass constructor
     * @param rowType3 forwarded to the superclass constructor
     * @param i forwarded to the superclass constructor
     * @param j3 forwarded to the superclass constructor
     * @param generatedProjection forwarded to the superclass constructor
     */
    public AbstractStreamArrowPythonBoundedRowsOperator(Configuration configuration, long j, long j2, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, int i, long j3, GeneratedProjection generatedProjection) {
        super(configuration, pythonFunctionInfoArr, rowType, rowType2, rowType3, i, j3, generatedProjection);
        this.minRetentionTime = j;
        this.maxRetentionTime = j2;
        this.stateCleaningEnabled = j > 1;
    }

    /**
     * Opens the superclass and then creates the (transient) working lists.
     *
     * @throws Exception if opening the superclass fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.sortedTimestamps = new LinkedList<>();
        this.windowData = new LinkedList<>();
    }

    /**
     * Handles a processing-time timer. Does nothing unless state cleaning is enabled. Otherwise
     * the input state and the cleanup timestamp state are cleared, unless the input state still
     * holds a timestamp greater than the last triggering timestamp, in which case a new cleanup
     * timer is registered instead.
     *
     * @param internalTimer the timer that fired (not inspected)
     * @throws Exception if accessing state or registering the timer fails
     */
    public void onProcessingTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        if (!this.stateCleaningEnabled) {
            return;
        }
        Iterator<Long> keyIterator = this.inputState.keys().iterator();
        Long lastTriggeringTs = (Long) this.lastTriggeringTsState.value();
        // A missing last triggering timestamp is treated as 0.
        long threshold = lastTriggeringTs == null ? 0L : lastTriggeringTs;

        // Stop scanning at the first timestamp newer than the last triggering one.
        boolean hasNewerTimestamp = false;
        while (keyIterator.hasNext() && !hasNewerTimestamp) {
            hasNewerTimestamp = keyIterator.next() > threshold;
        }

        if (hasNewerTimestamp) {
            // Newer rows are still buffered: keep the state and schedule another cleanup.
            registerProcessingCleanupTimer(this.timerService.currentProcessingTime());
        } else {
            this.inputState.clear();
            this.cleanupTsState.clear();
        }
    }

    /**
     * Handles an event-time timer by processing every row stored for the timer's timestamp.
     *
     * <p>If no rows are stored for that timestamp the method returns without doing anything.
     * Entries can disappear from the input state because {@link
     * #onProcessingTime(InternalTimer)} clears it and {@link #triggerWindowProcess(List, int,
     * int)} removes older timestamps.
     *
     * @param internalTimer the timer that fired; its timestamp selects the rows to process
     * @throws Exception if accessing state or processing a window fails
     */
    @SuppressWarnings("unchecked")
    public void onEventTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        long timestamp = internalTimer.getTimestamp();
        List<RowData> rows = (List<RowData>) this.inputState.get(Long.valueOf(timestamp));
        if (rows == null) {
            // Nothing buffered for this timestamp; avoid dereferencing a missing list.
            return;
        }

        Iterable<Long> stateTimestamps = this.inputState.keys();
        long currentWatermark = this.timerService.currentWatermark();
        // Only timestamps not beyond the current watermark take part in the sorted view.
        for (Long stateTimestamp : stateTimestamps) {
            if (stateTimestamp.longValue() <= currentWatermark) {
                insertToSortedList(stateTimestamp);
            }
        }

        int timestampIndex = this.sortedTimestamps.indexOf(Long.valueOf(timestamp));
        for (int rowIndex = 0; rowIndex < rows.size(); rowIndex++) {
            // Queue the input row first so emitResult can pair it with its result.
            this.forwardedInputQueue.add(rows.get(rowIndex));
            triggerWindowProcess(rows, rowIndex, timestampIndex);
        }
        this.windowData.clear();
        this.sortedTimestamps.clear();
    }

    /**
     * Loads the result bytes into the Arrow serializer and, for every result row, polls the next
     * queued input row, joins it with the result and collects the joined row with the input row's
     * row kind.
     *
     * @param tuple3 result tuple; {@code f1} is the byte buffer and {@code f2} the number of valid
     *     bytes in it ({@code f0} is not used here)
     * @throws Exception if loading or reading the results fails
     */
    @Override
    public void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception {
        byte[] resultBytes = tuple3.f1;
        int resultLength = tuple3.f2.intValue();
        this.bais.setBuffer(resultBytes, 0, resultLength);
        int rowCount = this.arrowSerializer.load();
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            RowData result = this.arrowSerializer.read(rowIndex);
            RowData input = (RowData) this.forwardedInputQueue.poll();
            this.reuseJoinedRow.setRowKind(input.getRowKind());
            this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(input, result));
        }
        this.arrowSerializer.resetReader();
    }

    /**
     * Registers a processing-time cleanup timer based on the given time and the configured
     * retention times. Does nothing when state cleaning is disabled.
     *
     * @param j the current processing time
     * @throws Exception if registering the timer fails
     */
    public void registerProcessingCleanupTimer(long j) throws Exception {
        if (this.stateCleaningEnabled) {
            registerProcessingCleanupTimer(this.cleanupTsState, j, this.minRetentionTime, this.maxRetentionTime, this.timerService);
        }
    }

    /**
     * Assembles the window for row {@code i} of {@code list}, writes it to the Arrow serializer
     * and invokes the current batch.
     *
     * <ul>
     *   <li>If a window already exists it is slid forward: the oldest row is dropped once the
     *       window holds more than {@code lowerBoundary} rows, and row {@code i} is appended.
     *   <li>Otherwise, if {@code list} alone holds enough preceding rows, the window is rows
     *       {@code i - lowerBoundary} through {@code i} of {@code list}.
     *   <li>Otherwise the window starts with rows {@code 0..i} of {@code list} and is completed
     *       from the rows of earlier timestamps, newest first. Timestamps older than those needed
     *       are removed from the input state.
     * </ul>
     *
     * @param list rows stored for the timestamp being processed
     * @param i index in {@code list} of the row the window is built for
     * @param i2 index in {@link #sortedTimestamps} of the timestamp being processed
     * @throws Exception if accessing state, serializing or invoking the batch fails
     */
    @SuppressWarnings("unchecked")
    public void triggerWindowProcess(List<RowData> list, int i, int i2) throws Exception {
        if (!this.windowData.isEmpty()) {
            if (this.windowData.size() > this.lowerBoundary) {
                this.windowData.pop();
            }
            this.windowData.add(list.get(i));
            this.currentBatchCount += this.windowData.size();
        } else if (i >= this.lowerBoundary) {
            // Count one per row added: the range below covers lowerBoundary + 1 rows, so adding
            // just lowerBoundary under-counts by one and leaves the counter at 0 when
            // lowerBoundary is 0.
            // NOTE(review): invokeCurrentBatch() lives in a superclass; presumably it depends on
            // currentBatchCount - confirm.
            for (int rowIndex = (int) (i - this.lowerBoundary); rowIndex <= i; rowIndex++) {
                this.windowData.add(list.get(rowIndex));
                this.currentBatchCount++;
            }
        } else {
            for (int rowIndex = 0; rowIndex <= i; rowIndex++) {
                this.windowData.add(list.get(rowIndex));
                this.currentBatchCount++;
            }
            long missingRows = this.lowerBoundary - i;
            ListIterator<Long> timestampIterator = this.sortedTimestamps.listIterator(i2);
            // Walk backwards through earlier timestamps, prepending their rows newest first.
            while (missingRows > 0 && timestampIterator.hasPrevious()) {
                List<RowData> earlierRows = (List<RowData>) this.inputState.get(timestampIterator.previous());
                ListIterator<RowData> earlierRowIterator = earlierRows.listIterator(earlierRows.size());
                while (earlierRowIterator.hasPrevious() && missingRows > 0) {
                    this.windowData.addFirst(earlierRowIterator.previous());
                    missingRows--;
                    this.currentBatchCount++;
                }
            }
            // Whatever is older than the rows just collected is no longer needed.
            while (timestampIterator.hasPrevious()) {
                this.inputState.remove(timestampIterator.previous());
            }
        }
        for (RowData windowRow : this.windowData) {
            this.arrowSerializer.write(getFunctionInput(windowRow));
        }
        invokeCurrentBatch();
    }

    /**
     * Inserts the timestamp into {@link #sortedTimestamps} keeping ascending order. The value is
     * placed before the first strictly greater element, i.e. after any equal elements.
     *
     * @param l the timestamp to insert
     */
    public void insertToSortedList(Long l) {
        ListIterator<Long> cursor = this.sortedTimestamps.listIterator(0);
        while (cursor.hasNext()) {
            if (l.longValue() < cursor.next().longValue()) {
                // Step back so the new value lands in front of the greater element.
                cursor.previous();
                cursor.add(l);
                return;
            }
        }
        this.sortedTimestamps.addLast(l);
    }
}
