/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.server.cluster.impl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQExceptionType;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.management.CoreNotificationType;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
import org.hornetq.utils.UUID;
import org.hornetq.utils.UUIDGenerator;

/**
 * A {@link BridgeImpl} specialisation used by a {@link ClusterConnection} to forward messages
 * to one specific target node.
 *
 * <p>On top of the plain bridge behaviour this class:
 * <ul>
 *   <li>rewrites the routing headers of every forwarded message (see
 *       {@link #beforeForward(ServerMessage)});</li>
 *   <li>after each successful connect, creates a temporary queue on the management notification
 *       address, attaches the {@link MessageFlowRecord} as its message handler and asks the
 *       remote server to send its queue information to that queue;</li>
 *   <li>informs the owning cluster connection when the bridge fails (see
 *       {@link #fail(boolean)}).</li>
 * </ul>
 */
public class ClusterConnectionBridge
extends BridgeImpl {
    private final ClusterConnection clusterConnection;
    /** Stored by the constructor; not read anywhere in this class. */
    private final ClusterManager clusterManager;
    /** Receives the notification messages; may be {@code null}, in which case no consumer is set up. */
    private final MessageFlowRecord flowRecord;
    private final SimpleString managementAddress;
    private final SimpleString managementNotificationAddress;
    /** Consumer on the temporary notification queue; re-created on every (re)connect. */
    private ClientConsumer notifConsumer;
    /** {@code MessageImpl.HDR_ROUTE_TO_IDS} concatenated with this bridge's name. */
    private final SimpleString idsHeaderName;
    /** Stored by the constructor; not read anywhere in this class. */
    private final long targetNodeEventUID;
    /** Stored by the constructor; not read anywhere in this class. */
    private final ServerLocatorInternal discoveryLocator;

    /**
     * Creates the bridge and marks {@code queue} as an internal queue.
     *
     * <p>Most arguments are handed straight to the {@link BridgeImpl} constructor; the fourth
     * argument of that constructor is fixed to {@code 0} here. The remaining arguments
     * ({@code clusterConnection}, {@code clusterManager}, {@code discoveryLocator},
     * {@code targetNodeEventUID}, {@code targetNodeID}, {@code managementAddress},
     * {@code managementNotificationAddress}, {@code flowRecord}) are stored on this instance.
     * {@code connector} is accepted but not used by this constructor.
     *
     * @param name  bridge name; also used to derive the per-bridge route-to-ids header name
     * @param queue the queue the bridge consumes from; flagged as internal
     */
    public ClusterConnectionBridge(ClusterConnection clusterConnection, ClusterManager clusterManager, ServerLocatorInternal targetLocator, ServerLocatorInternal discoveryLocator, int initialConnectAttempts, int reconnectAttempts, long retryInterval, double retryMultiplier, long maxRetryInterval, UUID nodeUUID, long targetNodeEventUID, String targetNodeID, SimpleString name, Queue queue, Executor executor, Filter filterString, SimpleString forwardingAddress, ScheduledExecutorService scheduledExecutor, Transformer transformer, boolean useDuplicateDetection, String user, String password, StorageManager storageManager, SimpleString managementAddress, SimpleString managementNotificationAddress, MessageFlowRecord flowRecord, TransportConfiguration connector) {
        super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, storageManager);
        this.discoveryLocator = discoveryLocator;
        this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
        this.clusterConnection = clusterConnection;
        this.clusterManager = clusterManager;
        this.targetNodeEventUID = targetNodeEventUID;
        this.targetNodeID = targetNodeID;
        this.managementAddress = managementAddress;
        this.managementNotificationAddress = managementNotificationAddress;
        this.flowRecord = flowRecord;
        queue.setInternalQueue(true);
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            // The exception is only created to get a stack trace into the trace log.
            HornetQServerLogger.LOGGER.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace"));
        }
    }

    /**
     * Creates a session factory towards the target node using the server-side protocol manager.
     *
     * <p>The result of the locator lookup is passed to {@code setSessionFactory} even when it is
     * {@code null}. For a non-null factory, reconnect attempts are set to {@code 0} and this
     * bridge is registered as a failure listener on the factory's connection.
     *
     * @return the new factory, or {@code null} when the locator returned none for the target node
     * @throws Exception if the locator fails to create the factory
     */
    @Override
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        this.serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
        ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)this.serverLocator.createSessionFactory(this.targetNodeID);
        this.setSessionFactory(factory);
        if (factory == null) {
            HornetQServerLogger.LOGGER.nodeNotAvailable(this.targetNodeID);
            return null;
        }
        factory.setReconnectAttempts(0);
        factory.getConnection().addFailureListener(this);
        return factory;
    }

    /**
     * Prepares a copy of {@code message} for forwarding to the target node.
     *
     * <p>The queue ids stored under this bridge's own header ({@code idsHeaderName}) are read from
     * the original message. Every property of the copy whose name starts with
     * {@code MessageImpl.HDR_ROUTE_TO_IDS} is removed, and the ids are then written back under the
     * plain {@code HDR_ROUTE_TO_IDS} name before delegating to the superclass.
     *
     * @param message the message about to be forwarded; it is copied, not modified
     * @return the result of {@code super.beforeForward} applied to the rewritten copy
     * @throws IllegalStateException if the message carries no queue ids for this bridge
     */
    @Override
    protected ServerMessage beforeForward(ServerMessage message) {
        ServerMessage messageCopy = message.copy();
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            HornetQServerLogger.LOGGER.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
        }
        // Snapshot the names so properties can be removed from the copy while iterating.
        HashSet<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
        byte[] queueIds = message.getBytesProperty(this.idsHeaderName);
        if (queueIds == null) {
            HornetQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, this.idsHeaderName);
            throw new IllegalStateException("no queueIDs defined");
        }
        for (SimpleString propName : propNames) {
            if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
                messageCopy.removeProperty(propName);
            }
        }
        messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
        return super.beforeForward(messageCopy);
    }

    /**
     * (Re)creates the notification consumer on the bridge session and requests the remote queue
     * information. Does nothing when no flow record was supplied.
     *
     * @throws Exception if any session operation fails
     */
    private void setupNotificationConsumer() throws Exception {
        if (this.flowRecord == null) {
            // Nothing can consume the notifications. Checked before logging because the
            // debug message below dereferences the flow record.
            return;
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Setting up notificationConsumer between " + this.clusterConnection.getConnector() + " and " + this.flowRecord.getBridge().getForwardingConnection() + " clusterConnection = " + this.clusterConnection.getName() + " on server " + this.clusterConnection.getServer());
        }
        this.flowRecord.reset();
        this.closeNotificationConsumer();

        String qName = "notif." + UUIDGenerator.getInstance().generateStringUUID() + "." + this.clusterConnection.getServer();
        SimpleString notifQueueName = new SimpleString(qName);
        this.session.createTemporaryQueue(this.managementNotificationAddress, notifQueueName, this.createNotificationFilter());
        this.notifConsumer = this.session.createConsumer(notifQueueName);
        this.notifConsumer.setMessageHandler(this.flowRecord);
        this.session.start();

        ClientMessage message = this.session.createMessage(false);
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            // The exception is only created to get a stack trace into the trace log.
            HornetQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
        }
        ManagementHelper.putOperationInvocation(message, "core.server", "sendQueueInfoToQueue", new Object[]{notifQueueName.toString(), this.flowRecord.getAddress()});
        ClientProducer prod = this.session.createProducer(this.managementAddress);
        try {
            if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                HornetQServerLogger.LOGGER.debug("Cluster connetion bridge on " + this.clusterConnection + " requesting information on queues");
            }
            prod.send(message);
        }
        finally {
            // The producer is only needed for this one request; release it so a new one
            // is not left open on the session after every reconnect.
            prod.close();
        }
    }

    /**
     * Closes the current notification consumer, if there is one. A failure to close is logged and
     * otherwise ignored; in that case the field keeps its old value until it is overwritten by the
     * caller.
     */
    private void closeNotificationConsumer() {
        if (this.notifConsumer == null) {
            return;
        }
        try {
            HornetQServerLogger.LOGGER.debug("Closing notification Consumer for reopening " + this.notifConsumer + " on bridge " + this.getName());
            this.notifConsumer.close();
            this.notifConsumer = null;
        }
        catch (HornetQException e) {
            HornetQServerLogger.LOGGER.errorClosingConsumer(e);
        }
    }

    /**
     * Builds the filter for the temporary notification queue: excludes divert bindings, restricts
     * the notification type to the binding/consumer/proposal kinds listed below, limits the
     * distance to less than the flow record's max hops and applies the address selector derived
     * from the flow record's address.
     */
    private SimpleString createNotificationFilter() {
        return new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" + BindingType.DIVERT.toInt()
            + " AND " + ManagementHelper.HDR_NOTIFICATION_TYPE + " IN ('"
            + CoreNotificationType.BINDING_ADDED + "','"
            + CoreNotificationType.BINDING_REMOVED + "','"
            + CoreNotificationType.CONSUMER_CREATED + "','"
            + CoreNotificationType.CONSUMER_CLOSED + "','"
            + CoreNotificationType.PROPOSAL + "','"
            + CoreNotificationType.PROPOSAL_RESPONSE + "','"
            + CoreNotificationType.UNPROPOSAL + "') AND "
            + ManagementHelper.HDR_DISTANCE + "<" + this.flowRecord.getMaxHops()
            + " AND (" + ClusterConnectionBridge.createSelectorFromAddress(this.flowRecord.getAddress()) + ")");
    }

    /**
     * Turns an address specification into a selector on {@code ManagementHelper.HDR_ADDRESS}.
     *
     * <p>A specification without a comma is treated as a single prefix: {@code "foo"} yields
     * {@code <header> LIKE 'foo%'} and {@code "!foo"} yields {@code <header> NOT LIKE 'foo%'}.
     * A specification containing a comma is split on {@code ","} and handed to
     * {@link #buildSelectorFromArray(String[])}. The text is inserted into the selector verbatim;
     * no trimming or quote escaping is performed.
     *
     * @param address the address specification; must not be {@code null}
     * @return the selector expression
     */
    public static String createSelectorFromAddress(String address) {
        if (address.contains(",")) {
            return ClusterConnectionBridge.buildSelectorFromArray(address.split(","));
        }
        if (address.startsWith("!")) {
            return ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + address.substring(1) + "%'";
        }
        return ManagementHelper.HDR_ADDRESS + " LIKE '" + address + "%'";
    }

    /**
     * Builds a selector from a list of address prefixes.
     *
     * <p>Entries starting with {@code "!"} are exclusions (the {@code "!"} is stripped), all other
     * entries are inclusions. Inclusions are combined with {@code OR}, exclusions with
     * {@code AND}; when both kinds are present the two groups are each parenthesised and joined
     * with {@code AND}. The whole expression is wrapped in one outer pair of parentheses, so an
     * empty array yields {@code "()"}.
     *
     * @param list the prefixes; must not be {@code null} or contain {@code null}
     * @return the selector expression
     */
    public static String buildSelectorFromArray(String[] list) {
        ArrayList<String> includes = new ArrayList<String>();
        ArrayList<String> excludes = new ArrayList<String>();
        for (String entry : list) {
            if (entry.startsWith("!")) {
                excludes.add(entry.substring(1));
            } else {
                includes.add(entry);
            }
        }

        // The inner grouping parentheses are only needed when both kinds are present.
        boolean mixed = !includes.isEmpty() && !excludes.isEmpty();
        StringBuilder builder = new StringBuilder("(");
        if (!includes.isEmpty()) {
            if (mixed) {
                builder.append("(");
            }
            ClusterConnectionBridge.appendPrefixClauses(builder, includes, " LIKE '", " OR ");
            if (mixed) {
                builder.append(")");
            }
        }
        if (!excludes.isEmpty()) {
            if (mixed) {
                builder.append(" AND (");
            }
            ClusterConnectionBridge.appendPrefixClauses(builder, excludes, " NOT LIKE '", " AND ");
            if (mixed) {
                builder.append(")");
            }
        }
        builder.append(")");
        return builder.toString();
    }

    /**
     * Appends one parenthesised {@code (<header><operator><prefix>%')} clause per prefix,
     * separated by {@code separator}.
     */
    private static void appendPrefixClauses(StringBuilder builder, ArrayList<String> prefixes, String operator, String separator) {
        for (int i = 0; i < prefixes.size(); ++i) {
            if (i > 0) {
                builder.append(separator);
            }
            builder.append("(" + ManagementHelper.HDR_ADDRESS + operator + prefixes.get(i) + "%')");
        }
    }

    /**
     * Runs the superclass hook and then sets up the notification consumer on the new session.
     *
     * @throws Exception if either step fails
     */
    @Override
    protected void afterConnect() throws Exception {
        super.afterConnect();
        this.setupNotificationConsumer();
    }

    /**
     * Always schedules a reconnect attempt; the failure {@code type} is not consulted.
     */
    @Override
    protected void tryScheduleRetryReconnect(HornetQExceptionType type) {
        this.scheduleRetryConnect();
    }

    /**
     * Fails the bridge and then updates the owning cluster connection: the record for the target
     * node is removed on a permanent failure and disconnected otherwise.
     *
     * @param permanently whether the target node is considered permanently down
     */
    @Override
    protected void fail(boolean permanently) {
        HornetQServerLogger.LOGGER.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
        super.fail(permanently);
        if (permanently) {
            HornetQServerLogger.LOGGER.debug("cluster node for bridge " + this.getName() + " is permanently down");
            this.clusterConnection.removeRecord(this.targetNodeID);
        } else {
            this.clusterConnection.disconnectRecord(this.targetNodeID);
        }
    }

    /**
     * @return always {@code false}
     */
    @Override
    protected boolean isPlainCoreBridge() {
        return false;
    }
}

