package com.datatorrent.stram.util;

import com.datatorrent.common.util.PubSubMessage;
import com.datatorrent.common.util.PubSubMessageCodec;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.servlet.http.HttpServletRequest;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet.class */
public class PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL> extends WebSocketServlet {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
    private static final long serialVersionUID = 1;
    private static final int latestTopicCount = 100;
    protected SECURITY_CONTEXT securityContext;
    private SubscribeFilter subscribeFilter;
    private SendFilter sendFilter;
    private String authAttribute;
    private HashMap<String, HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket>> topicToSocketMap = new HashMap<>();
    private HashMap<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<>();
    private ObjectMapper mapper = new JSONSerializationProvider().getContext(null);
    private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<>(this.mapper);
    private InternalMessageHandler internalMessageHandler = null;
    private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false) { // from class: com.datatorrent.stram.util.PubSubWebSocketServlet.1
        private static final long serialVersionUID = 20140131;

        @Override // java.util.HashMap, java.util.AbstractMap, java.util.Map
        public Long put(String str, Long l) {
            remove(str);
            return (Long) super.put((Object) str, (Object) l);
        }
    };

    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$InternalMessageHandler.class */
    public interface InternalMessageHandler {
        void onMessage(String str, Object obj);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$PubSubWebSocket.class */
    public class PubSubWebSocket implements WebSocket.OnTextMessage {
        private WebSocket.Connection connection;
        private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue(1024);
        private final Thread messengerThread = new Thread(new Messenger());
        private final PRINCIPAL principal;

        /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$PubSubWebSocket$Messenger.class */
        private class Messenger implements Runnable {
            private Messenger() {
            }

            @Override // java.lang.Runnable
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        PubSubWebSocket.this.connection.sendMessage((String) PubSubWebSocket.this.messageQueue.take());
                    } catch (InterruptedException e) {
                        return;
                    } catch (Exception e2) {
                        PubSubWebSocketServlet.LOG.error("Caught exception in websocket messenger.", e2);
                        return;
                    }
                }
            }
        }

        public PubSubWebSocket(PRINCIPAL principal) {
            this.principal = principal;
        }

        public PRINCIPAL getPrincipal() {
            return this.principal;
        }

        public void onMessage(String str) {
            PubSubWebSocketServlet.LOG.debug("Received message {}", str);
            try {
                PubSubMessage parseMessage = PubSubWebSocketServlet.this.codec.parseMessage(str);
                if (parseMessage != null) {
                    PubSubMessage.PubSubMessageType type = parseMessage.getType();
                    String topic = parseMessage.getTopic();
                    if (type != null) {
                        if (type.equals(PubSubMessage.PubSubMessageType.SUBSCRIBE)) {
                            if (topic != null) {
                                PubSubWebSocketServlet.this.subscribe(this, topic);
                            }
                        } else if (type.equals(PubSubMessage.PubSubMessageType.UNSUBSCRIBE)) {
                            if (topic != null) {
                                PubSubWebSocketServlet.this.unsubscribe(this, topic);
                            }
                        } else if (type.equals(PubSubMessage.PubSubMessageType.PUBLISH)) {
                            if (topic != null) {
                                Object data = parseMessage.getData();
                                if (data != null) {
                                    PubSubWebSocketServlet.this.publish(topic, data);
                                }
                                if (topic.startsWith("_internal.") && PubSubWebSocketServlet.this.internalMessageHandler != null) {
                                    PubSubWebSocketServlet.this.internalMessageHandler.onMessage(topic, data);
                                }
                            }
                        } else if (type.equals(PubSubMessage.PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
                            if (topic != null) {
                                PubSubWebSocketServlet.this.subscribe(this, topic + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers");
                                PubSubWebSocketServlet.this.sendData(this, topic + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers", Integer.valueOf(PubSubWebSocketServlet.this.getNumSubscribers(topic)));
                            }
                        } else if (type.equals(PubSubMessage.PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
                            if (topic != null) {
                                PubSubWebSocketServlet.this.unsubscribe(this, topic + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers");
                            }
                        } else if (type.equals(PubSubMessage.PubSubMessageType.GET_LATEST_TOPICS)) {
                            synchronized (this) {
                                PubSubWebSocketServlet.this.sendData(this, "_latestTopics", PubSubWebSocketServlet.this.latestTopics.keySet());
                            }
                        }
                    }
                }
            } catch (Exception e) {
                PubSubWebSocketServlet.LOG.warn("Exception caught", e);
            }
        }

        public void onOpen(WebSocket.Connection connection) {
            PubSubWebSocketServlet.LOG.debug("onOpen");
            this.connection = connection;
            this.connection.setMaxIdleTime(300000);
            this.connection.setMaxTextMessageSize(8388608);
            this.messengerThread.start();
        }

        public void onClose(int i, String str) {
            PubSubWebSocketServlet.LOG.debug("onClose");
            PubSubWebSocketServlet.this.disconnect(this);
            this.messengerThread.interrupt();
        }

        public void sendMessage(String str) throws IllegalStateException {
            this.messageQueue.add(str);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$SendFilter.class */
    public interface SendFilter<SECURITY_CONTEXT, PRINCIPAL> {
        Object filter(SECURITY_CONTEXT security_context, PRINCIPAL principal, String str, Object obj);
    }

    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$SubscribeFilter.class */
    public interface SubscribeFilter<SECURITY_CONTEXT, PRINCIPAL> {
        boolean filter(SECURITY_CONTEXT security_context, PRINCIPAL principal, String str);
    }

    /* loaded from: input_file:com/datatorrent/stram/util/PubSubWebSocketServlet$UserHolder.class */
    public class UserHolder {
        public String username;

        public UserHolder() {
        }
    }

    public void registerSubscribeFilter(SubscribeFilter subscribeFilter) {
        this.subscribeFilter = subscribeFilter;
    }

    public void registerSendFilter(SendFilter sendFilter) {
        this.sendFilter = sendFilter;
    }

    public PubSubWebSocketServlet(SECURITY_CONTEXT security_context, String str) {
        this.securityContext = security_context;
        this.authAttribute = str;
    }

    public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler) {
        this.internalMessageHandler = internalMessageHandler;
    }

    public WebSocket doWebSocketConnect(HttpServletRequest httpServletRequest, String str) {
        return new PubSubWebSocket(httpServletRequest.getAttribute(this.authAttribute));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void subscribe(PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket pubSubWebSocket, String str) {
        HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> hashSet;
        HashSet<String> hashSet2;
        if (this.subscribeFilter != null && !this.subscribeFilter.filter(this.securityContext, pubSubWebSocket.getPrincipal(), str)) {
            LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", str, pubSubWebSocket.getPrincipal());
            return;
        }
        LOG.debug("Subscribe is allowed for topic {}, user {}", str, pubSubWebSocket.getPrincipal());
        if (this.topicToSocketMap.containsKey(str)) {
            hashSet = this.topicToSocketMap.get(str);
        } else {
            hashSet = new HashSet<>();
            this.topicToSocketMap.put(str, hashSet);
        }
        hashSet.add(pubSubWebSocket);
        if (this.socketToTopicMap.containsKey(pubSubWebSocket)) {
            hashSet2 = this.socketToTopicMap.get(pubSubWebSocket);
        } else {
            hashSet2 = new HashSet<>(0);
            this.socketToTopicMap.put(pubSubWebSocket, hashSet2);
        }
        hashSet2.add(str);
        publish(str + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers", Integer.valueOf(getNumSubscribers(str)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void unsubscribe(PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket pubSubWebSocket, String str) {
        if (this.topicToSocketMap.containsKey(str)) {
            HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> hashSet = this.topicToSocketMap.get(str);
            hashSet.remove(pubSubWebSocket);
            if (hashSet.isEmpty()) {
                this.topicToSocketMap.remove(str);
            }
            if (this.socketToTopicMap.containsKey(pubSubWebSocket)) {
                HashSet<String> hashSet2 = this.socketToTopicMap.get(pubSubWebSocket);
                hashSet2.remove(str);
                if (hashSet2.isEmpty()) {
                    this.socketToTopicMap.remove(pubSubWebSocket);
                }
                publish(str + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers", Integer.valueOf(getNumSubscribers(str)));
            }
        }
    }

    private synchronized void unsubscribeAll(PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket pubSubWebSocket) {
        HashSet<String> hashSet = this.socketToTopicMap.get(pubSubWebSocket);
        if (hashSet != null) {
            Iterator<String> it = hashSet.iterator();
            while (it.hasNext()) {
                String next = it.next();
                HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> hashSet2 = this.topicToSocketMap.get(next);
                hashSet2.remove(pubSubWebSocket);
                if (hashSet2.isEmpty()) {
                    this.topicToSocketMap.remove(next);
                }
                publish(next + LogicalPlanConfiguration.KEY_SEPARATOR + "numSubscribers", Integer.valueOf(getNumSubscribers(next)));
            }
            this.socketToTopicMap.remove(pubSubWebSocket);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void disconnect(PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket pubSubWebSocket) {
        unsubscribeAll(pubSubWebSocket);
    }

    public synchronized int getNumSubscribers(String str) {
        HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> hashSet = this.topicToSocketMap.get(str);
        if (hashSet == null) {
            return 0;
        }
        return hashSet.size();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void sendData(PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket pubSubWebSocket, String str, Object obj) throws IOException {
        PubSubMessage pubSubMessage = new PubSubMessage();
        pubSubMessage.setType(PubSubMessage.PubSubMessageType.DATA);
        pubSubMessage.setTopic(str);
        pubSubMessage.setData(obj);
        LOG.debug("Sending data {} to subscriber...", str);
        pubSubWebSocket.sendMessage(this.codec.formatMessage(pubSubMessage));
    }

    public synchronized void publish(String str, Object obj) {
        if (!str.endsWith(".numSubscribers") && !str.startsWith("_internal.")) {
            this.latestTopics.put(str, Long.valueOf(System.currentTimeMillis()));
        }
        HashSet<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> hashSet = this.topicToSocketMap.get(str);
        if (hashSet != null) {
            Iterator<PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket> it = hashSet.iterator();
            while (it.hasNext()) {
                PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>.PubSubWebSocket next = it.next();
                try {
                    if (this.sendFilter != null) {
                        sendData(next, str, this.sendFilter.filter(this.securityContext, next.getPrincipal(), str, obj));
                    } else {
                        sendData(next, str, obj);
                    }
                } catch (Exception e) {
                    LOG.error("Cannot send message", e);
                    it.remove();
                    disconnect(next);
                }
            }
        }
    }
}
