package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/appdata/query/WindowBoundedService.class */
public class WindowBoundedService implements Component<Context.OperatorContext> {
    public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10;
    private final long executeIntervalMillis;
    private final Runnable runnable;
    protected transient ExecutorService executorThread;
    private final transient Semaphore mutex;
    private volatile boolean terminated;
    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundedService.class);

    /* loaded from: input_file:com/datatorrent/lib/appdata/query/WindowBoundedService$AsynchExecutorThread.class */
    public class AsynchExecutorThread implements Callable<Void> {
        private long lastExecuteTime = 0;

        public AsynchExecutorThread() {
        }

        @Deprecated
        public AsynchExecutorThread(Thread thread) {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            try {
                loop();
                return null;
            } catch (Exception e) {
                WindowBoundedService.LOG.error("Exception thrown while processing:", e);
                WindowBoundedService.this.mutex.release();
                return null;
            }
        }

        private void loop() throws Exception {
            while (true) {
                long currentTimeMillis = System.currentTimeMillis();
                long j = currentTimeMillis - this.lastExecuteTime;
                if (j > WindowBoundedService.this.executeIntervalMillis) {
                    this.lastExecuteTime = currentTimeMillis;
                    WindowBoundedService.this.mutex.acquireUninterruptibly();
                    if (WindowBoundedService.this.terminated) {
                        WindowBoundedService.LOG.info("Terminated");
                        return;
                    } else {
                        WindowBoundedService.this.runnable.run();
                        WindowBoundedService.this.mutex.release();
                    }
                } else {
                    if (WindowBoundedService.this.terminated) {
                        WindowBoundedService.LOG.info("Terminated");
                        return;
                    }
                    Thread.sleep(WindowBoundedService.this.executeIntervalMillis - j);
                }
            }
        }
    }

    public WindowBoundedService(Runnable runnable) {
        this.mutex = new Semaphore(0);
        this.terminated = false;
        this.executeIntervalMillis = 10L;
        this.runnable = (Runnable) Preconditions.checkNotNull(runnable);
    }

    public WindowBoundedService(long j, Runnable runnable) {
        this.mutex = new Semaphore(0);
        this.terminated = false;
        Preconditions.checkArgument(j > 0, "The executeIntervalMillis must be positive");
        this.executeIntervalMillis = j;
        this.runnable = (Runnable) Preconditions.checkNotNull(runnable);
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.executorThread = Executors.newSingleThreadExecutor(new NameableThreadFactory("Query Executor Thread"));
        this.executorThread.submit(new AsynchExecutorThread());
    }

    public void beginWindow(long j) {
        this.mutex.release();
    }

    public void endWindow() {
        try {
            this.mutex.acquire();
        } catch (InterruptedException e) {
            DTThrowable.wrapIfChecked(e);
        }
    }

    public void teardown() {
        LOG.info("Shutting down");
        this.terminated = true;
        this.mutex.release();
        this.executorThread.shutdown();
        try {
            this.executorThread.awaitTermination(10000 + this.executeIntervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
}
