/*
 * Decompiled with CFR 0.152.
 */
package dmg.cells.network;

import dmg.cells.nucleus.CellAdapter;
import dmg.cells.nucleus.CellDomainInfo;
import dmg.cells.nucleus.CellDomainRole;
import dmg.cells.nucleus.CellMessage;
import dmg.cells.nucleus.CellNucleus;
import dmg.cells.nucleus.CellRoute;
import dmg.cells.nucleus.CellTunnel;
import dmg.cells.nucleus.CellTunnelInfo;
import dmg.cells.nucleus.MessageEvent;
import dmg.cells.nucleus.NoRouteToCellException;
import dmg.cells.nucleus.RoutedMessageEvent;
import dmg.cells.nucleus.SerializationHandler;
import dmg.util.Releases;
import dmg.util.StreamEngine;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.Socket;
import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.dcache.util.Args;
import org.dcache.util.NDC;
import org.dcache.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocationMgrTunnel
extends CellAdapter
implements CellTunnel,
Runnable {
    private static final Tunnels _tunnels = new Tunnels();
    private static final Logger _log = LoggerFactory.getLogger(LocationMgrTunnel.class);
    private final CellNucleus _nucleus;
    private final CellDomainInfo _localDomainInfo;
    private CellDomainInfo _remoteDomainInfo;
    private boolean _allowForwardingOfRemoteMessages;
    private Thread _thread;
    private final Socket _socket;
    private final OutputStream _rawOut;
    private final InputStream _rawIn;
    private ObjectSource _input;
    private ObjectSink _output;
    private SerializationHandler.Serializer _serializer;
    private LongAdder _messagesToTunnel = new LongAdder();
    private LongAdder _messagesToSystem = new LongAdder();

    public LocationMgrTunnel(String cellName, StreamEngine engine, Args args) {
        super(cellName, "System", args);
        this._nucleus = this.getNucleus();
        this._socket = engine.getSocket();
        this._rawOut = new BufferedOutputStream(engine.getOutputStream());
        this._rawIn = new BufferedInputStream(engine.getInputStream());
        CellDomainRole role = args.hasOption("role") ? CellDomainRole.valueOf(args.getOption("role").toUpperCase()) : CellDomainRole.SATELLITE;
        this._localDomainInfo = new CellDomainInfo(this._nucleus.getCellDomainName(), Version.of(LocationMgrTunnel.class).getVersion(), role, this._nucleus.getZone());
    }

    @Override
    protected void starting() throws Exception {
        this._socket.setTcpNoDelay(true);
        this.handshake();
        _tunnels.add(this);
    }

    @Override
    protected void started() {
        this.installRoutes();
        this._thread = this._nucleus.newThread(this, "Tunnel");
        this._thread.start();
    }

    @Override
    public void stopped() {
        _log.info("Closing tunnel to {}", (Object)this.getRemoteDomainName());
        _tunnels.remove(this);
        try {
            try {
                this._socket.shutdownOutput();
                if (this._thread != null) {
                    this._thread.join(2000L);
                }
            }
            catch (IOException e) {
                _log.debug("Failed to shutdown socket: {}", (Object)e.getMessage());
            }
            catch (UnsupportedOperationException e) {
            }
            catch (InterruptedException e) {
                // empty catch block
            }
        }
        finally {
            try {
                this._socket.close();
            }
            catch (IOException e) {
                _log.warn("Failed to close socket: {}", (Object)e.getMessage());
            }
        }
    }

    private void installRoutes() {
        String domain = this.getRemoteDomainName();
        CellNucleus nucleus = this.getNucleus();
        CellRoute route = new CellRoute(domain, nucleus.getThisAddress(), nucleus.getZone(), 3);
        try {
            nucleus.routeAdd(route);
        }
        catch (IllegalArgumentException e) {
            _log.warn("Failed to add route: {}", (Object)e.getMessage());
        }
    }

    private void handshake() throws IOException {
        try {
            SerializationHandler.Serializer serializer;
            ObjectOutputStream out = new ObjectOutputStream(this._rawOut);
            out.writeObject(this._localDomainInfo);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(this._rawIn);
            this._remoteDomainInfo = (CellDomainInfo)in.readObject();
            if (this._remoteDomainInfo == null) {
                throw new IOException("Remote dCache domain disconnected during handshake.");
            }
            short release = this._remoteDomainInfo.getRelease();
            if (release < 768) {
                throw new IOException("Connection from incompatible domain " + this._remoteDomainInfo + " rejected.");
            }
            _log.debug("Using raw serialization for message envelope.");
            boolean samedVersionEndpoint = release == this._localDomainInfo.getRelease();
            this._serializer = serializer = samedVersionEndpoint ? this._nucleus.getMsgSerialization() : SerializationHandler.Serializer.JOS;
            this._input = new RawObjectSource(this._rawIn);
            this._output = new RawObjectSink(this._rawOut, serializer);
            this._allowForwardingOfRemoteMessages = this._remoteDomainInfo.getRole() != CellDomainRole.CORE;
            _log.info("Established connection with {}", (Object)this._remoteDomainInfo);
        }
        catch (Releases.BadVersionException e) {
            throw new IOException("Invalid information presented during handshake: " + e.getMessage(), e);
        }
        catch (ClassNotFoundException e) {
            throw new IOException("Cannot deserialize object. This is most likely due to a version mismatch.", e);
        }
    }

    @Override
    public void run() {
        NDC.push((String)this._remoteDomainInfo.toString());
        try {
            CellMessage msg;
            while ((msg = this._input.readObject()) != null) {
                this.getNucleus().sendMessage(msg, true, this._allowForwardingOfRemoteMessages, false);
                this._messagesToSystem.increment();
            }
        }
        catch (EOFException | AsynchronousCloseException msg) {
        }
        catch (ClassNotFoundException e) {
            _log.warn("Cannot deserialize object. This is most likely due to a version mismatch.");
        }
        catch (IOException e) {
            _log.warn("Error while reading from tunnel: {}", (Object)e.toString());
        }
        finally {
            this.kill();
            NDC.pop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void messageArrived(MessageEvent me) {
        if (me instanceof RoutedMessageEvent) {
            CellMessage msg = me.getMessage();
            try {
                this._messagesToTunnel.increment();
                this._output.writeObject(msg);
            }
            catch (IOException e) {
                NDC.push((String)this._remoteDomainInfo.toString());
                try {
                    this.kill();
                    _log.warn("Error while sending message: {}", (Object)e.getMessage());
                    NoRouteToCellException noRoute = new NoRouteToCellException(msg, "Communication failure. Message could not be delivered.");
                    CellMessage envelope = new CellMessage(msg.getSourcePath().revert(), (Serializable)noRoute);
                    envelope.setLastUOID(msg.getUOID());
                    this._nucleus.sendMessage(envelope, true, true, true);
                }
                finally {
                    NDC.pop();
                }
            }
        } else {
            super.messageArrived(me);
        }
    }

    @Override
    public CellTunnelInfo getCellTunnelInfo() {
        return new CellTunnelInfo(this.getNucleus().getThisAddress(), this._localDomainInfo, this._remoteDomainInfo);
    }

    private String getRemoteDomainName() {
        return this._remoteDomainInfo == null ? "" : this._remoteDomainInfo.getCellDomainName();
    }

    public Optional<String> getRemoteZone() {
        return this._remoteDomainInfo == null ? Optional.empty() : this._remoteDomainInfo.getZone();
    }

    @Override
    public String toString() {
        return "Connected to " + this.getRemoteDomainName() + this.getRemoteZone().map(z -> " in zone " + z).orElse("");
    }

    @Override
    public void getInfo(PrintWriter pw) {
        pw.println("Tunnel                    : " + this.getCellName());
        pw.println("Message payload serializer: " + this._serializer);
        pw.println("Messages delivered to");
        pw.println("   Peer       : " + this._messagesToTunnel);
        pw.println("   Local      : " + this._messagesToSystem);
        pw.println("Local domain");
        pw.println("   Name       : " + this._localDomainInfo.getCellDomainName());
        pw.println("   Version    : " + this._localDomainInfo.getVersion());
        pw.println("   Role       : " + this._localDomainInfo.getRole());
        pw.println("   Zone       : " + this._localDomainInfo.getZone().orElse("(none)"));
        pw.println("Peer domain");
        pw.println("   Name       : " + this._remoteDomainInfo.getCellDomainName());
        pw.println("   Version    : " + this._remoteDomainInfo.getVersion());
        pw.println("   Role       : " + this._remoteDomainInfo.getRole());
        pw.println("   Zone       : " + this._remoteDomainInfo.getZone().orElse("(none)"));
    }

    private static class RawObjectSource
    implements ObjectSource {
        private final DataInputStream in;

        private RawObjectSource(InputStream in) {
            this.in = new DataInputStream(in);
        }

        @Override
        public CellMessage readObject() throws IOException, ClassNotFoundException {
            return CellMessage.createFrom(this.in);
        }
    }

    private static interface ObjectSource {
        public CellMessage readObject() throws IOException, ClassNotFoundException;
    }

    private static class RawObjectSink
    implements ObjectSink {
        private final SerializationHandler.Serializer serializer;
        private final DataOutputStream out;

        private RawObjectSink(OutputStream out, SerializationHandler.Serializer serializer) {
            this.out = new DataOutputStream(out);
            this.serializer = serializer;
        }

        @Override
        public void writeObject(CellMessage message) throws IOException {
            message.ensureEncodedWith(this.serializer);
            message.writeTo(this.out);
            this.out.flush();
        }
    }

    private static interface ObjectSink {
        public void writeObject(CellMessage var1) throws IOException;
    }

    private static class Tunnels {
        private Map<String, LocationMgrTunnel> _tunnels = new HashMap<String, LocationMgrTunnel>();

        private Tunnels() {
        }

        public synchronized void add(LocationMgrTunnel tunnel) throws InterruptedException {
            LocationMgrTunnel old;
            if (this._tunnels.containsValue(tunnel)) {
                throw new IllegalArgumentException("Cannot register the same tunnel twice");
            }
            String domain = tunnel.getRemoteDomainName();
            while ((old = this._tunnels.get(domain)) != null) {
                old.kill();
                this.wait();
            }
            this._tunnels.put(domain, tunnel);
            this.notifyAll();
        }

        public synchronized void remove(LocationMgrTunnel tunnel) {
            if (this._tunnels.remove(tunnel.getRemoteDomainName(), tunnel)) {
                this.notifyAll();
            }
        }
    }
}

