/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.GateNotificationHelper;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.throughput.BufferDebloater;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleInputGate
extends IndexedInputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
    /** Monitor held while the channel map/array, the request flag and the pending events are read or changed. */
    private final Object requestLock = new Object();
    /** Name of the owning task; only used in log messages and {@link #toString()}. */
    private final String owningTaskName;
    /** Index of this gate, validated as {@code >= 0} in the constructor. */
    private final int gateIndex;
    /** ID of the consumed result; passed along when a partition producer state check is triggered. */
    private final IntermediateDataSetID consumedResultId;
    /** Type of the consumed partition, exposed via {@link #getConsumedPartitionType()}. */
    private final ResultPartitionType consumedPartitionType;
    /** Subpartition index handed to every channel when a subpartition is requested or re-requested. */
    private final int consumedSubpartitionIndex;
    /** Expected number of channels, validated as {@code > 0} in the constructor. */
    private final int numberOfInputChannels;
    /** Channels keyed by the partition ID they consume. */
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
    /** Channels as an array; a replaced channel is written at its {@code getChannelIndex()} position. */
    @GuardedBy(value="requestLock")
    private final InputChannel[] channels;
    /** Queue of channels with data; it is also the monitor guarding the bit sets and sequence numbers below. */
    private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque();
    /** Bit per channel index; cleared in {@code transformEvent} once the channel delivered its EndOfPartitionEvent. */
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet enqueuedInputChannelsWithData;
    /** Bit per channel index, set when that channel delivered an EndOfPartitionEvent. */
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet channelsWithEndOfPartitionEvents;
    /** Bit per channel index, set when that channel delivered an EndOfData event. */
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet channelsWithEndOfUserRecords;
    /** Per channel index, sequence number of the last priority buffer handed out; starts at Integer.MIN_VALUE. */
    @GuardedBy(value="inputChannelsWithData")
    private int[] lastPrioritySequenceNumber;
    /** Used by {@code triggerPartitionStateCheck} to ask for the producer state of a partition. */
    private final PartitionProducerStateProvider partitionProducerStateProvider;
    /** Buffer pool of this gate; null until {@link #setBufferPool(BufferPool)} is called. */
    private BufferPool bufferPool;
    /** True once every channel has delivered an EndOfPartitionEvent. */
    private boolean hasReceivedAllEndOfPartitionEvents;
    /** True once every channel has delivered an EndOfData event. */
    private boolean hasReceivedEndOfData;
    /** Set at the end of {@link #requestPartitions()}; later calls skip the request logic. */
    private boolean requestedPartitionsFlag;
    /** Task events recorded while unknown channels remain; replayed to each channel created in updateInputChannel. */
    private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
    /** Count of UnknownInputChannel instances registered in setInputChannels and not yet replaced. */
    private int numberOfUninitializedChannels;
    /** Created lazily when a local channel request is retriggered; cancelled in {@link #close()}. */
    private Timer retriggerLocalRequestTimer;
    /** Supplies the buffer pool during {@link #setup()}. */
    private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
    /** Completed by {@link #close()}; its done-state is used as the "released" marker throughout the class. */
    private final CompletableFuture<Void> closeFuture;
    /** Decompressor for compressed buffers; may be null, in which case a compressed buffer fails a null check. */
    @Nullable
    private final BufferDecompressor bufferDecompressor;
    /** Exposed to package-level callers via {@code getMemorySegmentProvider()}. */
    private final MemorySegmentProvider memorySegmentProvider;
    /** Unpooled segment of {@code segmentSize} bytes allocated in the constructor. */
    private final MemorySegment unpooledSegment;
    /** Receives the size of each returned buffer/event and pause/resume notifications. */
    private final ThroughputCalculator throughputCalculator;
    /** Recalculates the buffer size in triggerDebloating; the constructor accepts null for it. */
    private final BufferDebloater bufferDebloater;

    /**
     * Creates a gate without any channels; channels are registered later through
     * {@link #setInputChannels(InputChannel...)}.
     *
     * <p>Arguments are validated in the order they are stored below: the task name, result ID,
     * partition type, buffer pool factory, producer state provider, memory segment provider and
     * throughput calculator must be non-null; {@code gateIndex} and
     * {@code consumedSubpartitionIndex} must be {@code >= 0}; {@code numberOfInputChannels} must
     * be {@code > 0}. {@code bufferDecompressor} and {@code bufferDebloater} may be null.
     *
     * @param segmentSize size passed to the unpooled segment allocation
     */
    public SingleInputGate(String owningTaskName, int gateIndex, IntermediateDataSetID consumedResultId, ResultPartitionType consumedPartitionType, int consumedSubpartitionIndex, int numberOfInputChannels, PartitionProducerStateProvider partitionProducerStateProvider, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @Nullable BufferDecompressor bufferDecompressor, MemorySegmentProvider memorySegmentProvider, int segmentSize, ThroughputCalculator throughputCalculator, @Nullable BufferDebloater bufferDebloater) {
        this.owningTaskName = Preconditions.checkNotNull(owningTaskName);

        Preconditions.checkArgument(gateIndex >= 0, "The gate index must be positive.");
        this.gateIndex = gateIndex;

        this.consumedResultId = Preconditions.checkNotNull(consumedResultId);
        this.consumedPartitionType = Preconditions.checkNotNull(consumedPartitionType);
        this.bufferPoolFactory = Preconditions.checkNotNull(bufferPoolFactory);

        Preconditions.checkArgument(consumedSubpartitionIndex >= 0);
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;

        Preconditions.checkArgument(numberOfInputChannels > 0);
        this.numberOfInputChannels = numberOfInputChannels;

        // All per-channel structures are sized up front from the validated channel count.
        this.inputChannels = new HashMap<>(numberOfInputChannels);
        this.channels = new InputChannel[numberOfInputChannels];
        this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
        this.channelsWithEndOfUserRecords = new BitSet(numberOfInputChannels);
        this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
        this.lastPrioritySequenceNumber = new int[numberOfInputChannels];
        Arrays.fill(this.lastPrioritySequenceNumber, Integer.MIN_VALUE);

        this.partitionProducerStateProvider = Preconditions.checkNotNull(partitionProducerStateProvider);
        this.bufferDecompressor = bufferDecompressor;
        this.memorySegmentProvider = Preconditions.checkNotNull(memorySegmentProvider);
        this.closeFuture = new CompletableFuture<>();
        this.unpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
        this.bufferDebloater = bufferDebloater;
        this.throughputCalculator = Preconditions.checkNotNull(throughputCalculator);
    }

    /** Returns the queue of channels with data (the same instance that is used as a monitor). */
    protected PrioritizedDeque<InputChannel> getInputChannelsWithData() {
        return inputChannelsWithData;
    }

    /**
     * Obtains the buffer pool from the factory, registers it and sets up all channels.
     *
     * @throws IOException if the factory or the channel setup fails
     * @throws IllegalStateException if a buffer pool has already been registered
     */
    @Override
    public void setup() throws IOException {
        Preconditions.checkState(bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        setBufferPool(bufferPoolFactory.get());
        setupChannels();
    }

    /**
     * Returns a future that completes once every {@link RecoveredInputChannel} currently
     * registered in this gate has completed its own state-consumed future.
     *
     * <p>The channel map is read under {@code requestLock}. If no recovered channel is
     * registered, the result of {@link CompletableFuture#allOf} over an empty array is returned.
     */
    @Override
    public CompletableFuture<Void> getStateConsumedFuture() {
        synchronized (requestLock) {
            // A typed list is required: through a raw ArrayList, toArray(...) erases to Object[],
            // which CompletableFuture.allOf does not accept.
            List<CompletableFuture<?>> futures = new ArrayList<>(inputChannels.size());
            for (InputChannel inputChannel : inputChannels.values()) {
                if (inputChannel instanceof RecoveredInputChannel) {
                    futures.add(((RecoveredInputChannel) inputChannel).getStateConsumedFuture());
                }
            }
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }
    }

    /**
     * Converts recovered channels and requests the consumed subpartition from every channel.
     *
     * <p>The work is done only on the first successful call; afterwards the request flag is set
     * and further calls return without doing anything. Runs under {@code requestLock}.
     *
     * @throws IllegalStateException if the gate has already been closed, or if the number of
     *     registered channels differs from the expected number of channels
     */
    @Override
    public void requestPartitions() {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }
                if (numberOfInputChannels != inputChannels.size()) {
                    // First placeholder is the expected total, second the number registered so far.
                    throw new IllegalStateException(String.format("Bug in input gate setup logic: mismatch between number of total input channels [%s] and the currently set number of input channels [%s].", numberOfInputChannels, inputChannels.size()));
                }
                convertRecoveredInputChannels();
                internalRequestPartitions();
            }
            requestedPartitionsFlag = true;
        }
    }

    /**
     * Replaces every {@link RecoveredInputChannel} in the map and in the channel array with the
     * channel returned by its {@code toInputChannel()}, releasing the recovered channel first.
     *
     * <p>If any step fails for a channel, the error is set on that channel and the conversion
     * stops; remaining channels are left untouched.
     */
    @VisibleForTesting
    public void convertRecoveredInputChannels() {
        LOG.debug("Converting recovered input channels ({} channels)", getNumberOfInputChannels());
        for (Map.Entry<IntermediateResultPartitionID, InputChannel> mapping : inputChannels.entrySet()) {
            InputChannel recovered = mapping.getValue();
            if (recovered instanceof RecoveredInputChannel) {
                try {
                    InputChannel converted = ((RecoveredInputChannel) recovered).toInputChannel();
                    recovered.releaseAllResources();
                    mapping.setValue(converted);
                    channels[recovered.getChannelIndex()] = converted;
                } catch (Throwable failure) {
                    recovered.setError(failure);
                    return;
                }
            }
        }
    }

    /**
     * Requests the consumed subpartition from each registered channel. On the first failure the
     * error is set on the failing channel and no further channels are requested.
     */
    private void internalRequestPartitions() {
        for (InputChannel channel : inputChannels.values()) {
            try {
                channel.requestSubpartition(consumedSubpartitionIndex);
            } catch (Throwable failure) {
                channel.setError(failure);
                return;
            }
        }
    }

    /**
     * Calls {@code finishReadRecoveredState()} on every channel in the array that is a
     * {@link RecoveredInputChannel}; other channels are ignored.
     */
    @Override
    public void finishReadRecoveredState() throws IOException {
        for (InputChannel candidate : channels) {
            if (candidate instanceof RecoveredInputChannel) {
                ((RecoveredInputChannel) candidate).finishReadRecoveredState();
            }
        }
    }

    /** Returns the channel count this gate was constructed with. */
    @Override
    public int getNumberOfInputChannels() {
        return numberOfInputChannels;
    }

    /** Returns the index this gate was constructed with. */
    @Override
    public int getGateIndex() {
        return gateIndex;
    }

    /**
     * Returns the channel infos of all channels that have not yet delivered an
     * EndOfPartitionEvent, in ascending channel index order.
     *
     * <p>The bit set of finished channels is annotated as guarded by
     * {@code inputChannelsWithData}, so both the capacity computation and the scan happen while
     * holding that monitor.
     */
    @Override
    public List<InputChannelInfo> getUnfinishedChannels() {
        synchronized (inputChannelsWithData) {
            List<InputChannelInfo> unfinishedChannels = new ArrayList<>(numberOfInputChannels - channelsWithEndOfPartitionEvents.cardinality());
            for (int i = channelsWithEndOfPartitionEvents.nextClearBit(0);
                    i < numberOfInputChannels;
                    i = channelsWithEndOfPartitionEvents.nextClearBit(i + 1)) {
                unfinishedChannels.add(getChannel(i).getChannelInfo());
            }
            return unfinishedChannels;
        }
    }

    /** Sums {@code getBuffersInUseCount()} over all channels in the array. */
    @VisibleForTesting
    int getBuffersInUseCount() {
        int inUse = 0;
        for (int i = 0; i < channels.length; i++) {
            inUse += channels[i].getBuffersInUseCount();
        }
        return inUse;
    }

    /** Announces {@code newBufferSize} to every channel in the array that is not released. */
    @VisibleForTesting
    public void announceBufferSize(int newBufferSize) {
        for (InputChannel target : channels) {
            if (!target.isReleased()) {
                target.announceBufferSize(newBufferSize);
            }
        }
    }

    /**
     * Asks the debloater for a new buffer size based on the current throughput and the number of
     * buffers in use, and announces the size to the channels if one is returned.
     *
     * <p>Does nothing when the gate is finished or closed.
     *
     * @throws IllegalStateException if no buffer debloater was configured
     */
    @Override
    public void triggerDebloating() {
        if (isFinished()) {
            return;
        }
        if (closeFuture.isDone()) {
            return;
        }
        Preconditions.checkState(bufferDebloater != null, "Buffer debloater should not be null");
        final long throughput = throughputCalculator.calculateThroughput();
        bufferDebloater
                .recalculateBufferSize(throughput, getBuffersInUseCount())
                .ifPresent(this::announceBufferSize);
    }

    /**
     * Returns the debloater's last estimated time to consume the buffers.
     *
     * <p>The debloater is dereferenced without a null check although the constructor accepts
     * null for it, so this throws a NullPointerException when none was configured.
     */
    public Duration getLastEstimatedTimeToConsume() {
        final Duration lastEstimate = bufferDebloater.getLastEstimatedTimeToConsumeBuffers();
        return lastEstimate;
    }

    /** Returns the partition type this gate was constructed with. */
    public ResultPartitionType getConsumedPartitionType() {
        return consumedPartitionType;
    }

    /** Returns the buffer pool as a {@link BufferProvider}; null until a pool has been set. */
    BufferProvider getBufferProvider() {
        return bufferPool;
    }

    /** Returns the buffer pool of this gate; null until a pool has been set. */
    public BufferPool getBufferPool() {
        return bufferPool;
    }

    /** Returns the memory segment provider this gate was constructed with. */
    MemorySegmentProvider getMemorySegmentProvider() {
        return memorySegmentProvider;
    }

    /** Returns the name of the owning task. */
    public String getOwningTaskName() {
        return owningTaskName;
    }

    /**
     * Sums the unsynchronized queued-buffer counts of all registered channels.
     *
     * <p>The channel map is read without holding a lock. If an attempt throws, it is retried, up
     * to three attempts in total; when all attempts fail, 0 is returned.
     */
    public int getNumberOfQueuedBuffers() {
        int attemptsLeft = 3;
        while (attemptsLeft-- > 0) {
            try {
                int queued = 0;
                for (InputChannel channel : inputChannels.values()) {
                    queued += channel.unsynchronizedGetNumberOfQueuedBuffers();
                }
                return queued;
            } catch (Exception ignored) {
                // Best effort only: fall through to the next attempt.
            }
        }
        return 0;
    }

    /** Returns the future that {@link #close()} completes. */
    public CompletableFuture<Void> getCloseFuture() {
        return closeFuture;
    }

    /** Returns the channel stored at {@code channelIndex} in the channel array (no bounds check beyond the array's own). */
    @Override
    public InputChannel getChannel(int channelIndex) {
        return channels[channelIndex];
    }

    /**
     * Registers the buffer pool of this gate. May only be called once.
     *
     * @param bufferPool the pool to register; must not be null
     * @throws IllegalStateException if a pool has already been registered
     * @throws NullPointerException if {@code bufferPool} is null
     */
    public void setBufferPool(BufferPool bufferPool) {
        // Message fixed: the original read "hasalready" because of a missing space.
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has already been set for this input gate.");
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
    }

    /**
     * Reserves one segment in the buffer pool and then calls {@code setup()} on every registered
     * channel while holding {@code requestLock}.
     */
    @VisibleForTesting
    public void setupChannels() throws IOException {
        bufferPool.reserveSegments(1);
        synchronized (requestLock) {
            for (InputChannel channel : inputChannels.values()) {
                channel.setup();
            }
        }
    }

    /**
     * Registers the given channels: copies them into the channel array and maps each one by the
     * partition ID it consumes. Every {@link UnknownInputChannel} whose partition ID was not
     * mapped before increments the count of uninitialized channels.
     *
     * @param channels exactly {@code numberOfInputChannels} channels
     * @throws IllegalArgumentException if the number of channels does not match
     */
    public void setInputChannels(InputChannel... channels) {
        if (channels.length != numberOfInputChannels) {
            throw new IllegalArgumentException("Expected " + numberOfInputChannels + " channels, but got " + channels.length);
        }
        synchronized (requestLock) {
            System.arraycopy(channels, 0, this.channels, 0, numberOfInputChannels);
            for (InputChannel channel : channels) {
                IntermediateResultPartitionID partitionId = channel.getPartitionId().getPartitionId();
                boolean newlyMapped = inputChannels.put(partitionId, channel) == null;
                if (newlyMapped && channel instanceof UnknownInputChannel) {
                    numberOfUninitializedChannels++;
                }
            }
        }
    }

    /**
     * Replaces the {@link UnknownInputChannel} registered for the descriptor's partition with a
     * local or remote channel, depending on whether the descriptor is local to
     * {@code localLocation}.
     *
     * <p>Nothing happens if the gate is closed or the channel registered for the partition is
     * not an unknown channel. The new channel gets a subpartition request if partitions were
     * already requested, and receives all pending task events. The pending events are cleared
     * once the last unknown channel has been replaced. Runs under {@code requestLock}.
     */
    public void updateInputChannel(ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                return;
            }

            final IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();
            final InputChannel current = inputChannels.get(partitionId);
            if (!(current instanceof UnknownInputChannel)) {
                return;
            }

            final UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            final InputChannel newChannel;
            if (shuffleDescriptor.isLocalTo(localLocation)) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                // Remote channels are set up before they are published.
                RemoteInputChannel remoteChannel = unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                remoteChannel.setup();
                newChannel = remoteChannel;
            }
            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

            inputChannels.put(partitionId, newChannel);
            channels[current.getChannelIndex()] = newChannel;

            if (requestedPartitionsFlag) {
                newChannel.requestSubpartition(consumedSubpartitionIndex);
            }
            for (TaskEvent pending : pendingEvents) {
                newChannel.sendTaskEvent(pending);
            }

            numberOfUninitializedChannels--;
            if (numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }

    /**
     * Re-issues the subpartition request for the channel registered under {@code partitionId}.
     *
     * <p>Nothing happens if the gate is closed. Only channels whose exact class is
     * {@link RemoteInputChannel} or {@link LocalInputChannel} are supported; local channels are
     * retriggered through a daemon {@link Timer} that is created on first use. Runs under
     * {@code requestLock}.
     *
     * @throws NullPointerException if no channel is registered for {@code partitionId}
     * @throws IllegalStateException if the channel has any other class
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                return;
            }

            final InputChannel ch = inputChannels.get(partitionId);
            Preconditions.checkNotNull(ch, "Unknown input channel with ID " + partitionId);
            LOG.debug("{}: Retriggering partition request {}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex);

            final Class<?> channelClass = ch.getClass();
            if (channelClass == RemoteInputChannel.class) {
                ((RemoteInputChannel) ch).retriggerSubpartitionRequest(consumedSubpartitionIndex);
            } else if (channelClass == LocalInputChannel.class) {
                if (retriggerLocalRequestTimer == null) {
                    retriggerLocalRequestTimer = new Timer(true);
                }
                ((LocalInputChannel) ch).retriggerSubpartitionRequest(retriggerLocalRequestTimer, consumedSubpartitionIndex);
            } else {
                throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + ch.getClass());
            }
        }
    }

    /** Returns the timer used to retrigger local requests; null if none has been created yet. */
    @VisibleForTesting
    Timer getRetriggerLocalRequestTimer() {
        return retriggerLocalRequestTimer;
    }

    /** Returns the unpooled segment allocated in the constructor. */
    MemorySegment getUnpooledSegment() {
        return unpooledSegment;
    }

    /**
     * Releases this gate: cancels the retrigger timer if one exists, releases the resources of
     * every registered channel, lazily destroys the buffer pool if one is set, and completes the
     * close future.
     *
     * <p>The release runs once under {@code requestLock}; later calls find the close future done
     * and do nothing. An {@link IOException} from an individual channel is logged and does not
     * stop the release of the other channels. The close future is completed even if the release
     * throws. After a release, threads waiting on {@code inputChannelsWithData} are woken up.
     */
    @Override
    public void close() throws IOException {
        boolean released = false;
        // Synchronize on the lock object directly; it is a plain Object, not a PrioritizedDeque.
        synchronized (requestLock) {
            if (!closeFuture.isDone()) {
                try {
                    LOG.debug("{}: Releasing {}.", owningTaskName, this);
                    if (retriggerLocalRequestTimer != null) {
                        retriggerLocalRequestTimer.cancel();
                    }
                    for (InputChannel inputChannel : inputChannels.values()) {
                        try {
                            inputChannel.releaseAllResources();
                        } catch (IOException e) {
                            LOG.warn("{}: Error during release of channel resources: {}.", owningTaskName, e.getMessage(), e);
                        }
                    }
                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                } finally {
                    released = true;
                    closeFuture.complete(null);
                }
            }
        }
        if (released) {
            // Notify outside requestLock so the two monitors are never held at the same time here.
            synchronized (inputChannelsWithData) {
                inputChannelsWithData.notifyAll();
            }
        }
    }

    /** Returns true once every channel has delivered an EndOfPartitionEvent. */
    @Override
    public boolean isFinished() {
        return hasReceivedAllEndOfPartitionEvents;
    }

    /** Returns true once every channel has delivered an EndOfData event. */
    @Override
    public boolean hasReceivedEndOfData() {
        return hasReceivedEndOfData;
    }

    /** Renders the owning task name and the gate index, e.g. {@code SingleInputGate{owningTaskName='t', gateIndex=0}}. */
    public String toString() {
        StringBuilder text = new StringBuilder("SingleInputGate{owningTaskName='");
        text.append(owningTaskName).append('\'');
        text.append(", gateIndex=").append(gateIndex);
        text.append('}');
        return text.toString();
    }

    /** Fetches the next buffer or event in blocking mode. */
    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        final boolean blocking = true;
        return getNextBufferOrEvent(blocking);
    }

    /** Fetches the next buffer or event in non-blocking mode. */
    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        final boolean blocking = false;
        return getNextBufferOrEvent(blocking);
    }

    /**
     * Shared implementation of {@link #getNext()} and {@link #pollNext()}.
     *
     * <p>Returns empty when the gate is finished. When no data is obtained, the throughput
     * measurement is paused and resumed once the availability future completes. Otherwise the
     * data is converted to a {@link BufferOrEvent} and its size is reported to the throughput
     * calculator.
     *
     * @throws CancelTaskException if the gate is already closed
     */
    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }
        if (closeFuture.isDone()) {
            throw new CancelTaskException("Input gate is already closed.");
        }

        Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> next = waitAndGetNextData(blocking);
        if (next.isPresent()) {
            InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability> fetched = next.get();
            BufferOrEvent result = transformToBufferOrEvent(fetched.data.buffer(), fetched.moreAvailable, fetched.input, fetched.morePriorityEvents);
            throughputCalculator.incomingDataSize(result.getSize());
            return Optional.of(result);
        }

        throughputCalculator.pauseMeasurement(System.currentTimeMillis());
        getAvailableFuture().whenComplete((ignored, error) -> throughputCalculator.resumeMeasurement(System.currentTimeMillis()));
        return Optional.empty();
    }

    /**
     * Takes the next channel from {@code getChannel(blocking)} and polls it for a buffer,
     * repeating until a buffer is obtained or no channel is returned.
     *
     * <p>Each iteration runs entirely while holding the {@code inputChannelsWithData} monitor:
     * the channel lookup, the buffer poll, the re-queueing of the channel, the priority
     * bookkeeping and the availability checks. {@link #checkUnavailability()} asserts that the
     * monitor is held, and the state touched here is annotated as guarded by it.
     *
     * @param blocking passed through to {@code getChannel}
     * @return the channel, its buffer and the availability flags, or empty if no channel was
     *     returned
     */
    private Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        while (true) {
            synchronized (inputChannelsWithData) {
                Optional<InputChannel> inputChannelOpt = getChannel(blocking);
                if (!inputChannelOpt.isPresent()) {
                    return Optional.empty();
                }

                final InputChannel inputChannel = inputChannelOpt.get();
                Optional<InputChannel.BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
                if (!bufferAndAvailabilityOpt.isPresent()) {
                    // The channel had nothing after all; update availability and try the next one.
                    checkUnavailability();
                    continue;
                }

                final InputChannel.BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
                if (bufferAndAvailability.moreAvailable()) {
                    // The channel still has data, so hand it back to the queue.
                    queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
                }

                final boolean morePriorityEvents = inputChannelsWithData.getNumPriorityElements() > 0;
                if (bufferAndAvailability.hasPriority()) {
                    lastPrioritySequenceNumber[inputChannel.getChannelIndex()] = bufferAndAvailability.getSequenceNumber();
                    if (!morePriorityEvents) {
                        priorityAvailabilityHelper.resetUnavailable();
                    }
                }

                checkUnavailability();

                return Optional.of(new InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>(inputChannel, bufferAndAvailability, !inputChannelsWithData.isEmpty(), morePriorityEvents));
            }
        }
    }

    /**
     * Resets the availability helper to unavailable when the queue of channels with data is
     * empty. Must be called while holding the {@code inputChannelsWithData} monitor.
     */
    private void checkUnavailability() {
        assert Thread.holdsLock(inputChannelsWithData);

        final boolean nothingQueued = inputChannelsWithData.isEmpty();
        if (nothingQueued) {
            availabilityHelper.resetUnavailable();
        }
    }

    /** Dispatches to {@code transformBuffer} for data buffers and to {@code transformEvent} otherwise. */
    private BufferOrEvent transformToBufferOrEvent(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException {
        return buffer.isBuffer()
                ? transformBuffer(buffer, moreAvailable, currentChannel, morePriorityEvents)
                : transformEvent(buffer, moreAvailable, currentChannel, morePriorityEvents);
    }

    /** Wraps a data buffer, decompressed if necessary, together with the channel info and availability flags. */
    private BufferOrEvent transformBuffer(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) {
        final Buffer payload = decompressBufferIfNeeded(buffer);
        return new BufferOrEvent(payload, currentChannel.getChannelInfo(), moreAvailable, morePriorityEvents);
    }

    /**
     * Deserializes an event buffer, applies end-of-partition / end-of-data bookkeeping for the
     * channel it came from, and wraps the event in a {@link BufferOrEvent}.
     *
     * <p>The buffer is recycled right after deserialization, whether or not it succeeds.
     *
     * <p>For an {@code EndOfPartitionEvent} the channel is marked finished, removed from the
     * queue of channels with data, and its resources are released. When it was the last
     * unfinished channel, the gate is marked available and {@code moreAvailable} is reported as
     * false. For an {@code EndOfData} event only the end-of-user-records bit set and flag are
     * updated.
     *
     * @throws IllegalStateException if the channel already delivered the same kind of end event,
     *     or if data is still returned after the last EndOfPartitionEvent
     */
    private BufferOrEvent transformEvent(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException {
        AbstractEvent event;
        try {
            event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        }
        finally {
            buffer.recycleBuffer();
        }
        if (event.getClass() == EndOfPartitionEvent.class) {
            PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
            synchronized (prioritizedDeque) {
                // Each channel may deliver EndOfPartitionEvent at most once.
                Preconditions.checkState((!this.channelsWithEndOfPartitionEvents.get(currentChannel.getChannelIndex()) ? 1 : 0) != 0);
                this.channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
                this.hasReceivedAllEndOfPartitionEvents = this.channelsWithEndOfPartitionEvents.cardinality() == this.numberOfInputChannels;
                // Drop the finished channel from the enqueued bookkeeping and from the queue itself.
                this.enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
                if (this.inputChannelsWithData.contains(currentChannel)) {
                    this.inputChannelsWithData.getAndRemove(channel -> channel == currentChannel);
                }
            }
            if (this.hasReceivedAllEndOfPartitionEvents) {
                // pollNext() is only invoked when moreAvailable is true; it must not return anything.
                Preconditions.checkState((!moreAvailable || !this.pollNext().isPresent() ? 1 : 0) != 0);
                moreAvailable = false;
                this.markAvailable();
            }
            currentChannel.releaseAllResources();
        } else if (event.getClass() == EndOfData.class) {
            PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
            synchronized (prioritizedDeque) {
                // Each channel may deliver EndOfData at most once.
                Preconditions.checkState((!this.channelsWithEndOfUserRecords.get(currentChannel.getChannelIndex()) ? 1 : 0) != 0);
                this.channelsWithEndOfUserRecords.set(currentChannel.getChannelIndex());
                this.hasReceivedEndOfData = this.channelsWithEndOfUserRecords.cardinality() == this.numberOfInputChannels;
            }
        }
        // NOTE(review): getDataType() and getSize() are read after recycleBuffer() above;
        // presumably the Buffer implementation still answers them after recycling - confirm.
        return new BufferOrEvent(event, buffer.getDataType().hasPriority(), currentChannel.getChannelInfo(), moreAvailable, buffer.getSize(), morePriorityEvents);
    }

    /**
     * Returns the buffer unchanged if it is not compressed. Otherwise returns the result of
     * decompressing it into the decompressor's intermediate buffer; the compressed input is
     * recycled in either outcome of that attempt.
     *
     * @throws NullPointerException if the buffer is compressed but no decompressor is set
     */
    private Buffer decompressBufferIfNeeded(Buffer buffer) {
        if (!buffer.isCompressed()) {
            return buffer;
        }
        try {
            Preconditions.checkNotNull(bufferDecompressor, "Buffer decompressor not set.");
            return bufferDecompressor.decompressToIntermediateBuffer(buffer);
        } finally {
            buffer.recycleBuffer();
        }
    }

    /**
     * Obtains the future to notify from the availability helper while holding the
     * {@code inputChannelsWithData} monitor, then completes it after the monitor is released.
     */
    private void markAvailable() {
        final CompletableFuture<?> pendingNotification;
        synchronized (inputChannelsWithData) {
            pendingNotification = availabilityHelper.getUnavailableToResetAvailable();
        }
        pendingNotification.complete(null);
    }

    /**
     * Sends the event to every registered channel and, while unknown channels remain, also
     * records it as pending so it can be delivered to channels created later. Runs under
     * {@code requestLock}.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        synchronized (requestLock) {
            for (InputChannel receiver : inputChannels.values()) {
                receiver.sendTaskEvent(event);
            }
            final boolean hasUninitializedChannels = numberOfUninitializedChannels > 0;
            if (hasUninitializedChannels) {
                pendingEvents.add(event);
            }
        }
    }

    /**
     * Resumes consumption on the channel identified by {@code channelInfo}.
     *
     * @throws IllegalStateException if the gate is already finished
     */
    @Override
    public void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
        Preconditions.checkState(!isFinished(), "InputGate already finished.");
        final InputChannel target = channels[channelInfo.getInputChannelIdx()];
        target.resumeConsumption();
    }

    /**
     * Forwards the all-records-processed acknowledgement to the channel identified by
     * {@code channelInfo}.
     *
     * @throws IllegalStateException if the gate is already finished
     */
    @Override
    public void acknowledgeAllRecordsProcessed(InputChannelInfo channelInfo) throws IOException {
        Preconditions.checkState(!isFinished(), "InputGate already finished.");
        final InputChannel target = channels[channelInfo.getInputChannelIdx()];
        target.acknowledgeAllRecordsProcessed();
    }

    /** Queues the (non-null) channel without a priority sequence number and without forcing priority. */
    void notifyChannelNonEmpty(InputChannel channel) {
        final InputChannel nonNullChannel = Preconditions.checkNotNull(channel);
        queueChannel(nonNullChannel, null, false);
    }

    /** Queues the (non-null) channel with the given priority sequence number, without forcing priority. */
    void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) {
        final InputChannel nonNullChannel = Preconditions.checkNotNull(inputChannel);
        queueChannel(nonNullChannel, prioritySequenceNumber, false);
    }

    /** Queues the (non-null) channel with forced priority and no sequence number. */
    void notifyPriorityEventForce(InputChannel inputChannel) {
        final InputChannel nonNullChannel = Preconditions.checkNotNull(inputChannel);
        queueChannel(nonNullChannel, null, true);
    }

    /**
     * Requests the producer state of the given partition. When the checker reports the producer
     * as ready, the partition request is retriggered; an {@link IOException} from the retrigger
     * is reported through the response handle's {@code failConsumption}.
     */
    void triggerPartitionStateCheck(ResultPartitionID partitionId) {
        partitionProducerStateProvider.requestPartitionProducerState(consumedResultId, partitionId, responseHandle -> {
            RemoteChannelStateChecker checker = new RemoteChannelStateChecker(partitionId, owningTaskName);
            boolean producerReady = checker.isProducerReadyOrAbortConsumption(responseHandle);
            if (!producerReady) {
                return;
            }
            try {
                retriggerPartitionRequest(partitionId.getPartitionId());
            } catch (IOException retriggerFailure) {
                responseHandle.failConsumption(retriggerFailure);
            }
        });
    }

    /**
     * Queues {@code channel} in {@code inputChannelsWithData} and raises the matching
     * notifications on the {@link GateNotificationHelper}.
     *
     * <p>The whole check-enqueue-notify sequence runs while holding the monitor of
     * {@code inputChannelsWithData}, which is the lock {@code queueChannelUnsafe} asserts. The
     * notification helper is opened before the monitor is entered and closed after it is
     * released (try-with-resources wraps the {@code synchronized} block).
     * NOTE(review): presumably this keeps whatever {@code close()} triggers out of the critical
     * section - confirm against {@code GateNotificationHelper}.
     *
     * <p>Nothing is queued and nothing is notified when
     * <ul>
     *   <li>a sequence number was supplied, {@code forcePriority} is {@code false}, and the
     *       number is outdated relative to the last priority sequence number recorded for the
     *       channel, or
     *   <li>{@code queueChannelUnsafe} declines to (re-)queue the channel.
     * </ul>
     *
     * @param channel the channel that has data
     * @param prioritySequenceNumber sequence number of the priority event, or {@code null} for a
     *     notification that carries none
     * @param forcePriority if {@code true}, the channel is queued with priority and the
     *     outdated-check is skipped
     */
    private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber, boolean forcePriority) {
        try (GateNotificationHelper notification = new GateNotificationHelper(this, this.inputChannelsWithData)) {
            synchronized (this.inputChannelsWithData) {
                final boolean priority = prioritySequenceNumber != null || forcePriority;

                // priority && !forcePriority implies prioritySequenceNumber != null, so the
                // unboxing in the isOutdated call cannot throw.
                if (!forcePriority
                        && priority
                        && this.isOutdated(
                                prioritySequenceNumber,
                                this.lastPrioritySequenceNumber[channel.getChannelIndex()])) {
                    return;
                }

                if (!this.queueChannelUnsafe(channel, priority)) {
                    return;
                }

                // Notify only on the transition to exactly one (priority) element.
                if (priority && this.inputChannelsWithData.getNumPriorityElements() == 1) {
                    notification.notifyPriority();
                }
                if (this.inputChannelsWithData.size() == 1) {
                    notification.notifyDataAvailable();
                }
            }
        }
    }

    /**
     * Decides whether {@code sequenceNumber} is outdated relative to {@code lastSequenceNumber}.
     *
     * <p>Normally a number is outdated when it is not greater than the last one. If the two
     * numbers have different signs and the larger of them exceeds {@code Integer.MAX_VALUE / 2},
     * the result is instead {@code lastSequenceNumber < 0}, i.e. the negative number is treated
     * as the later one (this reads as integer wrap-around handling).
     *
     * @param sequenceNumber the number to test
     * @param lastSequenceNumber the reference number it is compared against
     * @return {@code true} if {@code sequenceNumber} is considered outdated
     */
    private boolean isOutdated(int sequenceNumber, int lastSequenceNumber) {
        final boolean signsDiffer = (lastSequenceNumber < 0) != (sequenceNumber < 0);
        final boolean largerIsInUpperHalf =
                Math.max(lastSequenceNumber, sequenceNumber) > Integer.MAX_VALUE / 2;

        if (signsDiffer && largerIsInUpperHalf) {
            return lastSequenceNumber < 0;
        }
        return sequenceNumber <= lastSequenceNumber;
    }

    /**
     * Adds {@code channel} to {@code inputChannelsWithData} without taking the lock itself; the
     * caller must hold the monitor of {@code inputChannelsWithData} (asserted).
     *
     * <p>The channel is not queued when
     * <ul>
     *   <li>its bit is set in {@code channelsWithEndOfPartitionEvents}, or
     *   <li>it is already enqueued and either the request is non-priority or the queue already
     *       contains it as a priority element.
     * </ul>
     * Otherwise it is added (passing along whether it was already enqueued) and, if it was not
     * enqueued before, its bit in {@code enqueuedInputChannelsWithData} is set.
     *
     * @param channel the channel to queue
     * @param priority whether the channel should be queued as a priority element
     * @return {@code true} if the queue was modified, {@code false} if the request was dropped
     */
    private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
        assert (Thread.holdsLock(this.inputChannelsWithData));

        final int index = channel.getChannelIndex();
        if (this.channelsWithEndOfPartitionEvents.get(index)) {
            return false;
        }

        final boolean wasQueued = this.enqueuedInputChannelsWithData.get(index);
        if (wasQueued) {
            // An already queued channel may only be re-added to gain priority it lacks so far.
            final boolean needsPromotion =
                    priority && !this.inputChannelsWithData.containsPriorityElement(channel);
            if (!needsPromotion) {
                return false;
            }
        }

        this.inputChannelsWithData.add(channel, priority, wasQueued);
        if (!wasQueued) {
            this.enqueuedInputChannelsWithData.set(index);
        }
        return true;
    }

    /**
     * Takes the next channel out of {@code inputChannelsWithData}; the caller must hold the
     * monitor of {@code inputChannelsWithData} (asserted).
     *
     * <p>While the queue is empty: a completed {@code closeFuture} is reported as an
     * {@link IllegalStateException}; in blocking mode the method waits on the queue's monitor and
     * re-checks; in non-blocking mode it calls {@code availabilityHelper.resetUnavailable()} and
     * returns an empty {@link Optional}.
     *
     * <p>On success the polled channel's bit in {@code enqueuedInputChannelsWithData} is cleared.
     *
     * @param blocking whether to wait for a channel when none is queued
     * @return the polled channel, or empty if none is queued and {@code blocking} is false
     * @throws InterruptedException if the wait on the queue's monitor is interrupted
     * @throws IllegalStateException if the queue is empty and {@code closeFuture} is done
     */
    private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
        assert (Thread.holdsLock(this.inputChannelsWithData));

        while (this.inputChannelsWithData.isEmpty()) {
            if (this.closeFuture.isDone()) {
                throw new IllegalStateException("Released");
            }
            if (!blocking) {
                this.availabilityHelper.resetUnavailable();
                return Optional.empty();
            }
            // Looping guards against wake-ups that leave the queue still empty.
            this.inputChannelsWithData.wait();
        }

        final InputChannel nextChannel = this.inputChannelsWithData.poll();
        final int nextIndex = nextChannel.getChannelIndex();
        this.enqueuedInputChannelsWithData.clear(nextIndex);
        return Optional.of(nextChannel);
    }

    /**
     * Returns the gate's channel map, keyed by intermediate result partition id.
     *
     * <p>The field itself is returned, not a copy, so changes made through the returned map are
     * visible to this gate.
     *
     * @return the {@code inputChannels} map
     */
    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
        final Map<IntermediateResultPartitionID, InputChannel> channelsByPartition = inputChannels;
        return channelsByPartition;
    }
}

