package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorCumSum;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow = false)
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/dedup/AbstractDeduper.class */
public abstract class AbstractDeduper<T> implements Operator, Operator.IdleTimeHandler, Operator.ActivationListener<Context>, Operator.CheckpointNotificationListener {
    private static final String BUCKET_DIR = "bucket_data";

    @NotNull
    protected AbstractManagedStateImpl managedState;
    private transient Map<T, Decision> decisions;

    @AutoMetric
    private transient long uniqueEvents;

    @AutoMetric
    private transient long duplicateEvents;

    @AutoMetric
    private transient long expiredEvents;
    private static final Logger logger = LoggerFactory.getLogger(AbstractDeduper.class);
    public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() { // from class: org.apache.apex.malhar.lib.dedup.AbstractDeduper.1
        public final void process(T t) {
            AbstractDeduper.this.processTuple(t);
        }
    };
    public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
    private boolean preserveTupleOrder = true;
    private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap();
    private transient Map<Slice, Long> asyncEvents = Maps.newLinkedHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.apex.malhar.lib.dedup.AbstractDeduper$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/AbstractDeduper$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$apex$malhar$lib$dedup$AbstractDeduper$Decision = new int[Decision.values().length];

        static {
            try {
                $SwitchMap$org$apache$apex$malhar$lib$dedup$AbstractDeduper$Decision[Decision.UNIQUE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$dedup$AbstractDeduper$Decision[Decision.DUPLICATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$dedup$AbstractDeduper$Decision[Decision.EXPIRED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/AbstractDeduper$Decision.class */
    public enum Decision {
        UNIQUE,
        DUPLICATE,
        EXPIRED,
        UNKNOWN
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        ((FileAccessFSImpl) this.managedState.getFileAccess()).setBasePath(((String) operatorContext.getValue(DAG.APPLICATION_PATH)) + "/" + BUCKET_DIR);
        this.managedState.setup(operatorContext);
        if (this.preserveTupleOrder) {
            this.decisions = Maps.newLinkedHashMap();
        }
    }

    public void beginWindow(long j) {
        this.uniqueEvents = 0L;
        this.duplicateEvents = 0L;
        this.expiredEvents = 0L;
        this.managedState.beginWindow(j);
    }

    protected abstract Slice getKey(T t);

    protected abstract long getTime(T t);

    /* JADX INFO: Access modifiers changed from: protected */
    public void processTuple(T t) {
        Future<Slice> asyncManagedState = getAsyncManagedState(t);
        if (!asyncManagedState.isDone()) {
            processWaitingEvent(t, asyncManagedState);
            return;
        }
        try {
            processEvent(t, asyncManagedState.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    protected void processEvent(T t, Slice slice) {
        if (slice == BucketedState.EXPIRED) {
            processInvalid(t);
        } else {
            processValid(t, slice);
        }
    }

    protected void processWaitingEvent(T t, Future<Slice> future) {
        this.waitingEvents.put(t, future);
        if (this.preserveTupleOrder) {
            recordDecision(t, Decision.UNKNOWN);
        }
    }

    protected void processValid(T t, Slice slice) {
        if (this.preserveTupleOrder && !this.waitingEvents.isEmpty()) {
            processWaitingEvent(t, Futures.immediateFuture(slice));
        } else if (slice != null) {
            processDuplicate(t);
        } else {
            putManagedState(t);
            processUnique(t);
        }
    }

    protected void processInvalid(T t) {
        if (!this.preserveTupleOrder || this.decisions.isEmpty()) {
            processExpired(t);
        } else {
            recordDecision(t, Decision.EXPIRED);
        }
    }

    protected void processExpired(T t) {
        this.expiredEvents++;
        emitExpired(t);
    }

    protected void processDuplicate(T t) {
        if (this.preserveTupleOrder && !this.decisions.isEmpty()) {
            recordDecision(t, Decision.DUPLICATE);
        } else {
            this.duplicateEvents++;
            emitDuplicate(t);
        }
    }

    protected void processUnique(T t) {
        if (this.preserveTupleOrder && !this.decisions.isEmpty()) {
            recordDecision(t, Decision.UNIQUE);
        } else {
            this.uniqueEvents++;
            emitUnique(t);
        }
    }

    public void handleIdleTime() {
        if (this.preserveTupleOrder) {
            emitProcessedTuples();
        }
        processAuxiliary(false);
    }

    protected void processAuxiliary(boolean z) {
        if (this.waitingEvents.size() > 0) {
            Iterator<Map.Entry<T, Future<Slice>>> it = this.waitingEvents.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<T, Future<Slice>> next = it.next();
                T key = next.getKey();
                Slice key2 = getKey(key);
                long time = getTime(key);
                Future<Slice> value = next.getValue();
                if (value.isDone() || z) {
                    try {
                        if (value.get() == null && this.asyncEvents.get(key2) == null) {
                            putManagedState(key);
                            this.asyncEvents.put(key2, Long.valueOf(time));
                            processUnique(key);
                        } else {
                            processDuplicate(key);
                        }
                        it.remove();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("handle idle time", e);
                    }
                }
                if (!z) {
                    return;
                }
            }
        }
    }

    public void endWindow() {
        processAuxiliary(true);
        if (this.preserveTupleOrder) {
            emitProcessedTuples();
        }
        Preconditions.checkArgument(this.waitingEvents.isEmpty());
        this.asyncEvents.clear();
        this.managedState.endWindow();
    }

    protected abstract Future<Slice> getAsyncManagedState(T t);

    protected abstract void putManagedState(T t);

    protected void recordDecision(T t, Decision decision) {
        this.decisions.put(t, decision);
    }

    protected void emitProcessedTuples() {
        Iterator<Map.Entry<T, Decision>> it = this.decisions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<T, Decision> next = it.next();
            switch (AnonymousClass2.$SwitchMap$org$apache$apex$malhar$lib$dedup$AbstractDeduper$Decision[next.getValue().ordinal()]) {
                case 1:
                    this.uniqueEvents++;
                    emitUnique(next.getKey());
                    it.remove();
                    break;
                case 2:
                    this.duplicateEvents++;
                    emitDuplicate(next.getKey());
                    it.remove();
                    break;
                case AggregatorCumSum.AGGREGATES_INDEX /* 3 */:
                    this.expiredEvents++;
                    emitExpired(next.getKey());
                    it.remove();
                    break;
            }
        }
    }

    public void teardown() {
        this.managedState.teardown();
    }

    public void beforeCheckpoint(long j) {
        this.managedState.beforeCheckpoint(j);
    }

    public void checkpointed(long j) {
        this.managedState.checkpointed(j);
    }

    public void committed(long j) {
        this.managedState.committed(j);
    }

    protected void emitUnique(T t) {
        this.unique.emit(t);
    }

    protected void emitDuplicate(T t) {
        this.duplicate.emit(t);
    }

    protected void emitExpired(T t) {
        this.expired.emit(t);
    }

    public boolean isOrderedOutput() {
        return this.preserveTupleOrder;
    }

    public void setPreserveTupleOrder(boolean z) {
        this.preserveTupleOrder = z;
    }
}
