package org.apache.apex.malhar.lib.window.impl;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.google.common.base.Function;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.validation.ValidationException;
import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorCumSum;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.WindowedOperator;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.class */
public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation> extends BaseOperator implements WindowedOperator<InputT> {
    protected WindowOption windowOption;
    protected TriggerOption triggerOption;
    protected WindowedStorage<WindowState> windowStateMap;
    private Function<InputT, Long> timestampExtractor;
    private boolean triggerAtWatermark;
    private long earlyTriggerCount;
    private long earlyTriggerMillis;
    private long lateTriggerCount;
    private long lateTriggerMillis;
    private long windowWidthMillis;
    protected DataStorageT dataStorage;
    protected RetractionStorageT retractionStorage;
    protected AccumulationT accumulation;
    private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
    protected long allowedLatenessMillis = -1;
    private long currentWatermark = -1;
    private long watermarkTimestamp = -1;
    private long currentDerivedTimestamp = -1;
    private long fixedWatermarkMillis = -1;
    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>() { // from class: org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator.1
        public void process(Tuple<InputT> tuple) {
            AbstractWindowedOperator.this.processTuple(tuple);
        }
    };

    @InputPortFieldAnnotation(optional = true)
    public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>() { // from class: org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator.2
        public void process(ControlTuple controlTuple) {
            if (controlTuple instanceof ControlTuple.Watermark) {
                AbstractWindowedOperator.this.processWatermark((ControlTuple.Watermark) controlTuple);
            }
        }
    };
    public final transient DefaultOutputPort<Tuple.WindowedTuple<OutputT>> output = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>();

    /* renamed from: org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$Type = new int[TriggerOption.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$Type[TriggerOption.Type.ON_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$Type[TriggerOption.Type.EARLY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$Type[TriggerOption.Type.LATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public void processTuple(Tuple<InputT> tuple) {
        if (isTooLate(extractTimestamp(tuple))) {
            dropTuple(tuple);
            return;
        }
        Tuple.WindowedTuple<InputT> windowedValue = getWindowedValue(tuple);
        accumulateTuple(windowedValue);
        for (Window window : windowedValue.getWindows()) {
            WindowState windowState = this.windowStateMap.get(window);
            windowState.tupleCount++;
            if (windowState.watermarkArrivalTime == -1) {
                if (this.earlyTriggerCount > 0 && windowState.tupleCount % this.earlyTriggerCount == 0) {
                    fireTrigger(window, windowState);
                }
            } else if (this.lateTriggerCount > 0 && windowState.tupleCount % this.lateTriggerCount == 0) {
                fireTrigger(window, windowState);
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void setWindowOption(WindowOption windowOption) {
        this.windowOption = windowOption;
        if (this.windowOption instanceof WindowOption.GlobalWindow) {
            this.windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState());
        }
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void setTriggerOption(TriggerOption triggerOption) {
        this.triggerOption = triggerOption;
        for (TriggerOption.Trigger trigger : triggerOption.getTriggerList()) {
            switch (AnonymousClass3.$SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$Type[trigger.getType().ordinal()]) {
                case 1:
                    this.triggerAtWatermark = true;
                    break;
                case 2:
                    if (trigger instanceof TriggerOption.TimeTrigger) {
                        this.earlyTriggerMillis = ((TriggerOption.TimeTrigger) trigger).getDuration().getMillis();
                        break;
                    } else if (trigger instanceof TriggerOption.CountTrigger) {
                        this.earlyTriggerCount = ((TriggerOption.CountTrigger) trigger).getCount();
                        break;
                    } else {
                        break;
                    }
                case AggregatorCumSum.AGGREGATES_INDEX /* 3 */:
                    if (trigger instanceof TriggerOption.TimeTrigger) {
                        this.lateTriggerMillis = ((TriggerOption.TimeTrigger) trigger).getDuration().getMillis();
                        break;
                    } else if (trigger instanceof TriggerOption.CountTrigger) {
                        this.lateTriggerCount = ((TriggerOption.CountTrigger) trigger).getCount();
                        break;
                    } else {
                        break;
                    }
                default:
                    throw new RuntimeException("Unknown trigger type: " + trigger.getType());
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void setAllowedLateness(Duration duration) {
        this.allowedLatenessMillis = duration.getMillis();
    }

    public void setDataStorage(DataStorageT datastoraget) {
        this.dataStorage = datastoraget;
    }

    public void setRetractionStorage(RetractionStorageT retractionstoraget) {
        this.retractionStorage = retractionstoraget;
    }

    public void setAccumulation(AccumulationT accumulationt) {
        this.accumulation = accumulationt;
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void setWindowStateStorage(WindowedStorage<WindowState> windowedStorage) {
        this.windowStateMap = windowedStorage;
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void setTimestampExtractor(Function<InputT, Long> function) {
        this.timestampExtractor = function;
    }

    public void setFixedWatermark(long j) {
        this.fixedWatermarkMillis = j;
    }

    public void validate() throws ValidationException {
        if (this.accumulation == null) {
            throw new ValidationException("Accumulation must be set");
        }
        if (this.dataStorage == null) {
            throw new ValidationException("Data storage must be set");
        }
        if (this.windowStateMap == null) {
            throw new ValidationException("Window state storage must be set");
        }
        if (this.triggerOption != null) {
            if (this.triggerOption.isFiringOnlyUpdatedPanes()) {
                if (this.retractionStorage == null) {
                    throw new ValidationException("A retraction storage is required for firingOnlyUpdatedPanes option");
                }
                if (this.triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
                    throw new ValidationException("DISCARDING accumulation mode is not valid for firingOnlyUpdatedPanes option");
                }
            }
            if (this.triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING && this.retractionStorage == null) {
                throw new ValidationException("A retraction storage is required for ACCUMULATING_AND_RETRACTING accumulation mode");
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> tuple) {
        if (this.windowOption == null && (tuple instanceof Tuple.WindowedTuple)) {
            return (Tuple.WindowedTuple) tuple;
        }
        Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>();
        windowedTuple.setValue(tuple.getValue());
        windowedTuple.setTimestamp(extractTimestamp(tuple));
        assignWindows(windowedTuple.getWindows(), tuple);
        return windowedTuple;
    }

    private long extractTimestamp(Tuple<InputT> tuple) {
        if (this.timestampExtractor != null) {
            return ((Long) this.timestampExtractor.apply(tuple.getValue())).longValue();
        }
        if (tuple instanceof Tuple.TimestampedTuple) {
            return ((Tuple.TimestampedTuple) tuple).getTimestamp();
        }
        return 0L;
    }

    private void assignWindows(List<Window> list, Tuple<InputT> tuple) {
        if (this.windowOption instanceof WindowOption.GlobalWindow) {
            list.add(Window.GLOBAL_WINDOW);
            return;
        }
        long extractTimestamp = extractTimestamp(tuple);
        if (!(this.windowOption instanceof WindowOption.TimeWindows)) {
            if (this.windowOption instanceof WindowOption.SessionWindows) {
                assignSessionWindows(list, extractTimestamp, tuple);
            }
        } else {
            for (Window.TimeWindow timeWindow : getTimeWindowsForTimestamp(extractTimestamp)) {
                if (!this.windowStateMap.containsWindow(timeWindow)) {
                    this.windowStateMap.put(timeWindow, new WindowState());
                }
                list.add(timeWindow);
            }
        }
    }

    protected void assignSessionWindows(List<Window> list, long j, Tuple<InputT> tuple) {
        throw new UnsupportedOperationException();
    }

    private List<Window.TimeWindow> getTimeWindowsForTimestamp(long j) {
        ArrayList arrayList = new ArrayList();
        if (!(this.windowOption instanceof WindowOption.TimeWindows)) {
            throw new IllegalStateException("Unexpected WindowOption");
        }
        long millis = ((WindowOption.TimeWindows) this.windowOption).getDuration().getMillis();
        long j2 = j - (j % millis);
        arrayList.add(new Window.TimeWindow(j2, millis));
        if (this.windowOption instanceof WindowOption.SlidingTimeWindows) {
            long millis2 = ((WindowOption.SlidingTimeWindows) this.windowOption).getSlideByDuration().getMillis();
            long j3 = j2;
            while (true) {
                long j4 = j3 - millis2;
                if (j4 > j || j >= j4 + millis) {
                    break;
                }
                arrayList.add(new Window.TimeWindow(j4, millis));
                j3 = j4;
            }
            long j5 = j2;
            while (true) {
                long j6 = j5 + millis2;
                if (j6 > j || j >= j6 + millis) {
                    break;
                }
                arrayList.add(new Window.TimeWindow(j6, millis));
                j5 = j6;
            }
        }
        return arrayList;
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public boolean isTooLate(long j) {
        return this.allowedLatenessMillis >= 0 && j < this.currentWatermark - this.allowedLatenessMillis;
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void dropTuple(Tuple<InputT> tuple) {
        LOG.debug("Dropping late tuple {}", tuple);
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void processWatermark(ControlTuple.Watermark watermark) {
        this.watermarkTimestamp = watermark.getTimestamp();
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.windowWidthMillis = ((Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        validate();
    }

    public void beginWindow(long j) {
        if (this.currentDerivedTimestamp == -1) {
            this.currentDerivedTimestamp = ((j >> 32) * 1000) + (j & 4294967295L);
        } else {
            this.currentDerivedTimestamp += this.windowWidthMillis;
        }
        this.watermarkTimestamp = -1L;
    }

    public void endWindow() {
        processWatermarkAtEndWindow();
        fireTimeTriggers();
    }

    private void processWatermarkAtEndWindow() {
        if (this.fixedWatermarkMillis > 0) {
            this.watermarkTimestamp = this.currentDerivedTimestamp - this.fixedWatermarkMillis;
        }
        if (this.watermarkTimestamp > 0) {
            this.currentWatermark = this.watermarkTimestamp;
            long j = this.watermarkTimestamp - this.allowedLatenessMillis;
            Iterator<WindowState> it = this.windowStateMap.iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                Window window = (Window) entry.getKey();
                WindowState windowState = (WindowState) entry.getValue();
                if (window.getBeginTimestamp() + window.getDurationMillis() < this.watermarkTimestamp) {
                    if (windowState.watermarkArrivalTime == -1) {
                        windowState.watermarkArrivalTime = this.currentDerivedTimestamp;
                        if (this.triggerAtWatermark) {
                            fireTrigger(window, windowState);
                        }
                    }
                    if (this.allowedLatenessMillis >= 0 && window.getBeginTimestamp() + window.getDurationMillis() < j) {
                        it.remove();
                        this.dataStorage.remove(window);
                        this.retractionStorage.remove(window);
                    }
                }
            }
            this.controlOutput.emit(new WatermarkImpl(this.watermarkTimestamp));
        }
    }

    private void fireTimeTriggers() {
        if (this.earlyTriggerMillis > 0 || this.lateTriggerMillis > 0) {
            for (Map.Entry<Window, WindowState> entry : this.windowStateMap.entrySet()) {
                Window key = entry.getKey();
                WindowState value = entry.getValue();
                if (value.watermarkArrivalTime == -1) {
                    if (this.earlyTriggerMillis > 0 && value.lastTriggerFiredTime + this.earlyTriggerMillis <= this.currentDerivedTimestamp) {
                        fireTrigger(key, value);
                    }
                } else if (this.lateTriggerMillis > 0 && value.lastTriggerFiredTime + this.lateTriggerMillis <= this.currentDerivedTimestamp) {
                    fireTrigger(key, value);
                }
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void fireTrigger(Window window, WindowState windowState) {
        if (this.triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
            fireRetractionTrigger(window);
        }
        fireNormalTrigger(window, this.triggerOption.isFiringOnlyUpdatedPanes());
        windowState.lastTriggerFiredTime = this.currentDerivedTimestamp;
        if (this.triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
            clearWindowData(window);
        }
    }

    public abstract void fireNormalTrigger(Window window, boolean z);

    public abstract void fireRetractionTrigger(Window window);

    @Override // org.apache.apex.malhar.lib.window.WindowedOperator
    public void clearWindowData(Window window) {
        this.dataStorage.remove(window);
    }
}
