/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.TempBarrier;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.shipping.OutputCollector;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchTask<S extends Function, OT>
extends AbstractInvokable
implements TaskContext<S, OT> {
    protected static final Logger LOG = LoggerFactory.getLogger(BatchTask.class);
    protected volatile Driver<S, OT> driver;
    protected S stub;
    protected DistributedRuntimeUDFContext runtimeUdfContext;
    protected Collector<OT> output;
    protected List<RecordWriter<?>> eventualOutputs;
    protected MutableReader<?>[] inputReaders;
    protected MutableReader<?>[] broadcastInputReaders;
    protected MutableObjectIterator<?>[] inputIterators;
    protected int[] iterativeInputs;
    protected int[] iterativeBroadcastInputs;
    protected volatile CloseableInputProvider<?>[] localStrategies;
    protected volatile TempBarrier<?>[] tempBarriers;
    protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs;
    protected MutableObjectIterator<?>[] inputs;
    protected TypeSerializerFactory<?>[] inputSerializers;
    protected TypeSerializerFactory<?>[] broadcastInputSerializers;
    protected TypeComparator<?>[] inputComparators;
    protected TaskConfig config;
    protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
    private boolean[] excludeFromReset;
    private boolean[] inputIsCached;
    private boolean[] inputIsAsyncMaterialized;
    private int[] materializationMemory;
    protected volatile boolean running = true;
    protected Map<String, Accumulator<?, ?>> accumulatorMap;
    private OperatorMetricGroup metrics;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Start registering input and output."));
        }
        Configuration taskConf = this.getTaskConfiguration();
        this.config = new TaskConfig(taskConf);
        Class driverClass = this.config.getDriver();
        this.driver = (Driver)InstantiationUtil.instantiate(driverClass, Driver.class);
        String headName = this.getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
        this.metrics = this.getEnvironment().getMetricGroup().addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
        this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
        if (this.config.getNumberOfChainedStubs() == 0) {
            this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
        }
        this.initInputReaders();
        this.initBroadcastInputReaders();
        this.initOutputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Finished registering input and output."));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Start task code."));
        }
        this.runtimeUdfContext = this.createRuntimeContext(this.metrics);
        try {
            try {
                int numberOfEventsUntilInterrupt;
                int i;
                int numInputs = this.driver.getNumberOfInputs();
                int numComparators = this.driver.getNumberOfDriverComparators();
                int numBroadcastInputs = this.config.getNumBroadcastInputs();
                this.initInputsSerializersAndComparators(numInputs, numComparators);
                this.initBroadcastInputsSerializers(numBroadcastInputs);
                ArrayList<Integer> iterativeInputs = new ArrayList<Integer>();
                for (i = 0; i < numInputs; ++i) {
                    numberOfEventsUntilInterrupt = this.getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i);
                    if (numberOfEventsUntilInterrupt < 0) {
                        throw new IllegalArgumentException();
                    }
                    if (numberOfEventsUntilInterrupt <= 0) continue;
                    this.inputReaders[i].setIterativeReader();
                    iterativeInputs.add(i);
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug(this.formatLogString("Input [" + i + "] reads in supersteps with [" + numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
                }
                this.iterativeInputs = BatchTask.asArray(iterativeInputs);
                ArrayList<Integer> iterativeBcInputs = new ArrayList<Integer>();
                for (i = 0; i < numBroadcastInputs; ++i) {
                    numberOfEventsUntilInterrupt = this.getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i);
                    if (numberOfEventsUntilInterrupt < 0) {
                        throw new IllegalArgumentException();
                    }
                    if (numberOfEventsUntilInterrupt <= 0) continue;
                    this.broadcastInputReaders[i].setIterativeReader();
                    iterativeBcInputs.add(i);
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug(this.formatLogString("Broadcast input [" + i + "] reads in supersteps with [" + numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
                }
                this.iterativeBroadcastInputs = BatchTask.asArray(iterativeBcInputs);
                this.initLocalStrategies(numInputs);
            }
            catch (Exception e) {
                throw new RuntimeException("Initializing the input processing failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
            }
            if (!this.running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.formatLogString("Task cancelled before task code was started."));
                }
                return;
            }
            this.initialize();
            for (int i = 0; i < this.config.getNumBroadcastInputs(); ++i) {
                String name = this.config.getBroadcastInputName(i);
                this.readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1);
            }
            this.run();
        }
        finally {
            this.closeLocalStrategiesAndCaches();
            BatchTask.clearReaders(this.inputReaders);
            BatchTask.clearWriters(this.eventualOutputs);
        }
        if (this.running) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.formatLogString("Finished task code."));
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Task code cancelled."));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancel() throws Exception {
        this.running = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Cancelling task code"));
        }
        try {
            if (this.driver != null) {
                this.driver.cancel();
            }
        }
        finally {
            this.closeLocalStrategiesAndCaches();
        }
    }

    protected void initialize() throws Exception {
        try {
            this.driver.setup(this);
        }
        catch (Throwable t) {
            throw new Exception("The driver setup for '" + this.getEnvironment().getTaskInfo().getTaskName() + "' , caused an error: " + t.getMessage(), t);
        }
        try {
            Class<S> userCodeFunctionType = this.driver.getStubType();
            if (userCodeFunctionType != null) {
                this.stub = this.initStub(userCodeFunctionType);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Initializing the UDF" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
        }
    }

    protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Setting broadcast variable '" + bcVarName + "'" + (superstep > 1 ? ", superstep " + superstep : "")));
        }
        TypeSerializerFactory<?> serializerFactory = this.broadcastInputSerializers[inputNum];
        MutableReader<?> reader = this.broadcastInputReaders[inputNum];
        BroadcastVariableMaterialization<?, ?> variable = this.getEnvironment().getBroadcastVariableManager().materializeBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
        context.setBroadcastVariable(bcVarName, variable);
    }

    protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Releasing broadcast variable '" + bcVarName + "'" + (superstep > 1 ? ", superstep " + superstep : "")));
        }
        this.getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
        context.clearBroadcastVariable(bcVarName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void run() throws Exception {
        if (!this.running) {
            return;
        }
        boolean stubOpen = false;
        try {
            try {
                this.driver.prepare();
            }
            catch (Throwable t) {
                throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskInfo().getTaskName() + "' , caused an error: " + t.getMessage(), t);
            }
            if (!this.running) {
                return;
            }
            BatchTask.openChainedTasks(this.chainedTasks, this);
            if (this.stub != null) {
                try {
                    Configuration stubConfig = this.config.getStubParameters();
                    FunctionUtils.openFunction(this.stub, (Configuration)stubConfig);
                    stubOpen = true;
                }
                catch (Throwable t) {
                    throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
                }
            }
            this.driver.run();
            if (this.running && this.stub != null) {
                FunctionUtils.closeFunction(this.stub);
                stubOpen = false;
            }
            this.output.close();
            BatchTask.closeChainedTasks(this.chainedTasks, this);
        }
        catch (Exception ex) {
            if (stubOpen) {
                try {
                    FunctionUtils.closeFunction(this.stub);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
            if (this.driver instanceof ResettableDriver) {
                ResettableDriver resDriver = (ResettableDriver)this.driver;
                try {
                    resDriver.teardown();
                }
                catch (Throwable t) {
                    throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
                }
            }
            BatchTask.cancelChainedTasks(this.chainedTasks);
            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (ex instanceof CancelTaskException) {
                throw ex;
            }
            if (this.running) {
                BatchTask.logAndThrowException(ex, this);
            }
        }
        finally {
            this.driver.cleanup();
        }
    }

    protected void closeLocalStrategiesAndCaches() {
        int i;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.formatLogString("Releasing all broadcast variables."));
        }
        this.getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
        if (this.runtimeUdfContext != null) {
            this.runtimeUdfContext.clearAllBroadcastVariables();
        }
        if (this.localStrategies != null) {
            for (i = 0; i < this.localStrategies.length; ++i) {
                if (this.localStrategies[i] == null) continue;
                try {
                    this.localStrategies[i].close();
                    continue;
                }
                catch (Throwable t) {
                    LOG.error("Error closing local strategy for input " + i, t);
                }
            }
        }
        if (this.tempBarriers != null) {
            for (i = 0; i < this.tempBarriers.length; ++i) {
                if (this.tempBarriers[i] == null) continue;
                try {
                    this.tempBarriers[i].close();
                    continue;
                }
                catch (Throwable t) {
                    LOG.error("Error closing temp barrier for input " + i, t);
                }
            }
        }
        if (this.resettableInputs != null) {
            for (i = 0; i < this.resettableInputs.length; ++i) {
                if (this.resettableInputs[i] == null) continue;
                try {
                    this.resettableInputs[i].close();
                    continue;
                }
                catch (Throwable t) {
                    LOG.error("Error closing cache for input " + i, t);
                }
            }
        }
    }

    protected Collector<OT> getLastOutputCollector() {
        int numChained = this.chainedTasks.size();
        return numChained == 0 ? this.output : this.chainedTasks.get(numChained - 1).getOutputCollector();
    }

    protected void setLastOutputCollector(Collector<OT> newOutputCollector) {
        int numChained = this.chainedTasks.size();
        if (numChained == 0) {
            this.output = newOutputCollector;
            return;
        }
        this.chainedTasks.get(numChained - 1).setOutputCollector(newOutputCollector);
    }

    public TaskConfig getLastTasksConfig() {
        int numChained = this.chainedTasks.size();
        return numChained == 0 ? this.config : this.chainedTasks.get(numChained - 1).getTaskConfig();
    }

    protected S initStub(Class<? super S> stubSuperClass) throws Exception {
        try {
            ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
            Function stub = (Function)this.config.getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, userCodeClassLoader);
            if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
                throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + stubSuperClass.getName() + "' as is required.");
            }
            FunctionUtils.setFunctionRuntimeContext((Function)stub, (RuntimeContext)this.runtimeUdfContext);
            return (S)stub;
        }
        catch (ClassCastException ccex) {
            throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
        }
    }

    protected void initInputReaders() throws Exception {
        int numInputs = this.getNumTaskInputs();
        MutableReader[] inputReaders = new MutableReader[numInputs];
        int currentReaderOffset = 0;
        for (int i = 0; i < numInputs; ++i) {
            int groupSize = this.config.getGroupSize(i);
            if (groupSize == 1) {
                inputReaders[i] = new MutableRecordReader(this.getEnvironment().getInputGate(currentReaderOffset), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            } else if (groupSize > 1) {
                InputGate[] readers = new InputGate[groupSize];
                for (int j = 0; j < groupSize; ++j) {
                    readers[j] = this.getEnvironment().getInputGate(currentReaderOffset + j);
                }
                inputReaders[i] = new MutableRecordReader(new UnionInputGate(readers), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            } else {
                throw new Exception("Illegal input group size in task configuration: " + groupSize);
            }
            currentReaderOffset += groupSize;
        }
        this.inputReaders = inputReaders;
        if (currentReaderOffset != this.config.getNumInputs()) {
            throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    protected void initBroadcastInputReaders() throws Exception {
        int numBroadcastInputs = this.config.getNumBroadcastInputs();
        MutableReader[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
        int currentReaderOffset = this.config.getNumInputs();
        for (int i = 0; i < this.config.getNumBroadcastInputs(); ++i) {
            int groupSize = this.config.getBroadcastGroupSize(i);
            if (groupSize == 1) {
                broadcastInputReaders[i] = new MutableRecordReader(this.getEnvironment().getInputGate(currentReaderOffset), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            } else if (groupSize > 1) {
                InputGate[] readers = new InputGate[groupSize];
                for (int j = 0; j < groupSize; ++j) {
                    readers[j] = this.getEnvironment().getInputGate(currentReaderOffset + j);
                }
                broadcastInputReaders[i] = new MutableRecordReader(new UnionInputGate(readers), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            } else {
                throw new Exception("Illegal input group size in task configuration: " + groupSize);
            }
            currentReaderOffset += groupSize;
        }
        this.broadcastInputReaders = broadcastInputReaders;
    }

    protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
        int i;
        this.inputSerializers = new TypeSerializerFactory[numInputs];
        this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
        this.inputIterators = new MutableObjectIterator[numInputs];
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        for (i = 0; i < numInputs; ++i) {
            TypeSerializerFactory serializerFactory = this.config.getInputSerializer(i, userCodeClassLoader);
            this.inputSerializers[i] = serializerFactory;
            this.inputIterators[i] = this.createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
        }
        for (i = 0; i < numComparators; ++i) {
            if (this.inputComparators == null) continue;
            TypeComparatorFactory comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader);
            this.inputComparators[i] = comparatorFactory.createComparator();
        }
    }

    protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
        this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        for (int i = 0; i < numBroadcastInputs; ++i) {
            TypeSerializerFactory serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
            this.broadcastInputSerializers[i] = serializerFactory;
        }
    }

    protected void initLocalStrategies(int numInputs) throws Exception {
        int i;
        MemoryManager memMan = this.getMemoryManager();
        IOManager ioMan = this.getIOManager();
        this.localStrategies = new CloseableInputProvider[numInputs];
        this.inputs = new MutableObjectIterator[numInputs];
        this.excludeFromReset = new boolean[numInputs];
        this.inputIsCached = new boolean[numInputs];
        this.inputIsAsyncMaterialized = new boolean[numInputs];
        this.materializationMemory = new int[numInputs];
        for (i = 0; i < numInputs; ++i) {
            this.initInputLocalStrategy(i);
        }
        this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
        this.tempBarriers = new TempBarrier[numInputs];
        for (i = 0; i < numInputs; ++i) {
            int memoryPages;
            boolean async = this.config.isInputAsynchronouslyMaterialized(i);
            boolean cached = this.config.isInputCached(i);
            this.inputIsAsyncMaterialized[i] = async;
            this.inputIsCached[i] = cached;
            if (async || cached) {
                memoryPages = memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i));
                if (memoryPages <= 0) {
                    throw new Exception("Input marked as materialized/cached, but no memory for materialization provided.");
                }
                this.materializationMemory[i] = memoryPages;
            } else {
                memoryPages = 0;
            }
            if (async) {
                TempBarrier barrier = new TempBarrier(this, this.getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages);
                barrier.startReading();
                this.tempBarriers[i] = barrier;
                this.inputs[i] = null;
                continue;
            }
            if (!cached) continue;
            SpillingResettableMutableObjectIterator iter = new SpillingResettableMutableObjectIterator(this.getInput(i), this.inputSerializers[i].getSerializer(), this.getMemoryManager(), this.getIOManager(), memoryPages, this);
            this.resettableInputs[i] = iter;
            this.inputs[i] = iter;
        }
    }

    protected void resetAllInputs() throws Exception {
        int i;
        for (i = 0; i < this.inputs.length; ++i) {
            if (!this.inputIsCached[i] || this.resettableInputs[i] == null) continue;
            this.resettableInputs[i].consumeAndCacheRemainingData();
        }
        for (i = 0; i < this.localStrategies.length; ++i) {
            if (this.localStrategies[i] == null) continue;
            this.localStrategies[i].close();
            this.localStrategies[i] = null;
        }
        MemoryManager memMan = this.getMemoryManager();
        IOManager ioMan = this.getIOManager();
        for (int i2 = 0; i2 < this.inputs.length; ++i2) {
            if (this.excludeFromReset[i2]) {
                if (this.tempBarriers[i2] != null) {
                    this.tempBarriers[i2].close();
                    this.tempBarriers[i2] = null;
                    continue;
                }
                if (this.resettableInputs[i2] == null) continue;
                this.resettableInputs[i2].close();
                this.resettableInputs[i2] = null;
                continue;
            }
            this.inputs[i2] = null;
            if (this.inputIsCached[i2]) {
                if (this.tempBarriers[i2] != null) {
                    this.inputs[i2] = this.tempBarriers[i2].getIterator();
                    continue;
                }
                if (this.resettableInputs[i2] != null) {
                    this.resettableInputs[i2].reset();
                    this.inputs[i2] = this.resettableInputs[i2];
                    continue;
                }
                throw new RuntimeException("Found a resettable input, but no temp barrier and no resettable iterator.");
            }
            if (this.tempBarriers[i2] != null) {
                this.tempBarriers[i2].close();
            }
            this.initInputLocalStrategy(i2);
            if (!this.inputIsAsyncMaterialized[i2]) continue;
            int pages = this.materializationMemory[i2];
            TempBarrier barrier = new TempBarrier(this, this.getInput(i2), this.inputSerializers[i2], memMan, ioMan, pages);
            barrier.startReading();
            this.tempBarriers[i2] = barrier;
            this.inputs[i2] = null;
        }
    }

    protected void excludeFromReset(int inputNum) {
        this.excludeFromReset[inputNum] = true;
    }

    private void initInputLocalStrategy(int inputNum) throws Exception {
        block12: {
            block11: {
                if (this.localStrategies[inputNum] != null) {
                    throw new IllegalStateException();
                }
                LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
                if (localStrategy == null) break block11;
                switch (localStrategy) {
                    case NONE: {
                        this.inputs[inputNum] = this.inputIterators[inputNum];
                        break block12;
                    }
                    case SORT: {
                        UnilateralSortMerger sorter = new UnilateralSortMerger(this.getMemoryManager(), this.getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], this.getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), this.config.getSpillingThresholdInput(inputNum), this.config.getUseLargeRecordHandler(), this.getExecutionConfig().isObjectReuseEnabled());
                        this.inputs[inputNum] = null;
                        this.localStrategies[inputNum] = sorter;
                        break block12;
                    }
                    case COMBININGSORT: {
                        S localStub;
                        if (inputNum != 0) {
                            throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
                        }
                        Class<S> userCodeFunctionType = this.driver.getStubType();
                        if (userCodeFunctionType == null) {
                            throw new IllegalStateException("Performing combining sort outside a reduce task!");
                        }
                        try {
                            localStub = this.initStub(userCodeFunctionType);
                        }
                        catch (Exception e) {
                            throw new RuntimeException("Initializing the user code and the configuration failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                        }
                        if (!(localStub instanceof GroupCombineFunction)) {
                            throw new IllegalStateException("Performing combining sort outside a reduce task!");
                        }
                        CombiningUnilateralSortMerger cSorter = new CombiningUnilateralSortMerger((GroupCombineFunction)localStub, this.getMemoryManager(), this.getIOManager(), this.inputIterators[inputNum], (AbstractInvokable)this, this.inputSerializers[inputNum], this.getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), this.config.getSpillingThresholdInput(inputNum), this.getTaskConfig().getUseLargeRecordHandler(), this.getExecutionConfig().isObjectReuseEnabled());
                        cSorter.setUdfConfiguration(this.config.getStubParameters());
                        this.inputs[inputNum] = null;
                        this.localStrategies[inputNum] = cSorter;
                        break block12;
                    }
                    default: {
                        throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
                    }
                }
            }
            this.inputs[inputNum] = this.inputIterators[inputNum];
        }
    }

    private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
        TypeComparatorFactory compFact = this.config.getInputComparator(inputNum, this.getUserCodeClassLoader());
        if (compFact == null) {
            throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
        }
        return compFact.createComparator();
    }

    protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
        MutableReader reader = inputReader;
        ReaderIterator iter = new ReaderIterator(reader, serializerFactory.getSerializer());
        return iter;
    }

    protected int getNumTaskInputs() {
        return this.driver.getNumberOfInputs();
    }

    protected void initOutputs() throws Exception {
        this.chainedTasks = new ArrayList();
        this.eventualOutputs = new ArrayList();
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        this.accumulatorMap = this.getEnvironment().getAccumulatorRegistry().getUserMap();
        this.output = BatchTask.initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig(), this.accumulatorMap);
    }

    public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
        Environment env = this.getEnvironment();
        return new DistributedRuntimeUDFContext(env.getTaskInfo(), this.getUserCodeClassLoader(), this.getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
    }

    @Override
    public TaskConfig getTaskConfig() {
        return this.config;
    }

    @Override
    public TaskManagerRuntimeInfo getTaskManagerInfo() {
        return this.getEnvironment().getTaskManagerInfo();
    }

    @Override
    public MemoryManager getMemoryManager() {
        return this.getEnvironment().getMemoryManager();
    }

    @Override
    public IOManager getIOManager() {
        return this.getEnvironment().getIOManager();
    }

    @Override
    public S getStub() {
        return this.stub;
    }

    @Override
    public Collector<OT> getOutputCollector() {
        return this.output;
    }

    @Override
    public AbstractInvokable getContainingTask() {
        return this;
    }

    @Override
    public String formatLogString(String message) {
        return BatchTask.constructLogString(message, this.getEnvironment().getTaskInfo().getTaskName(), this);
    }

    @Override
    public OperatorMetricGroup getMetricGroup() {
        return this.metrics;
    }

    @Override
    public <X> MutableObjectIterator<X> getInput(int index) {
        if (index < 0 || index > this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        if (this.inputs[index] != null) {
            MutableObjectIterator<?> in = this.inputs[index];
            return in;
        }
        try {
            MutableObjectIterator<?> in;
            if (this.tempBarriers[index] != null) {
                MutableObjectIterator<?> iter;
                in = iter = this.tempBarriers[index].getIterator();
            } else if (this.localStrategies[index] != null) {
                MutableObjectIterator<?> iter = this.localStrategies[index].getIterator();
                in = iter;
            } else {
                throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
            }
            this.inputs[index] = in;
            return in;
        }
        catch (InterruptedException iex) {
            throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
        }
        catch (IOException ioex) {
            throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".");
        }
    }

    @Override
    public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
        if (index < 0 || index >= this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        TypeSerializerFactory<?> serializerFactory = this.inputSerializers[index];
        return serializerFactory;
    }

    @Override
    public <X> TypeComparator<X> getDriverComparator(int index) {
        if (this.inputComparators == null) {
            throw new IllegalStateException("Comparators have not been created!");
        }
        if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
            throw new IndexOutOfBoundsException();
        }
        TypeComparator<?> comparator = this.inputComparators[index];
        return comparator;
    }

    public static String constructLogString(String message, String taskName, AbstractInvokable parent) {
        return message + ":  " + taskName + " (" + (parent.getEnvironment().getTaskInfo().getIndexOfThisSubtask() + 1) + '/' + parent.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks() + ')';
    }

    public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception {
        String taskName;
        if (ex instanceof ExceptionInChainedStubException) {
            ExceptionInChainedStubException cex;
            do {
                cex = (ExceptionInChainedStubException)ex;
                taskName = cex.getTaskName();
            } while ((ex = cex.getWrappedException()) instanceof ExceptionInChainedStubException);
        } else {
            taskName = parent.getEnvironment().getTaskInfo().getTaskName();
        }
        if (LOG.isErrorEnabled()) {
            LOG.error(BatchTask.constructLogString("Error in task code", taskName, parent), (Throwable)ex);
        }
        throw ex;
    }

    public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception {
        if (numOutputs == 0) {
            return null;
        }
        TypeSerializerFactory serializerFactory = config.getOutputSerializer(cl);
        ArrayList writers = new ArrayList(numOutputs);
        for (int i = 0; i < numOutputs; ++i) {
            OutputEmitter oe;
            ShipStrategyType strategy = config.getOutputShipStrategy(i);
            int indexInSubtaskGroup = task.getIndexInSubtaskGroup();
            TypeComparatorFactory compFactory = config.getOutputComparator(i, cl);
            if (compFactory == null) {
                oe = new OutputEmitter(strategy, indexInSubtaskGroup);
            } else {
                DataDistribution dataDist = config.getOutputDataDistribution(i, cl);
                Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
                TypeComparator comparator = compFactory.createComparator();
                oe = new OutputEmitter(strategy, indexInSubtaskGroup, comparator, partitioner, dataDist);
            }
            RecordWriter recordWriter = new RecordWriter(task.getEnvironment().getWriter(outputOffset + i), oe);
            recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
            writers.add(recordWriter);
        }
        if (eventualOutputs != null) {
            eventualOutputs.addAll(writers);
        }
        return new OutputCollector(writers, serializerFactory.getSerializer());
    }

    public static <T> Collector<T> initOutputs(AbstractInvokable containingTask, ClassLoader cl, TaskConfig config, List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, Map<String, Accumulator<?, ?>> accumulatorMap) throws Exception {
        int numOutputs = config.getNumOutputs();
        int numChained = config.getNumberOfChainedStubs();
        if (numChained > 0) {
            if (numOutputs != 1 || config.getOutputShipStrategy(0) != ShipStrategyType.FORWARD) {
                throw new RuntimeException("Plan Generation Bug: Found a chained stub that is not connected via an only forward connection.");
            }
            Collector<T> previous = null;
            for (int i = numChained - 1; i >= 0; --i) {
                ChainedDriver<?, ?> ct;
                try {
                    Class<ChainedDriver<?, ?>> ctc = config.getChainedTask(i);
                    ct = ctc.newInstance();
                }
                catch (Exception ex) {
                    throw new RuntimeException("Could not instantiate chained task driver.", ex);
                }
                TaskConfig chainedStubConf = config.getChainedStubConfig(i);
                String taskName = config.getChainedTaskName(i);
                if (i == numChained - 1) {
                    previous = BatchTask.getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs());
                }
                ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap);
                chainedTasksTarget.add(0, ct);
                if (i == numChained - 1) {
                    ct.getIOMetrics().reuseOutputMetricsForTask();
                }
                previous = ct;
            }
            return previous;
        }
        return BatchTask.getOutputCollector(containingTask, config, cl, eventualOutputs, 0, numOutputs);
    }

    public static void openUserCode(Function stub, Configuration parameters) throws Exception {
        try {
            FunctionUtils.openFunction((Function)stub, (Configuration)parameters);
        }
        catch (Throwable t) {
            throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t);
        }
    }

    public static void closeUserCode(Function stub) throws Exception {
        try {
            FunctionUtils.closeFunction((Function)stub);
        }
        catch (Throwable t) {
            throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t);
        }
    }

    public static void openChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
        for (int i = 0; i < tasks.size(); ++i) {
            ChainedDriver<?, ?> task = tasks.get(i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(BatchTask.constructLogString("Start task code", task.getTaskName(), parent));
            }
            task.openTask();
        }
    }

    public static void closeChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
        for (int i = 0; i < tasks.size(); ++i) {
            ChainedDriver<?, ?> task = tasks.get(i);
            task.closeTask();
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug(BatchTask.constructLogString("Finished task code", task.getTaskName(), parent));
        }
    }

    public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) {
        for (int i = 0; i < tasks.size(); ++i) {
            try {
                tasks.get(i).cancelTask();
                continue;
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }

    public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) {
        try {
            Object stub = config.getStubWrapper(cl).getUserCodeObject(superClass, cl);
            if (superClass != null && !superClass.isAssignableFrom(stub.getClass())) {
                throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + superClass.getName() + "' as is required.");
            }
            return (T)stub;
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The UDF class is not a proper subclass of " + superClass.getName(), ccex);
        }
    }

    private static int[] asArray(List<Integer> list) {
        int[] a = new int[list.size()];
        int i = 0;
        for (int val : list) {
            a[i++] = val;
        }
        return a;
    }

    public static void clearWriters(List<RecordWriter<?>> writers) {
        for (RecordWriter<?> writer : writers) {
            writer.clearBuffers();
        }
    }

    public static void clearReaders(MutableReader<?>[] readers) {
        for (MutableReader<?> reader : readers) {
            reader.clearBuffers();
        }
    }
}

