/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.event;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.metrics.EventTypeMetrics;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TestAsyncDispatcher {
    /**
     * Checks the dispatcher's behaviour when enqueueing an event is interrupted.
     *
     * <p>The backing queue is a spy whose {@code put} for the test event throws
     * {@link InterruptedException}. The test expects {@code handle} to surface
     * that as a {@link YarnRuntimeException} whose cause is the
     * {@code InterruptedException}, and expects the queue to still be empty
     * before the dispatcher is closed.
     */
    @Test(timeout=10000L)
    public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
        BlockingQueue<Event> spiedQueue = Mockito.spy(new LinkedBlockingQueue<Event>());
        Event mockEvent = Mockito.mock(Event.class);
        // Make the enqueue of this particular event fail as if the thread was interrupted.
        Mockito.doThrow(new InterruptedException()).when(spiedQueue).put(mockEvent);

        DrainDispatcher dispatcher = new DrainDispatcher(spiedQueue);
        dispatcher.init(new Configuration());
        dispatcher.setDrainEventsOnStop();
        dispatcher.start();
        dispatcher.waitForEventThreadToWait();

        try {
            dispatcher.getEventHandler().handle(mockEvent);
            Assert.fail("Expected YarnRuntimeException");
        } catch (YarnRuntimeException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof InterruptedException);
        }

        Assert.assertTrue("Event Queue should have been empty", spiedQueue.isEmpty());
        dispatcher.close();
    }

    /**
     * Checks that closing a draining dispatcher returns even though its queue
     * never reports itself as empty.
     *
     * <p>The queue spy is stubbed so that {@code isEmpty()} always returns
     * {@code false}, and {@code yarn.dispatcher.drain-events.timeout} is set to
     * 2000. The test has no explicit assertion: it passes when {@code close()}
     * returns before the 10 second JUnit timeout fires.
     */
    @Test(timeout=10000L)
    public void testDispatchStopOnTimeout() throws Exception {
        BlockingQueue<Event> neverEmptyQueue = Mockito.spy(new LinkedBlockingQueue<Event>());
        Mockito.when(neverEmptyQueue.isEmpty()).thenReturn(false);

        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.dispatcher.drain-events.timeout", 2000);

        DrainDispatcher dispatcher = new DrainDispatcher(neverEmptyQueue);
        dispatcher.init(yarnConf);
        dispatcher.setDrainEventsOnStop();
        dispatcher.start();
        dispatcher.waitForEventThreadToWait();
        dispatcher.close();
    }

    /**
     * Sends {@code count} mocked events of type {@link DummyType#DUMMY} to the
     * given dispatcher's event handler. A fresh mock is created for every event.
     *
     * @param disp  dispatcher whose event handler receives the events
     * @param count number of events to send; nothing is sent when it is not positive
     */
    private void dispatchDummyEvents(Dispatcher disp, int count) {
        int remaining = count;
        while (remaining > 0) {
            Event dummyEvent = Mockito.mock(Event.class);
            Mockito.when(dummyEvent.getType()).thenReturn(DummyType.DUMMY);
            disp.getEventHandler().handle(dummyEvent);
            remaining--;
        }
    }

    /**
     * Checks that a {@link DrainDispatcher} configured to drain on stop has an
     * empty queue once {@code close()} returns.
     *
     * <p>Two dummy events are dispatched to a {@link DummyHandler} right before
     * the dispatcher is closed; afterwards the queue size must be zero.
     */
    @Test(timeout=10000L)
    public void testDrainDispatcherDrainEventsOnStop() throws Exception {
        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.dispatcher.drain-events.timeout", 2000);

        LinkedBlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<>();
        DrainDispatcher dispatcher = new DrainDispatcher(pendingEvents);
        dispatcher.init(yarnConf);
        dispatcher.register(DummyType.class, new DummyHandler());
        dispatcher.setDrainEventsOnStop();
        dispatcher.start();
        dispatcher.waitForEventThreadToWait();

        dispatchDummyEvents(dispatcher, 2);
        dispatcher.close();

        Assert.assertEquals(0L, (long) pendingEvents.size());
    }

    /**
     * Checks that the dispatcher logs the latest dispatched event type when
     * many events are submitted.
     *
     * <p>The static {@code LOG} field of {@link AsyncDispatcher} is replaced
     * with a Mockito mock through reflection, 10000 events are submitted with
     * {@code yarn.dispatcher.print-events-info.threshold} set to 5000, and after
     * a two second pause the mock must have received the
     * {@code "Latest dispatch event type: TestEventType"} message at least once.
     * The original logger is put back and the dispatcher stopped in all cases.
     */
    @Test(timeout=10000L)
    public void testPrintDispatcherEventDetails() throws Exception {
        final int eventCount = 10000;

        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.dispatcher.print-events-info.threshold", 5000);
        Logger mockLog = Mockito.mock(Logger.class);
        AsyncDispatcher dispatcher = new AsyncDispatcher();
        dispatcher.init(yarnConf);

        // Strip the final modifier from AsyncDispatcher.LOG so it can be swapped for the mock.
        Field logField = AsyncDispatcher.class.getDeclaredField("LOG");
        logField.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(logField, logField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
        Object originalLog = logField.get(null);

        try {
            logField.set(null, mockLog);
            dispatcher.register(TestEnum.class, new TestHandler());
            dispatcher.start();

            int sent = 0;
            while (sent < eventCount) {
                Event mockEvent = Mockito.mock(Event.class);
                Mockito.when(mockEvent.getType()).thenReturn(TestEnum.TestEventType);
                dispatcher.getEventHandler().handle(mockEvent);
                sent++;
            }

            // Leave the dispatcher some time to emit the message before verifying.
            Thread.sleep(2000L);
            Mockito.verify(mockLog, Mockito.atLeastOnce()).info("Latest dispatch event type: TestEventType");
        } finally {
            logField.set(null, originalLog);
            dispatcher.stop();
        }
    }

    /**
     * Runs {@link #testPrintDispatcherEventDetailsAvoidDeadLoopInternal()} five
     * times in a row; the whole sequence must finish within the 60 second
     * JUnit timeout.
     */
    @Test(timeout=60000L)
    public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
        int runsLeft = 5;
        while (runsLeft > 0) {
            testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
            runsLeft--;
        }
    }

    /**
     * Single iteration of the "avoid dead loop" scenario.
     *
     * <p>With {@code yarn.dispatcher.print-events-info.threshold} set to 10 and
     * {@link AsyncDispatcher}'s static {@code LOG} replaced by a mock, 10000
     * events are submitted to a handler that does not sleep. The method then
     * waits three seconds, restores the original logger and stops the
     * dispatcher. It makes no assertions itself; it is driven by
     * {@link #testPrintDispatcherEventDetailsAvoidDeadLoop()}, which bounds the
     * total run time with a JUnit timeout.
     */
    public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() throws Exception {
        final int eventCount = 10000;

        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.dispatcher.print-events-info.threshold", 10);
        Logger mockLog = Mockito.mock(Logger.class);
        AsyncDispatcher dispatcher = new AsyncDispatcher();
        dispatcher.init(yarnConf);

        // Strip the final modifier from AsyncDispatcher.LOG so it can be swapped for the mock.
        Field logField = AsyncDispatcher.class.getDeclaredField("LOG");
        logField.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(logField, logField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
        Object originalLog = logField.get(null);

        try {
            logField.set(null, mockLog);
            dispatcher.register(TestEnum.class, new TestHandler(0L));
            dispatcher.start();

            int sent = 0;
            while (sent < eventCount) {
                Event mockEvent = Mockito.mock(Event.class);
                Mockito.when(mockEvent.getType()).thenReturn(TestEnum.TestEventType);
                dispatcher.getEventHandler().handle(mockEvent);
                sent++;
            }

            Thread.sleep(3000L);
        } finally {
            logField.set(null, originalLog);
            dispatcher.stop();
        }
    }

    /**
     * Checks the per-event-type metrics collected by an {@link AsyncDispatcher}.
     *
     * <p>Three {@code TestEventType} events and two {@code TestEventType2}
     * events are dispatched to a {@link TestHandler} using its default sleep
     * time of 1500 ms per event. Once the counters reach 3 and 2, the test
     * asserts that the total processing time for each type falls in the window
     * implied by that sleep time, and that the values exposed through the
     * metrics registry match the values returned by the metrics object.
     */
    @Test
    public void testMetricsForDispatcher() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        // The dispatcher has to be created in the resource header: a
        // try-with-resources variable is implicitly final and cannot be assigned
        // inside the body, and a null resource would never be closed.
        try (AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher")) {
            GenericEventTypeMetrics genericEventTypeMetrics = new GenericEventTypeMetrics.EventTypeMetricsBuilder()
                    .setMs(DefaultMetricsSystem.instance())
                    .setInfo(Interns.info("GenericEventTypeMetrics for " + TestEnum.class.getName(), "Metrics for " + dispatcher.getName()))
                    .setEnumClass(TestEnum.class)
                    .setEnums((Enum[])TestEnum.class.getEnumConstants())
                    .build()
                    .registerMetrics();
            dispatcher.addMetrics((EventTypeMetrics)genericEventTypeMetrics, genericEventTypeMetrics.getEnumClass());
            dispatcher.init((Configuration)conf);
            dispatcher.register(TestEnum.class, (EventHandler)new TestHandler());
            dispatcher.start();

            for (int i = 0; i < 3; ++i) {
                Event event = (Event)Mockito.mock(Event.class);
                Mockito.when((Object)event.getType()).thenReturn((Object)TestEnum.TestEventType);
                dispatcher.getEventHandler().handle(event);
            }
            for (int i = 0; i < 2; ++i) {
                Event event = (Event)Mockito.mock(Event.class);
                Mockito.when((Object)event.getType()).thenReturn((Object)TestEnum.TestEventType2);
                dispatcher.getEventHandler().handle(event);
            }

            // Wait (polling every second, up to ten seconds) until all events are accounted for.
            GenericTestUtils.waitFor(() -> genericEventTypeMetrics.get((Enum)TestEnum.TestEventType) == 3L, 1000L, 10000L);
            GenericTestUtils.waitFor(() -> genericEventTypeMetrics.get((Enum)TestEnum.TestEventType2) == 2L, 1000L, 10000L);

            // 3 events * 1500 ms: at least 4500 ms, but less than a fourth event's worth.
            Assert.assertTrue(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType) >= 4500L);
            Assert.assertTrue(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType) < 6000L);
            // 2 events * 1500 ms: at least 3000 ms, but less than a third event's worth.
            Assert.assertTrue(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType2) >= 3000L);
            Assert.assertTrue(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType2) < 4500L);

            // The registry entries must agree with the values reported by the metrics object.
            Assert.assertEquals((Object)Long.toString(genericEventTypeMetrics.get((Enum)TestEnum.TestEventType)), (Object)genericEventTypeMetrics.getRegistry().get("TestEventType_event_count").toString());
            Assert.assertEquals((Object)Long.toString(genericEventTypeMetrics.get((Enum)TestEnum.TestEventType2)), (Object)genericEventTypeMetrics.getRegistry().get("TestEventType2_event_count").toString());
            Assert.assertEquals((Object)Long.toString(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType)), (Object)genericEventTypeMetrics.getRegistry().get("TestEventType_processing_time").toString());
            Assert.assertEquals((Object)Long.toString(genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType2)), (Object)genericEventTypeMetrics.getRegistry().get("TestEventType2_processing_time").toString());
        }
    }

    /**
     * Checks that the values published through
     * {@code GenericEventTypeMetrics.getMetrics} match the values returned by
     * the metrics object's own accessors.
     *
     * <p>Three {@code TestEventType} and two {@code TestEventType2} events are
     * dispatched. Once the counters reach 3 and 2, the event-count and
     * processing-time values are captured as expectations, a
     * {@link MetricsCollectorImpl} snapshot is taken, and every expected metric
     * name must appear in the snapshot with the expected value.
     */
    @Test
    public void testDispatcherMetricsHistogram() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        // The dispatcher has to be created in the resource header: a
        // try-with-resources variable is implicitly final and cannot be assigned
        // inside the body, and a null resource would never be closed.
        try (AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher")) {
            GenericEventTypeMetrics genericEventTypeMetrics = new GenericEventTypeMetrics.EventTypeMetricsBuilder()
                    .setMs(DefaultMetricsSystem.instance())
                    .setInfo(Interns.info("GenericEventTypeMetrics for " + TestEnum.class.getName(), "Metrics for " + dispatcher.getName()))
                    .setEnumClass(TestEnum.class)
                    .setEnums((Enum[])TestEnum.class.getEnumConstants())
                    .build()
                    .registerMetrics();
            dispatcher.addMetrics((EventTypeMetrics)genericEventTypeMetrics, genericEventTypeMetrics.getEnumClass());
            dispatcher.init((Configuration)conf);
            dispatcher.register(TestEnum.class, (EventHandler)new TestHandler());
            dispatcher.start();

            for (int i = 0; i < 3; ++i) {
                Event event = (Event)Mockito.mock(Event.class);
                Mockito.when((Object)event.getType()).thenReturn((Object)TestEnum.TestEventType);
                dispatcher.getEventHandler().handle(event);
            }
            for (int i = 0; i < 2; ++i) {
                Event event = (Event)Mockito.mock(Event.class);
                Mockito.when((Object)event.getType()).thenReturn((Object)TestEnum.TestEventType2);
                dispatcher.getEventHandler().handle(event);
            }

            // Wait (polling every second, up to ten seconds) until all events are accounted for.
            GenericTestUtils.waitFor(() -> genericEventTypeMetrics.get((Enum)TestEnum.TestEventType) == 3L, 1000L, 10000L);
            GenericTestUtils.waitFor(() -> genericEventTypeMetrics.get((Enum)TestEnum.TestEventType2) == 2L, 1000L, 10000L);

            // Expected values are read from the metrics object itself, keyed by published metric name.
            HashMap<String, Long> expectedValues = new HashMap<String, Long>();
            expectedValues.put("TestEventType_event_count", genericEventTypeMetrics.get((Enum)TestEnum.TestEventType));
            expectedValues.put("TestEventType_processing_time", genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType));
            expectedValues.put("TestEventType2_event_count", genericEventTypeMetrics.get((Enum)TestEnum.TestEventType2));
            expectedValues.put("TestEventType2_processing_time", genericEventTypeMetrics.getTotalProcessingTime((Enum)TestEnum.TestEventType2));

            // Names of the expected metrics that were actually found in the snapshot.
            HashSet<String> testResults = new HashSet<String>();
            MetricsCollectorImpl collector = new MetricsCollectorImpl();
            genericEventTypeMetrics.getMetrics((MetricsCollector)collector, true);
            for (MetricsRecord record : collector.getRecords()) {
                for (AbstractMetric metric : record.metrics()) {
                    String metricName = metric.name();
                    if (!expectedValues.containsKey(metricName)) {
                        // Metrics outside the expectation map are ignored.
                        continue;
                    }
                    Long expectedValue = expectedValues.get(metricName);
                    Assert.assertEquals("Metric " + metricName + " doesn't have expected value", (Object)expectedValue, (Object)metric.value());
                    testResults.add(metricName);
                }
            }
            // Every expected metric must have been seen in the snapshot.
            Assert.assertEquals(expectedValues.keySet(), testResults);
        }
    }

    private static enum TestEnum {
        TestEventType,
        TestEventType2;

    }

    private static class TestHandler
    implements EventHandler<Event> {
        private long sleepTime = 1500L;

        TestHandler() {
        }

        TestHandler(long sleepTime) {
            this.sleepTime = sleepTime;
        }

        public void handle(Event event) {
            try {
                Thread.sleep(this.sleepTime);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private static enum DummyType {
        DUMMY;

    }

    private static class DummyHandler
    implements EventHandler<Event> {
        private DummyHandler() {
        }

        public void handle(Event event) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

