/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

/**
 * Implementation of {@link StateInitializationContext} that exposes the operator and keyed state
 * stores it was constructed with, together with lazily opened input streams over the raw
 * operator-state and keyed-state handles.
 *
 * <p>Every stream opened while iterating raw state is registered with the supplied
 * {@link CloseableRegistry}; {@link #close()} closes that registry.
 */
public class StateInitializationContextImpl
        implements StateInitializationContext {

    /** State name under which raw operator state offsets are looked up in each handle. */
    private static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /** Registry that tracks every raw-state stream opened through this context. */
    private final CloseableRegistry closableRegistry;

    /** Flag reported by {@link #isRestored()}. */
    private final boolean restored;

    private final OperatorStateStore operatorStateStore;

    /** Raw operator state handles; {@code null} means "no raw operator state". */
    private final Collection<OperatorStateHandle> operatorStateHandles;

    /** Keyed state store; {@code null} marks the operator as non-keyed. */
    private final KeyedStateStore keyedStateStore;

    /** Raw keyed state handles; {@code null} means "no raw keyed state". */
    private final Collection<KeyGroupsStateHandle> keyGroupsStateHandles;

    /** Iterable over the raw keyed state; {@code null} iff {@link #keyGroupsStateHandles} is. */
    private final Iterable<KeyGroupStatePartitionStreamProvider> keyedStateIterable;

    /**
     * Creates the context.
     *
     * @param restored value returned by {@link #isRestored()}
     * @param operatorStateStore store returned by {@link #getOperatorStateStore()}
     * @param keyedStateStore store returned by {@link #getKeyedStateStore()}; when {@code null},
     *     {@link #getRawKeyedStateInputs()} throws
     * @param keyGroupsStateHandles handles backing the raw keyed state, may be {@code null}
     * @param operatorStateHandles handles backing the raw operator state, may be {@code null}
     * @param closableRegistry registry for opened streams, must not be {@code null}
     */
    public StateInitializationContextImpl(boolean restored, OperatorStateStore operatorStateStore, KeyedStateStore keyedStateStore, Collection<KeyGroupsStateHandle> keyGroupsStateHandles, Collection<OperatorStateHandle> operatorStateHandles, CloseableRegistry closableRegistry) {
        this.restored = restored;
        this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
        this.operatorStateStore = operatorStateStore;
        this.keyedStateStore = keyedStateStore;
        this.operatorStateHandles = operatorStateHandles;
        this.keyGroupsStateHandles = keyGroupsStateHandles;

        if (keyGroupsStateHandles == null) {
            this.keyedStateIterable = null;
        } else {
            // The getters are only invoked when iterator() is called, not during construction.
            this.keyedStateIterable = new Iterable<KeyGroupStatePartitionStreamProvider>() {

                @Override
                public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
                    return new KeyGroupStreamIterator(getKeyGroupsStateHandles().iterator(), getClosableRegistry());
                }
            };
        }
    }

    /** Returns the {@code restored} flag supplied at construction. */
    @Override
    public boolean isRestored() {
        return this.restored;
    }

    /** Returns the raw operator state handles supplied at construction (may be {@code null}). */
    public Collection<OperatorStateHandle> getOperatorStateHandles() {
        return this.operatorStateHandles;
    }

    /** Returns the raw keyed state handles supplied at construction (may be {@code null}). */
    public Collection<KeyGroupsStateHandle> getKeyGroupsStateHandles() {
        return this.keyGroupsStateHandles;
    }

    /** Returns the registry with which opened raw-state streams are registered. */
    public CloseableRegistry getClosableRegistry() {
        return this.closableRegistry;
    }

    /**
     * Returns an iterable over the raw operator state partitions.
     *
     * <p>Each call creates a new {@link Iterable}; each of its iterators walks the operator state
     * handles and yields one provider per offset stored under the {@code "_default_"} state name.
     *
     * @return the partitions, or an empty list when no operator state handles were supplied
     */
    @Override
    public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
        if (null == this.operatorStateHandles) {
            return Collections.emptyList();
        }
        return new Iterable<StatePartitionStreamProvider>() {

            @Override
            public Iterator<StatePartitionStreamProvider> iterator() {
                return new OperatorStateStreamIterator(DEFAULT_OPERATOR_STATE_NAME, getOperatorStateHandles().iterator(), getClosableRegistry());
            }
        };
    }

    /**
     * Returns an iterable over the raw keyed state, one provider per key group offset.
     *
     * @return the shared iterable created at construction, or an empty list when no keyed state
     *     handles were supplied
     * @throws IllegalStateException if this context has no keyed state store
     */
    @Override
    public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
        if (null == this.keyedStateStore) {
            throw new IllegalStateException("Attempt to access keyed state from non-keyed operator.");
        }
        if (null == this.keyGroupsStateHandles) {
            return Collections.emptyList();
        }
        return this.keyedStateIterable;
    }

    @Override
    public OperatorStateStore getOperatorStateStore() {
        return this.operatorStateStore;
    }

    @Override
    public KeyedStateStore getKeyedStateStore() {
        return this.keyedStateStore;
    }

    /** Closes the closeable registry, suppressing any exception raised while doing so. */
    public void close() {
        IOUtils.closeQuietly(this.closableRegistry);
    }

    /**
     * Base class for read-only iterators that walk a sequence of state handles and hand out
     * providers backed by the stream of the handle currently being visited.
     *
     * <p>At most one stream is open at a time. It is registered with the closeable registry when
     * opened and unregistered and closed again by {@link #closeCurrentStream()}.
     *
     * @param <T> type of provider produced by the iterator
     * @param <H> type of state handle being iterated
     */
    static abstract class AbstractStateStreamIterator<T extends StatePartitionStreamProvider, H extends StreamStateHandle>
            implements Iterator<T> {

        protected final Iterator<H> stateHandleIterator;
        protected final CloseableRegistry closableRegistry;

        /** Handle most recently pulled from {@link #stateHandleIterator}; {@code null} initially. */
        protected H currentStateHandle;

        /** Open stream of {@link #currentStateHandle}, or {@code null} when none is open. */
        protected FSDataInputStream currentStream;

        /**
         * @param stateHandleIterator source of state handles, must not be {@code null}
         * @param closableRegistry registry for opened streams, must not be {@code null}
         */
        public AbstractStateStreamIterator(Iterator<H> stateHandleIterator, CloseableRegistry closableRegistry) {
            this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
            this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
        }

        /**
         * Opens the stream of the current handle and registers it with the registry.
         *
         * @throws IOException if opening or registering the stream fails
         */
        protected void openCurrentStream() throws IOException {
            FSDataInputStream stream = this.currentStateHandle.openInputStream();
            this.closableRegistry.registerClosable(stream);
            // Only publish the stream once it has been registered successfully.
            this.currentStream = stream;
        }

        /** Unregisters and quietly closes the current stream, then forgets it. */
        protected void closeCurrentStream() {
            this.closableRegistry.unregisterClosable(this.currentStream);
            IOUtils.closeQuietly(this.currentStream);
            this.currentStream = null;
        }

        /** Always throws; these iterators are read-only. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Read only Iterator");
        }
    }

    /**
     * Iterates the partitions stored under one state name across a sequence of operator state
     * handles. Handles that carry no offsets for that name are skipped.
     *
     * <p>All providers produced for one handle share that handle's stream, repositioned to the
     * partition offset; the stream is closed once the iterator moves past the handle.
     */
    private static class OperatorStateStreamIterator
            extends AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle> {

        private final String stateName;

        /** Partition offsets of the current handle; {@code null} until a handle is selected. */
        private long[] offsets;

        /** Index into {@link #offsets} of the next partition to hand out. */
        private int offPos;

        public OperatorStateStreamIterator(String stateName, Iterator<OperatorStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
            this.stateName = Preconditions.checkNotNull(stateName);
        }

        @Override
        public boolean hasNext() {
            if (null != this.offsets && this.offPos < this.offsets.length) {
                return true;
            }

            // The current handle is exhausted (or none was selected yet): release its stream
            // before advancing. The next stream is opened lazily in next().
            this.closeCurrentStream();

            while (this.stateHandleIterator.hasNext()) {
                this.currentStateHandle = this.stateHandleIterator.next();
                OperatorStateHandle.StateMetaInfo metaInfo = this.currentStateHandle.getStateNameToPartitionOffsets().get(this.stateName);
                if (null == metaInfo) {
                    continue;
                }
                long[] metaOffsets = metaInfo.getOffsets();
                if (null != metaOffsets && metaOffsets.length > 0) {
                    this.offsets = metaOffsets;
                    this.offPos = 0;
                    return true;
                }
            }
            return false;
        }

        /**
         * Returns a provider for the next partition. An {@link IOException} raised while opening
         * or seeking the stream is handed to the returned provider instead of being thrown here.
         *
         * @throws NoSuchElementException if no partitions are left
         */
        @Override
        public StatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            long offset = this.offsets[this.offPos++];
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                this.currentStream.seek(offset);
                return new StatePartitionStreamProvider(this.currentStream);
            } catch (IOException ioex) {
                return new StatePartitionStreamProvider(ioex);
            }
        }
    }

    /**
     * Iterates the key group offsets across a sequence of keyed state handles. Handles without
     * key groups are skipped.
     *
     * <p>All providers produced for one handle share that handle's stream, repositioned to the
     * key group offset; the stream is closed once the iterator moves past the handle.
     */
    private static class KeyGroupStreamIterator
            extends AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, KeyGroupsStateHandle> {

        /** (key group id, offset) pairs of the current handle; {@code null} until one is selected. */
        private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;

        public KeyGroupStreamIterator(Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
        }

        @Override
        public boolean hasNext() {
            // Guard on the offsets iterator rather than the handle: the handle is also assigned
            // for skipped (empty) handles, in which case no offsets iterator exists yet.
            if (null != this.currentOffsetsIterator && this.currentOffsetsIterator.hasNext()) {
                return true;
            }

            // The current handle is exhausted (or none was selected yet): release its stream
            // before advancing. The next stream is opened lazily in next().
            this.closeCurrentStream();

            while (this.stateHandleIterator.hasNext()) {
                this.currentStateHandle = this.stateHandleIterator.next();
                if (this.currentStateHandle.getNumberOfKeyGroups() > 0) {
                    this.currentOffsetsIterator = this.currentStateHandle.getGroupRangeOffsets().iterator();
                    return true;
                }
            }
            return false;
        }

        /**
         * Returns a provider for the next key group. An {@link IOException} raised while opening
         * or seeking the stream is handed to the returned provider instead of being thrown here.
         *
         * @throws NoSuchElementException if no key groups are left
         */
        @Override
        public KeyGroupStatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            Tuple2<Integer, Long> keyGroupOffset = this.currentOffsetsIterator.next();
            int keyGroupId = keyGroupOffset.f0;
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                long offset = keyGroupOffset.f1;
                this.currentStream.seek(offset);
                return new KeyGroupStatePartitionStreamProvider(this.currentStream, keyGroupId);
            } catch (IOException ioex) {
                return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupId);
            }
        }
    }
}

