/*
 * Decompiled with CFR 0.152.
 */
package org.dcache.xrootd.stream;

import com.google.common.base.Strings;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.dcache.xrootd.core.XrootdException;
import org.dcache.xrootd.protocol.messages.AbstractResponseMessage;
import org.dcache.xrootd.protocol.messages.ErrorResponse;
import org.dcache.xrootd.stream.ChunkedResponse;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChunkedResponseWriteHandler
implements ChannelUpstreamHandler,
ChannelDownstreamHandler,
LifeCycleAwareChannelHandler {
    private static final Logger logger = LoggerFactory.getLogger(ChunkedResponseWriteHandler.class);
    private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
    private final AtomicBoolean flush = new AtomicBoolean(false);
    private MessageEvent currentEvent;
    private volatile boolean flushNeeded;

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (!(e instanceof MessageEvent)) {
            ctx.sendDownstream(e);
            return;
        }
        Object m = ((MessageEvent)e).getMessage();
        if (!(m instanceof ChunkedResponse)) {
            ctx.sendDownstream(e);
            return;
        }
        boolean offered = this.queue.offer((MessageEvent)e);
        assert (offered);
        Channel channel = ctx.getChannel();
        if (channel.isWritable() || !channel.isConnected()) {
            this.flush(ctx, false);
        }
    }

    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent cse = (ChannelStateEvent)e;
            switch (cse.getState()) {
                case INTEREST_OPS: {
                    this.flush(ctx, true);
                    break;
                }
                case OPEN: {
                    if (Boolean.TRUE.equals(cse.getValue())) break;
                    this.flush(ctx, true);
                }
            }
        }
        ctx.sendUpstream(e);
    }

    private void discard(ChannelHandlerContext ctx, boolean fireNow) {
        ClosedChannelException cause = null;
        while (true) {
            MessageEvent currentEvent = this.currentEvent;
            if (this.currentEvent == null) {
                currentEvent = this.queue.poll();
            } else {
                this.currentEvent = null;
            }
            if (currentEvent == null) break;
            ChunkedResponseWriteHandler.closeInput((ChunkedResponse)currentEvent.getMessage());
            if (cause == null) {
                cause = new ClosedChannelException();
            }
            currentEvent.getFuture().setFailure((Throwable)cause);
        }
        if (cause != null) {
            if (fireNow) {
                Channels.fireExceptionCaught((Channel)ctx.getChannel(), cause);
            } else {
                Channels.fireExceptionCaughtLater((Channel)ctx.getChannel(), cause);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
        Channel channel = ctx.getChannel();
        this.flushNeeded = true;
        boolean acquired = this.flush.compareAndSet(false, true);
        if (acquired) {
            this.flushNeeded = false;
            try {
                if (!channel.isConnected()) {
                    this.discard(ctx, fireNow);
                    return;
                }
                while (channel.isWritable()) {
                    if (this.currentEvent == null) {
                        this.currentEvent = this.queue.poll();
                    }
                    if (this.currentEvent == null) {
                        break;
                    }
                    if (this.currentEvent.getFuture().isDone()) {
                        this.currentEvent = null;
                    } else {
                        ChannelFuture writeFuture;
                        boolean endOfInput;
                        AbstractResponseMessage chunk;
                        final MessageEvent currentEvent = this.currentEvent;
                        final ChunkedResponse chunks = (ChunkedResponse)currentEvent.getMessage();
                        try {
                            chunk = chunks.nextChunk();
                            endOfInput = chunks.isEndOfInput();
                        }
                        catch (XrootdException e) {
                            currentEvent.getFuture().setFailure((Throwable)e);
                            chunk = new ErrorResponse(chunks.getRequest(), e.getError(), Strings.nullToEmpty((String)e.getMessage()));
                            endOfInput = true;
                        }
                        catch (IOException e) {
                            logger.warn("xrootd I/O error: {}", (Object)e.toString());
                            currentEvent.getFuture().setFailure((Throwable)e);
                            chunk = new ErrorResponse(chunks.getRequest(), 3007, Strings.nullToEmpty((String)e.getMessage()));
                            endOfInput = true;
                        }
                        catch (RuntimeException e) {
                            logger.error("xrootd server error (please report this to support@dcache.org)", (Throwable)e);
                            currentEvent.getFuture().setFailure((Throwable)e);
                            chunk = new ErrorResponse(chunks.getRequest(), 3012, Strings.nullToEmpty((String)e.getMessage()));
                            endOfInput = true;
                        }
                        catch (Exception e) {
                            logger.warn("xrootd server error: {}", (Object)e.toString());
                            currentEvent.getFuture().setFailure((Throwable)e);
                            chunk = new ErrorResponse(chunks.getRequest(), 3012, Strings.nullToEmpty((String)e.getMessage()));
                            endOfInput = true;
                        }
                        catch (Error t) {
                            this.currentEvent = null;
                            currentEvent.getFuture().setFailure((Throwable)t);
                            if (fireNow) {
                                Channels.fireExceptionCaught((ChannelHandlerContext)ctx, (Throwable)t);
                            } else {
                                Channels.fireExceptionCaughtLater((ChannelHandlerContext)ctx, (Throwable)t);
                            }
                            ChunkedResponseWriteHandler.closeInput(chunks);
                            break;
                        }
                        if (endOfInput) {
                            this.currentEvent = null;
                            writeFuture = currentEvent.getFuture();
                            writeFuture.addListener(new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    ChunkedResponseWriteHandler.closeInput(chunks);
                                }
                            });
                        } else {
                            writeFuture = Channels.future((Channel)channel);
                            writeFuture.addListener(new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        currentEvent.getFuture().setFailure(future.getCause());
                                        ChunkedResponseWriteHandler.closeInput((ChunkedResponse)currentEvent.getMessage());
                                    }
                                }
                            });
                        }
                        Channels.write((ChannelHandlerContext)ctx, (ChannelFuture)writeFuture, (Object)chunk, (SocketAddress)currentEvent.getRemoteAddress());
                    }
                    if (channel.isConnected()) continue;
                    this.discard(ctx, fireNow);
                    return;
                }
            }
            finally {
                this.flush.set(false);
            }
        }
        if (acquired && (!channel.isConnected() || channel.isWritable() && !this.queue.isEmpty() || this.flushNeeded)) {
            this.flush(ctx, fireNow);
        }
    }

    static void closeInput(ChunkedResponse chunks) {
        try {
            chunks.close();
        }
        catch (Throwable t) {
            logger.warn("Failed to close a chunked input.", t);
        }
    }

    public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
    }

    public void afterAdd(ChannelHandlerContext ctx) throws Exception {
    }

    public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
        this.flush(ctx, false);
    }

    public void afterRemove(ChannelHandlerContext ctx) throws Exception {
        IOException cause = null;
        boolean fireExceptionCaught = false;
        while (true) {
            MessageEvent currentEvent = this.currentEvent;
            if (this.currentEvent == null) {
                currentEvent = this.queue.poll();
            } else {
                this.currentEvent = null;
            }
            if (currentEvent == null) break;
            ChunkedResponseWriteHandler.closeInput((ChunkedResponse)currentEvent.getMessage());
            if (cause == null) {
                cause = new IOException("Unable to flush event, discarding");
            }
            currentEvent.getFuture().setFailure(cause);
            fireExceptionCaught = true;
        }
        if (fireExceptionCaught) {
            Channels.fireExceptionCaughtLater((Channel)ctx.getChannel(), cause);
        }
    }
}

