/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.segment.standby.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyApplyDiff;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentLoaderHandler
extends ChannelInboundHandlerAdapter
implements RemoteSegmentLoader {
    private static final Logger log = LoggerFactory.getLogger(SegmentLoaderHandler.class);
    private final StandbyStore store;
    private final String clientID;
    private final RecordId head;
    private final EventExecutorGroup loaderExecutor;
    private final AtomicBoolean running;
    private final int readTimeoutMs;
    private final boolean autoClean;
    private ChannelHandlerContext ctx;
    final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();

    public SegmentLoaderHandler(StandbyStore store, RecordId head, EventExecutorGroup loaderExecutor, String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
        this.store = store;
        this.head = head;
        this.loaderExecutor = loaderExecutor;
        this.clientID = clientID;
        this.running = running;
        this.readTimeoutMs = readTimeoutMs;
        this.autoClean = autoClean;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        this.initSync();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof SegmentReply) {
            this.segment.offer((SegmentReply)evt);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initSync() {
        log.debug("new head id " + this.head);
        long t = System.currentTimeMillis();
        long preSyncSize = -1L;
        if (this.autoClean) {
            preSyncSize = this.store.size();
        }
        try {
            long postSyncSize;
            this.store.preSync(this);
            SegmentNodeState before = this.store.getHead();
            SegmentNodeBuilder builder = before.builder();
            SegmentNodeState current = new SegmentNodeState(this.head);
            while (true) {
                try {
                    current.compareAgainstBaseState((NodeState)before, (NodeStateDiff)new StandbyApplyDiff((NodeBuilder)builder, this.store, this));
                }
                catch (SegmentNotFoundException e) {
                    String id = e.getSegmentId();
                    Segment s = this.readSegment(e.getSegmentId());
                    if (s == null) {
                        log.warn("can't read locally corrupt segment " + id + " from primary");
                        throw e;
                    }
                    log.debug("did reread locally corrupt segment " + id + " with size " + s.size());
                    this.store.persist(s.getSegmentId(), s);
                    continue;
                }
                break;
            }
            boolean ok = this.store.setHead(before, builder.getNodeState());
            log.debug("updated head state successfully: {} in {}ms.", (Object)ok, (Object)(System.currentTimeMillis() - t));
            if (this.autoClean && preSyncSize > 0L && (double)((postSyncSize = this.store.size()) - preSyncSize) > 0.25 * (double)preSyncSize) {
                log.info("Store size increased from {} to {}, will run cleanup.", (Object)IOUtils.humanReadableByteCount((long)preSyncSize), (Object)IOUtils.humanReadableByteCount((long)postSyncSize));
                this.store.cleanup();
            }
        }
        finally {
            this.store.postSync();
            this.close();
        }
    }

    @Override
    public Segment readSegment(String id) {
        this.ctx.writeAndFlush(Messages.newGetSegmentReq(this.clientID, id));
        return this.getSegment(id);
    }

    @Override
    public Blob readBlob(String blobId) {
        this.ctx.writeAndFlush(Messages.newGetBlobReq(this.clientID, blobId));
        return this.getBlob(blobId);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Exception caught, closing channel.", cause);
        this.close();
    }

    private Segment getSegment(String id) {
        return this.getReply(id, 0).getSegment();
    }

    private Blob getBlob(String id) {
        return this.getReply(id, 1).getBlob();
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private SegmentReply getReply(String id, int type) {
        boolean interrupted = false;
        while (true) {
            block16: {
                SegmentReply r = this.segment.poll(this.readTimeoutMs, TimeUnit.MILLISECONDS);
                if (r == null) {
                    log.warn("timeout waiting for {}", (Object)id);
                    SegmentReply segmentReply = SegmentReply.empty();
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                    return segmentReply;
                }
                if (r.getType() != type) break block16;
                switch (r.getType()) {
                    case 0: {
                        if (!r.getSegment().getSegmentId().toString().equals(id)) break;
                        SegmentReply segmentReply = r;
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                        return segmentReply;
                    }
                    case 1: {
                        if (!r.getBlob().getBlobId().equals(id)) break;
                        SegmentReply segmentReply = r;
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                        return segmentReply;
                    }
                }
            }
            try {
                continue;
                catch (InterruptedException ignore) {
                    interrupted = true;
                    continue;
                }
            }
            catch (Throwable throwable) {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                throw throwable;
            }
            break;
        }
    }

    @Override
    public void close() {
        this.ctx.close();
    }

    @Override
    public boolean isClosed() {
        return this.loaderExecutor != null && (this.loaderExecutor.isShuttingDown() || this.loaderExecutor.isShutdown());
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }
}

