/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.DirectExecutorService;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class TestQJMWithFaults {
    private static final Logger LOG = LoggerFactory.getLogger(TestQJMWithFaults.class);
    private static final String RAND_SEED_PROPERTY = "TestQJMWithFaults.random-seed";
    private static final int NUM_WRITER_ITERS = 500;
    private static final int SEGMENTS_PER_WRITER = 2;
    private static final Configuration conf = new Configuration();
    private static final JournalFaultInjector faultInjector;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long determineMaxIpcNumber() throws Exception {
        long ret;
        Configuration conf = new Configuration();
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
        cluster.waitActive();
        QuorumJournalManager qjm = null;
        try {
            qjm = TestQJMWithFaults.createInjectableQJM(cluster);
            qjm.format(QJMTestUtil.FAKE_NSINFO, false);
            TestQJMWithFaults.doWorkload(cluster, qjm);
            TreeSet<Integer> ipcCounts = new TreeSet<Integer>();
            for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
                InvocationCountingChannel ch = (InvocationCountingChannel)l;
                ch.waitForAllPendingCalls();
                ipcCounts.add(ch.getRpcCount());
            }
            Assert.assertEquals((long)1L, (long)ipcCounts.size());
            ret = ((Integer)ipcCounts.first()).intValue();
            LOG.info("Max IPC count = " + ret);
        }
        finally {
            IOUtils.closeStream((Closeable)qjm);
            cluster.shutdown();
        }
        return ret;
    }

    @Test
    public void testRecoverAfterDoubleFailures() throws Exception {
        long MAX_IPC_NUMBER = TestQJMWithFaults.determineMaxIpcNumber();
        int failA = 1;
        while ((long)failA <= MAX_IPC_NUMBER) {
            int failB = 1;
            while ((long)failB <= MAX_IPC_NUMBER) {
                String injectionStr = "(" + failA + ", " + failB + ")";
                LOG.info("\n\n-------------------------------------------\nBeginning test, failing at " + injectionStr + "\n-------------------------------------------\n\n");
                MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
                cluster.waitActive();
                QuorumJournalManager qjm = null;
                try {
                    qjm = TestQJMWithFaults.createInjectableQJM(cluster);
                    qjm.format(QJMTestUtil.FAKE_NSINFO, false);
                    List loggers = qjm.getLoggerSetForTests().getLoggersForTests();
                    this.failIpcNumber((AsyncLogger)loggers.get(0), failA);
                    this.failIpcNumber((AsyncLogger)loggers.get(1), failB);
                    int lastAckedTxn = TestQJMWithFaults.doWorkload(cluster, qjm);
                    if (lastAckedTxn < 6) {
                        LOG.info("Failed after injecting failures at " + injectionStr + ". This is expected since we injected a failure in the majority.");
                    }
                    qjm.close();
                    qjm = null;
                    qjm = TestQJMWithFaults.createInjectableQJM(cluster);
                    long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm);
                    Assert.assertTrue((lastRecoveredTxn >= (long)lastAckedTxn ? 1 : 0) != 0);
                    QJMTestUtil.writeSegment(cluster, qjm, lastRecoveredTxn + 1L, 3, true);
                }
                catch (Throwable t) {
                    throw new RuntimeException("Test failed with injection: " + injectionStr, t);
                }
                finally {
                    cluster.shutdown();
                    cluster = null;
                    IOUtils.closeStream((Closeable)qjm);
                    qjm = null;
                }
                ++failB;
            }
            ++failA;
        }
    }

    @Test
    public void testUnresolvableHostName() throws Exception {
        this.expectedException.expect(UnknownHostException.class);
        new QuorumJournalManager(conf, new URI("qjournal://bogus:12345/test-journal"), QJMTestUtil.FAKE_NSINFO);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRandomized() throws Exception {
        long seed;
        Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY);
        if (userSpecifiedSeed != null) {
            LOG.info("Using seed specified in system property");
            seed = userSpecifiedSeed;
            GenericTestUtils.setLogLevel((Logger)ProtobufRpcEngine2.LOG, (Level)Level.TRACE);
        } else {
            seed = new Random().nextLong();
        }
        LOG.info("Random seed: " + seed);
        Random r = new Random(seed);
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
        cluster.waitActive();
        QuorumJournalManager qjmForInitialFormat = TestQJMWithFaults.createInjectableQJM(cluster);
        qjmForInitialFormat.format(QJMTestUtil.FAKE_NSINFO, false);
        qjmForInitialFormat.close();
        try {
            long txid = 0L;
            long lastAcked = 0L;
            block9: for (int i = 0; i < 500; ++i) {
                long recovered;
                LOG.info("Starting writer " + i + "\n-------------------");
                QuorumJournalManager qjm = TestQJMWithFaults.createRandomFaultyQJM(cluster, r);
                try {
                    recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
                }
                catch (Throwable t) {
                    LOG.info("Failed recovery", t);
                    this.checkException(t);
                    qjm.close();
                    continue;
                }
                try {
                    Assert.assertTrue((String)("Recovered only up to txnid " + recovered + " but had gotten an ack for " + lastAcked), (recovered >= lastAcked ? 1 : 0) != 0);
                    txid = recovered + 1L;
                    if (txid > 100L && i % 10 == 1) {
                        qjm.purgeLogsOlderThan(txid - 100L);
                    }
                    Holder thrown = new Holder(null);
                    for (int j = 0; j < 2; ++j) {
                        lastAcked = this.writeSegmentUntilCrash(cluster, qjm, txid, 4, (Holder<Throwable>)thrown);
                        if (thrown.held != null) {
                            LOG.info("Failed write", (Throwable)thrown.held);
                            this.checkException((Throwable)thrown.held);
                            continue block9;
                        }
                        txid += 4L;
                    }
                    continue;
                }
                catch (Throwable throwable) {
                    throw throwable;
                }
                finally {
                    qjm.close();
                }
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    private void checkException(Throwable t) {
        GenericTestUtils.assertExceptionContains((String)"Injected", (Throwable)t);
        if (t.toString().contains("AssertionError")) {
            throw new RuntimeException("Should never see AssertionError in fault test!", t);
        }
    }

    private long writeSegmentUntilCrash(MiniJournalCluster cluster, QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
        long firstTxId = txid;
        long lastAcked = txid - 1L;
        try {
            EditLogOutputStream stm = qjm.startLogSegment(txid, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
            for (int i = 0; i < numTxns; ++i) {
                QJMTestUtil.writeTxns(stm, txid++, 1);
                ++lastAcked;
            }
            stm.close();
            qjm.finalizeLogSegment(firstTxId, lastAcked);
        }
        catch (Throwable t) {
            thrown.held = t;
        }
        return lastAcked;
    }

    private static int doWorkload(MiniJournalCluster cluster, QuorumJournalManager qjm) throws IOException {
        int lastAcked = 0;
        try {
            qjm.recoverUnfinalizedSegments();
            QJMTestUtil.writeSegment(cluster, qjm, 1L, 3, true);
            lastAcked = 3;
            QJMTestUtil.writeSegment(cluster, qjm, 4L, 3, true);
            lastAcked = 6;
        }
        catch (QuorumException qe) {
            LOG.info("Failed to write at txid " + lastAcked, (Throwable)qe);
        }
        return lastAcked;
    }

    private void failIpcNumber(AsyncLogger logger, int idx) {
        ((InvocationCountingChannel)logger).failIpcNumber(idx);
    }

    private static QJournalProtocol mockProxy(WrapEveryCall<Object> wrapper) throws IOException {
        QJournalProtocol mock = (QJournalProtocol)Mockito.mock(QJournalProtocol.class, (MockSettings)Mockito.withSettings().defaultAnswer(wrapper).extraInterfaces(new Class[]{Closeable.class}));
        return mock;
    }

    private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster) throws IOException, URISyntaxException {
        AsyncLogger.Factory spyFactory = new AsyncLogger.Factory(){

            public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, String nameserviceId, InetSocketAddress addr) {
                return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
            }
        };
        return new QuorumJournalManager(conf, cluster.getQuorumJournalURI("test-journal"), QJMTestUtil.FAKE_NSINFO, spyFactory);
    }

    private static QuorumJournalManager createRandomFaultyQJM(MiniJournalCluster cluster, final Random seedGenerator) throws IOException, URISyntaxException {
        AsyncLogger.Factory spyFactory = new AsyncLogger.Factory(){

            public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, String nameServiceId, InetSocketAddress addr) {
                return new RandomFaultyChannel(conf, nsInfo, journalId, addr, seedGenerator.nextLong());
            }
        };
        return new QuorumJournalManager(conf, cluster.getQuorumJournalURI("test-journal"), QJMTestUtil.FAKE_NSINFO, spyFactory);
    }

    static {
        conf.setInt("ipc.client.connect.max.retries", 0);
        EditLogFileOutputStream.setShouldSkipFsyncForTesting((boolean)true);
        faultInjector = JournalFaultInjector.instance = (JournalFaultInjector)Mockito.mock(JournalFaultInjector.class);
    }

    private static abstract class WrapEveryCall<T>
    implements Answer<T> {
        private final Object realObj;

        WrapEveryCall(Object realObj) {
            this.realObj = realObj;
        }

        public T answer(InvocationOnMock invocation) throws Throwable {
            if (!Closeable.class.equals(invocation.getMethod().getDeclaringClass())) {
                this.beforeCall(invocation);
            }
            boolean success = false;
            try {
                Object ret = invocation.getMethod().invoke(this.realObj, invocation.getArguments());
                success = true;
                Object object = ret;
                return (T)object;
            }
            catch (InvocationTargetException ite) {
                throw ite.getCause();
            }
            finally {
                this.afterCall(invocation, success);
            }
        }

        abstract void beforeCall(InvocationOnMock var1) throws Exception;

        void afterCall(InvocationOnMock invocation, boolean succeeded) {
        }
    }

    private static class InvocationCountingChannel
    extends IPCLoggerChannel {
        private int rpcCount = 0;
        private final Map<Integer, Callable<Void>> injections = Maps.newHashMap();

        public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) {
            super(conf, nsInfo, journalId, addr);
        }

        int getRpcCount() {
            return this.rpcCount;
        }

        void failIpcNumber(final int idx) {
            Preconditions.checkArgument((idx > 0 ? 1 : 0) != 0, (Object)"id must be positive");
            this.inject(idx, new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    throw new IOException("injected failed IPC at " + idx);
                }
            });
        }

        private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
            this.injections.put(beforeRpcNumber, injectedCode);
        }

        protected QJournalProtocol createProxy() throws IOException {
            QJournalProtocol realProxy = super.createProxy();
            QJournalProtocol mock = TestQJMWithFaults.mockProxy(new WrapEveryCall<Object>((Object)realProxy){

                @Override
                void beforeCall(InvocationOnMock invocation) throws Exception {
                    rpcCount++;
                    String param = "";
                    for (Object val : invocation.getArguments()) {
                        param = param + val + ",";
                    }
                    String callStr = "[" + addr + "] " + invocation.getMethod().getName() + "(" + param + ")";
                    Callable inject = (Callable)injections.get(rpcCount);
                    if (inject != null) {
                        LOG.info("Injecting code before IPC #" + rpcCount + ": " + callStr);
                        inject.call();
                    } else {
                        LOG.info("IPC call #" + rpcCount + ": " + callStr);
                    }
                }
            });
            return mock;
        }
    }

    private static class RandomFaultyChannel
    extends IPCLoggerChannel {
        private final Random random;
        private final float injectionProbability = 0.1f;
        private boolean isUp = true;

        public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr, long seed) {
            super(conf, nsInfo, journalId, addr);
            this.random = new Random(seed);
        }

        protected QJournalProtocol createProxy() throws IOException {
            QJournalProtocol realProxy = super.createProxy();
            return TestQJMWithFaults.mockProxy(new WrapEveryCall<Object>((Object)realProxy){

                @Override
                void beforeCall(InvocationOnMock invocation) throws Exception {
                    if (random.nextFloat() < 0.1f) {
                        isUp = !isUp;
                        LOG.info("transitioned " + addr + " to " + (isUp ? "up" : "down"));
                    }
                    if (!isUp) {
                        throw new IOException("Injected - faking being down");
                    }
                    if (invocation.getMethod().getName().equals("acceptRecovery")) {
                        if (random.nextFloat() < 0.1f) {
                            ((JournalFaultInjector)Mockito.doThrow((Throwable[])new Throwable[]{new IOException("Injected - faking fault before persisting paxos data")}).when((Object)faultInjector)).beforePersistPaxosData();
                        } else if (random.nextFloat() < 0.1f) {
                            ((JournalFaultInjector)Mockito.doThrow((Throwable[])new Throwable[]{new IOException("Injected - faking fault after persisting paxos data")}).when((Object)faultInjector)).afterPersistPaxosData();
                        }
                    }
                }

                @Override
                public void afterCall(InvocationOnMock invocation, boolean succeeded) {
                    Mockito.reset((Object[])new JournalFaultInjector[]{faultInjector});
                }
            });
        }

        protected ExecutorService createSingleThreadExecutor() {
            return new DirectExecutorService();
        }
    }
}

