package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
import com.datatorrent.lib.appdata.schemas.Result;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.class */
/**
 * Executes queries on a dedicated background thread and hands the serialized results back to the
 * thread that drives the window callbacks, which emits them on the result port.
 *
 * <p>Coordination between the two threads relies on {@code inWindowSemaphore}, which starts with
 * zero permits:
 * <ul>
 *   <li>{@link #beginWindow(long)} releases one permit,</li>
 *   <li>the worker acquires that permit before executing a query and releases it afterwards,</li>
 *   <li>{@link #endWindow()} takes the permit back once processing is done.</li>
 * </ul>
 * As a consequence the worker only executes queries between a {@code beginWindow} call and the
 * matching {@code endWindow} call.
 *
 * <p>Serialized results travel from the worker to the emitting thread through a
 * {@link ConcurrentLinkedQueue}; only {@link #endWindow()} and {@link #handleIdleTime()} drain it.
 *
 * @param <QUERY_TYPE> type of the queries handed to the executor
 * @param <META_QUERY> type of the per-query metadata
 * @param <QUEUE_CONTEXT> type of the per-query queue context
 * @param <RESULT> type of result produced by the executor
 */
public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT extends Result> implements Component<Context.OperatorContext>, Operator.IdleTimeHandler {
    /** Port on which serialized results are emitted. */
    private DefaultOutputPort<String> resultPort = null;
    /** Holds one permit while a window is open and no query is being executed. */
    private final transient Semaphore inWindowSemaphore = new Semaphore(0);
    /** Serialized results produced by the worker and not yet emitted. */
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    private QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager;
    private QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor;
    private MessageSerializerFactory messageSerializerFactory;

    /** Single-thread executor that runs the {@link ProcessingThread} task; created in {@link #setup}. */
    @VisibleForTesting
    protected transient ExecutorService processingThread;
    /** Thread that is interrupted when the worker fails. */
    private transient Thread mainThread;
    private static final Logger LOG = LoggerFactory.getLogger(QueryManagerAsynchronous.class);

    /**
     * Task run on the background executor: repeatedly dequeues a query bundle, executes it while a
     * window is open and queues the serialized result for emission.
     */
    public class ProcessingThread implements Callable<Void> {
        private Thread mainThread;

        /**
         * @param thread thread to interrupt if processing fails; must not be null
         */
        public ProcessingThread(Thread thread) {
            setMainThread(thread);
        }

        private void setMainThread(Thread thread) {
            this.mainThread = Preconditions.checkNotNull(thread);
        }

        /**
         * Runs the processing loop. The loop has no normal exit, so this only returns by throwing.
         *
         * @return never returns normally
         * @throws Exception whatever the loop threw, after logging it and interrupting the main thread
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            try {
                loop();
                return null;
            } catch (Exception e) {
                QueryManagerAsynchronous.LOG.error("Exception thrown while processing:", e);
                // Interrupting is the only failure signal the main thread receives.
                this.mainThread.interrupt();
                throw e;
            }
        }

        /**
         * Dequeues bundles forever, executing each one under the window permit.
         */
        private void loop() {
            while (true) {
                QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> bundle = QueryManagerAsynchronous.this.queueManager.dequeueBlock();
                try {
                    // Blocks until beginWindow() has made the permit available.
                    QueryManagerAsynchronous.this.inWindowSemaphore.acquire();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever handles the wrapped exception.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                Result result = QueryManagerAsynchronous.this.queryExecutor.executeQuery(bundle.getQuery(), bundle.getMetaQuery(), bundle.getQueueContext());
                if (result != null) {
                    QueryManagerAsynchronous.this.queue.add(QueryManagerAsynchronous.this.messageSerializerFactory.serialize(result));
                }
                // Deliberately not in a finally block: if execution or serialization throws, the
                // permit stays held so endWindow() cannot complete the window before the
                // interrupt sent by call() arrives.
                QueryManagerAsynchronous.this.inWindowSemaphore.release();
            }
        }
    }

    /**
     * Creates a manager. Every argument must be non-null.
     *
     * @param defaultOutputPort port on which serialized results are emitted
     * @param queueManager source of the query bundles to execute
     * @param queryExecutor executes each dequeued query
     * @param messageSerializerFactory serializes results to strings
     * @param thread thread to interrupt when background processing fails
     * @throws NullPointerException if any argument is null
     */
    public QueryManagerAsynchronous(DefaultOutputPort<String> defaultOutputPort, QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager, QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor, MessageSerializerFactory messageSerializerFactory, Thread thread) {
        setResultPort(defaultOutputPort);
        setQueueManager(queueManager);
        setQueryExecutor(queryExecutor);
        setMessageSerializerFactory(messageSerializerFactory);
        setMainThread(thread);
    }

    private void setMainThread(@NotNull Thread thread) {
        this.mainThread = Preconditions.checkNotNull(thread);
    }

    private void setResultPort(DefaultOutputPort<String> defaultOutputPort) {
        this.resultPort = Preconditions.checkNotNull(defaultOutputPort);
    }

    private void setQueueManager(QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager) {
        this.queueManager = Preconditions.checkNotNull(queueManager);
    }

    /**
     * @return the queue manager supplied at construction
     */
    public QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> getQueueManager() {
        return this.queueManager;
    }

    private void setQueryExecutor(QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor) {
        this.queryExecutor = Preconditions.checkNotNull(queryExecutor);
    }

    /**
     * @return the query executor supplied at construction
     */
    public QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> getQueryExecutor() {
        return this.queryExecutor;
    }

    private void setMessageSerializerFactory(MessageSerializerFactory messageSerializerFactory) {
        this.messageSerializerFactory = Preconditions.checkNotNull(messageSerializerFactory);
    }

    /**
     * Starts the background executor and submits the processing task to it.
     *
     * @param operatorContext not used by this implementation
     */
    public void setup(Context.OperatorContext operatorContext) {
        this.processingThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread"));
        this.processingThread.submit(new ProcessingThread(this.mainThread));
    }

    /**
     * Opens a window: makes the permit available to the worker and resumes enqueueing.
     *
     * @param j window id; not used by this implementation
     */
    public void beginWindow(long j) {
        this.inWindowSemaphore.release();
        this.queueManager.resumeEnqueue();
    }

    /**
     * Closes a window: halts enqueueing, emits results until processing is done, then takes the
     * permit back so the worker cannot execute queries outside a window.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the calling thread is
     *     interrupted while waiting; the thread's interrupt status is left set
     */
    public void endWindow() {
        this.queueManager.haltEnqueue();
        while (!isProcessingDone()) {
            // The worker reports failure by interrupting this thread. Without this check a dead
            // worker with bundles still queued would leave this loop spinning forever.
            if (Thread.currentThread().isInterrupted()) {
                throw new RuntimeException(new InterruptedException("Interrupted while waiting for query processing to finish"));
            }
            if (this.queue.isEmpty()) {
                Thread.yield();
            } else {
                emptyQueue();
            }
        }
        // Emit whatever was queued between the last drain and processing completing.
        emptyQueue();
        try {
            this.inWindowSemaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return for an {@link AbstractWindowEndQueueManager}, the value of its
     *     {@code isEmptyAndBlocked()}; otherwise whether {@code getNumLeft()} is zero
     */
    private boolean isProcessingDone() {
        if (this.queueManager instanceof AbstractWindowEndQueueManager) {
            return ((AbstractWindowEndQueueManager) this.queueManager).isEmptyAndBlocked();
        }
        return this.queueManager.getNumLeft() == 0;
    }

    /**
     * Emits every serialized result currently queued.
     */
    private void emptyQueue() {
        // Poll until null rather than isEmpty()+poll(), so a null can never reach the port.
        String serialized;
        while ((serialized = this.queue.poll()) != null) {
            this.resultPort.emit(serialized);
        }
    }

    /**
     * Shuts the background executor down. Does nothing if {@link #setup} was never called.
     */
    public void teardown() {
        if (this.processingThread != null) {
            this.processingThread.shutdownNow();
        }
    }

    /**
     * Emits any queued results.
     */
    public void handleIdleTime() {
        emptyQueue();
    }
}
