/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.util;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;

@VisibleForTesting
@Internal
public class BlockerCheckpointStreamFactory
implements CheckpointStreamFactory {
    private final int maxSize;
    private volatile int afterNumberInvocations;
    private volatile OneShotLatch blocker;
    private volatile OneShotLatch waiter;
    MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;

    public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
        return this.lastCreatedStream;
    }

    public BlockerCheckpointStreamFactory(int maxSize) {
        this.maxSize = maxSize;
    }

    public void setAfterNumberInvocations(int afterNumberInvocations) {
        this.afterNumberInvocations = afterNumberInvocations;
    }

    public void setBlockerLatch(OneShotLatch latch) {
        this.blocker = latch;
    }

    public void setWaiterLatch(OneShotLatch latch) {
        this.waiter = latch;
    }

    public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
        this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(this.maxSize){
            private int afterNInvocations;
            private final OneShotLatch streamBlocker;
            private final OneShotLatch streamWaiter;
            {
                this.afterNInvocations = BlockerCheckpointStreamFactory.this.afterNumberInvocations;
                this.streamBlocker = BlockerCheckpointStreamFactory.this.blocker;
                this.streamWaiter = BlockerCheckpointStreamFactory.this.waiter;
            }

            public void write(int b) throws IOException {
                this.unblockWaiter();
                if (this.afterNInvocations > 0) {
                    --this.afterNInvocations;
                } else {
                    this.awaitBlocker();
                }
                try {
                    super.write(b);
                }
                catch (IOException ex) {
                    this.unblockWaiter();
                    throw ex;
                }
                if (0 == this.afterNInvocations) {
                    this.unblockWaiter();
                }
                if (this.isClosed()) {
                    throw new IOException("Stream closed.");
                }
            }

            public void write(byte[] b, int off, int len) throws IOException {
                for (int i = 0; i < len; ++i) {
                    this.write(b[off + i]);
                }
            }

            public void close() {
                super.close();
                this.unblockAll();
            }

            private void unblockWaiter() {
                if (null != this.streamWaiter) {
                    this.streamWaiter.trigger();
                }
            }

            private void awaitBlocker() {
                if (null != this.streamBlocker) {
                    try {
                        this.streamBlocker.await();
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            }

            private void unblockAll() {
                if (null != this.streamWaiter) {
                    this.streamWaiter.trigger();
                }
                if (null != this.streamBlocker) {
                    this.streamBlocker.trigger();
                }
            }
        };
        return this.lastCreatedStream;
    }

    public void close() throws Exception {
    }
}

