package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconciler.class */
/**
 * Base operator that holds back work produced inside a streaming window until that window has
 * been reported as committed, and then performs the work on a separate helper thread.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>Tuples arriving on {@link #input} are handed to {@link #processTuple(Object)}; subclasses
 *       call {@link #enqueueForProcessing(Object)} to park a work item under the current window.</li>
 *   <li>{@link #committed(long)} moves the work items of every window up to and including the
 *       committed window id into the committed and waiting queues.</li>
 *   <li>A single helper thread drains the waiting queue, calling
 *       {@link #processCommittedData(Object)} for each item and recording it as done.</li>
 *   <li>{@link #endWindow()} removes the done items from the committed queue.</li>
 * </ol>
 *
 * <p>If the helper thread fails, the failure is stored and rethrown from
 * {@link #handleIdleTime()}.
 *
 * @param <INPUT>      type of the tuples received on the input port
 * @param <QUEUETUPLE> type of the work items queued for processing after commit
 */
public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements Operator.CheckpointListener, Operator.IdleTimeHandler {
    private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);

    /** Single-threaded executor running the helper loop; created in {@link #setup}. */
    protected transient ExecutorService executorService;

    /** Id of the window most recently started via {@link #beginWindow(long)}. */
    protected long currentWindowId;

    /** Run flag of the helper loop; cleared by {@link #teardown()} or when the helper thread fails. */
    private volatile transient boolean execute;

    /** Holds the throwable that stopped the helper thread, if any. */
    private transient AtomicReference<Throwable> cause;

    /** Input port; every received tuple is forwarded to {@link #processTuple(Object)}. */
    public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() {
        public void process(INPUT tuple) {
            AbstractReconciler.this.processTuple(tuple);
        }
    };

    /** Sleep interval in milliseconds used while idle; overwritten from SPIN_MILLIS in {@link #setup}. */
    protected transient int spinningTime = 10;

    /** Work items enqueued so far, keyed by the window in which they were enqueued. */
    private Map<Long, List<QUEUETUPLE>> currentWindowTuples = Maps.newConcurrentMap();

    /** Ids of the windows that have begun and are not yet committed, in arrival order. */
    private Queue<Long> currentWindows = Queues.newLinkedBlockingQueue();

    /**
     * Work items whose window has been committed but which have not yet been removed as done.
     * Not transient, so presumably part of the operator's checkpointed state (confirm against the
     * platform's serialization rules); {@link #setup} re-queues its contents for processing.
     */
    private Queue<QUEUETUPLE> committedTuples = Queues.newLinkedBlockingQueue();

    /** Items the helper thread has finished; drained by {@link #endWindow()}. */
    private transient Queue<QUEUETUPLE> doneTuples = Queues.newLinkedBlockingQueue();

    /** Items waiting to be picked up by the helper thread. */
    private transient Queue<QUEUETUPLE> waitingTuples = Queues.newLinkedBlockingQueue();

    /**
     * Initializes transient state, re-queues any items still present in the committed queue and
     * starts the helper thread.
     *
     * @param operatorContext operator context; when non-null its SPIN_MILLIS value replaces the
     *                        default {@link #spinningTime}
     */
    public void setup(Context.OperatorContext operatorContext) {
        if (operatorContext != null) {
            this.spinningTime = operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS);
        }
        this.execute = true;
        this.cause = new AtomicReference<>();
        // Anything left in committedTuples was committed but never removed as done, so hand it
        // to the helper thread again.
        this.waitingTuples.addAll(this.committedTuples);
        this.executorService = Executors.newSingleThreadExecutor(new NameableThreadFactory("Reconciler-Helper"));
        this.executorService.submit(processEnqueuedData());
    }

    /**
     * Records the new window id and creates an empty work-item list for it.
     *
     * @param windowId id of the window that is starting
     */
    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.currentWindowTuples.put(windowId, new ArrayList<QUEUETUPLE>());
        this.currentWindows.add(windowId);
    }

    /**
     * Removes from the committed queue every item the helper thread has reported as done.
     */
    public void endWindow() {
        QUEUETUPLE done;
        while ((done = this.doneTuples.poll()) != null) {
            this.committedTuples.remove(done);
        }
    }

    /**
     * Called when the operator has nothing to do. If the helper loop has stopped, logs and
     * rethrows the stored failure; otherwise sleeps for {@link #spinningTime} milliseconds.
     *
     * @throws RuntimeException if the sleep is interrupted (the interrupt flag is restored first)
     */
    public void handleIdleTime() {
        if (!this.execute) {
            // Log the throwable itself, not the AtomicReference holder, so the stack trace is emitted.
            Throwable failure = this.cause.get();
            logger.error("Exception: ", failure);
            DTThrowable.rethrow(failure);
        } else {
            try {
                Thread.sleep(this.spinningTime);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Checkpoint notification; intentionally a no-op.
     *
     * @param windowId id of the checkpointed window
     */
    public void checkpointed(long windowId) {
    }

    /**
     * Releases the work items of every tracked window whose id is less than or equal to the
     * committed window id: they are appended to the committed queue and to the waiting queue
     * consumed by the helper thread, and the window is forgotten.
     *
     * @param windowId id of the committed window
     */
    public void committed(long windowId) {
        logger.debug(" current committed window {}", windowId);
        Long oldest;
        // currentWindows is in arrival order, so stop at the first window newer than the commit.
        while ((oldest = this.currentWindows.peek()) != null && oldest <= windowId) {
            List<QUEUETUPLE> tuples = this.currentWindowTuples.get(oldest);
            if (tuples != null && !tuples.isEmpty()) {
                this.committedTuples.addAll(tuples);
                this.waitingTuples.addAll(tuples);
            }
            this.currentWindows.remove();
            this.currentWindowTuples.remove(oldest);
        }
    }

    /**
     * Stops the helper loop and shuts the executor down. Safe to call even if {@link #setup}
     * never created the executor.
     */
    public void teardown() {
        this.execute = false;
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    /**
     * Builds the helper-thread task: while the run flag is set, wait for an item in the waiting
     * queue, process it with {@link #processCommittedData(Object)} and record it as done. Any
     * throwable (including the interrupt raised by {@code shutdownNow()}) ends the loop and is
     * stored in {@link #cause}.
     *
     * @return the task submitted to {@link #executorService}
     */
    private Runnable processEnqueuedData() {
        return new Runnable() {
            @Override
            public void run() {
                while (execute) {
                    try {
                        while (waitingTuples.isEmpty()) {
                            Thread.sleep(spinningTime);
                        }
                        QUEUETUPLE tuple = waitingTuples.remove();
                        processCommittedData(tuple);
                        doneTuples.add(tuple);
                    } catch (Throwable th) {
                        // Store the failure before clearing the volatile flag so that a reader
                        // observing execute == false also sees the cause.
                        cause.set(th);
                        execute = false;
                        return;
                    }
                }
            }
        };
    }

    /**
     * Parks a work item under the current window until that window is committed.
     * A list for the current window must exist, i.e. {@link #beginWindow(long)} must have been
     * called for {@link #currentWindowId} and the window must not have been committed yet;
     * otherwise this throws a {@link NullPointerException}.
     *
     * @param queuetuple the work item to process after commit
     */
    protected void enqueueForProcessing(QUEUETUPLE queuetuple) {
        this.currentWindowTuples.get(this.currentWindowId).add(queuetuple);
    }

    /**
     * Handles a tuple received on {@link #input}.
     *
     * @param input the received tuple
     */
    protected abstract void processTuple(INPUT input);

    /**
     * Processes a work item whose window has been committed. Invoked on the helper thread, not on
     * the thread that calls the window callbacks.
     *
     * @param queuetuple the work item to process
     */
    protected abstract void processCommittedData(QUEUETUPLE queuetuple);
}
