package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceStability.Stable
@InterfaceAudience.Public
/* loaded from: input_file:hadoop-client-2.8.1/share/hadoop/client/lib/hadoop-mapreduce-client-core-2.8.1.jar:org/apache/hadoop/mapred/lib/MultithreadedMapRunner.class */
public class MultithreadedMapRunner<K1, V1, K2, V2> implements MapRunnable<K1, V1, K2, V2> {
    private static final Log LOG = LogFactory.getLog(MultithreadedMapRunner.class.getName());
    private JobConf job;
    private Mapper<K1, V1, K2, V2> mapper;
    private ExecutorService executorService;
    private volatile IOException ioException;
    private volatile RuntimeException runtimeException;
    private boolean incrProcCount;

    /* loaded from: input_file:hadoop-client-2.8.1/share/hadoop/client/lib/hadoop-mapreduce-client-core-2.8.1.jar:org/apache/hadoop/mapred/lib/MultithreadedMapRunner$BlockingArrayQueue.class */
    private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1;

        public BlockingArrayQueue(int i) {
            super(i);
        }

        @Override // java.util.concurrent.ArrayBlockingQueue, java.util.Queue, java.util.concurrent.BlockingQueue
        public boolean offer(Runnable runnable) {
            return add(runnable);
        }

        @Override // java.util.concurrent.ArrayBlockingQueue, java.util.AbstractQueue, java.util.AbstractCollection, java.util.Collection, java.util.Queue, java.util.concurrent.BlockingQueue
        public boolean add(Runnable runnable) {
            try {
                put(runnable);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return true;
            }
        }
    }

    /* loaded from: input_file:hadoop-client-2.8.1/share/hadoop/client/lib/hadoop-mapreduce-client-core-2.8.1.jar:org/apache/hadoop/mapred/lib/MultithreadedMapRunner$MapperInvokeRunable.class */
    private class MapperInvokeRunable implements Runnable {
        private K1 key;
        private V1 value;
        private OutputCollector<K2, V2> output;
        private Reporter reporter;

        public MapperInvokeRunable(K1 k1, V1 v1, OutputCollector<K2, V2> outputCollector, Reporter reporter) {
            this.key = k1;
            this.value = v1;
            this.output = outputCollector;
            this.reporter = reporter;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                MultithreadedMapRunner.this.mapper.map(this.key, this.value, this.output, this.reporter);
                if (MultithreadedMapRunner.this.incrProcCount) {
                    this.reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1L);
                }
            } catch (IOException e) {
                synchronized (MultithreadedMapRunner.this) {
                    if (MultithreadedMapRunner.this.ioException == null) {
                        MultithreadedMapRunner.this.ioException = e;
                    }
                }
            } catch (RuntimeException e2) {
                synchronized (MultithreadedMapRunner.this) {
                    if (MultithreadedMapRunner.this.runtimeException == null) {
                        MultithreadedMapRunner.this.runtimeException = e2;
                    }
                }
            }
        }
    }

    @Override // org.apache.hadoop.mapred.JobConfigurable
    public void configure(JobConf jobConf) {
        int i = jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Configuring jobConf " + jobConf.getJobName() + " to use " + i + " threads");
        }
        this.job = jobConf;
        this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(this.job) > 0 && SkipBadRecords.getAutoIncrMapperProcCount(this.job);
        this.mapper = (Mapper) ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
        this.executorService = new ThreadPoolExecutor(i, i, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue(i));
    }

    private void checkForExceptionsFromProcessingThreads() throws IOException, RuntimeException {
        if (this.ioException != null) {
            throw this.ioException;
        }
        if (this.runtimeException != null) {
            throw this.runtimeException;
        }
    }

    @Override // org.apache.hadoop.mapred.MapRunnable
    public void run(RecordReader<K1, V1> recordReader, OutputCollector<K2, V2> outputCollector, Reporter reporter) throws IOException {
        try {
            K1 createKey = recordReader.createKey();
            V1 createValue = recordReader.createValue();
            while (recordReader.next(createKey, createValue)) {
                this.executorService.execute(new MapperInvokeRunable(createKey, createValue, outputCollector, reporter));
                checkForExceptionsFromProcessingThreads();
                createKey = recordReader.createKey();
                createValue = recordReader.createValue();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished dispatching all Mappper.map calls, job " + this.job.getJobName());
            }
            this.executorService.shutdown();
            while (!this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
                try {
                    try {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Awaiting all running Mappper.map calls to finish, job " + this.job.getJobName());
                        }
                        checkForExceptionsFromProcessingThreads();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                } catch (IOException e2) {
                    this.executorService.shutdownNow();
                    throw e2;
                }
            }
            checkForExceptionsFromProcessingThreads();
        } finally {
            this.mapper.close();
        }
    }
}
