package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@OperatorAnnotation(checkpointableWithinAppWindow = false)
/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperator.class */
public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener {
    /** Logger shared by the operator and its nested/anonymous helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class);
    /** Suffix appended to generated temporary file names. */
    private static final String TMP_EXTENSION = ".tmp";
    /** Maximum number of failed file names that teardown() lists in its exception message. */
    private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25;
    /** Size, in bytes, of the buffer used by recoverFile() when copying file contents. */
    private static final int COPY_BUFFER_SIZE = 1024;
    /** Same value as the initial value of the maxOpenFiles field (100). */
    public static final int DEFAULT_MAX_OPEN_FILES = 100;

    /** Directory under which files are written; file paths are built as filePath + "/" + name. */
    @NotNull
    protected String filePath;
    /** File system handle; obtained from getFSInstance() in setup() and closed in teardown(). */
    protected transient FileSystem fs;
    /** Lazily created in rename() and used there to perform the rename. */
    protected transient FileContext fileContext;
    /** Cache of open stream contexts keyed by file name; built in setup(). */
    protected transient LoadingCache<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext> streamsCache;
    /** Operator context handed to setup(). */
    protected transient Context.OperatorContext context;
    /** Accumulated wall-clock milliseconds spent writing, finalizing and closing streams. */
    private transient long totalWritingTime;
    /** Optional codec; when non-null it is returned by the input port's getStreamCodec(). */
    protected StreamCodec<INPUT> streamCodec;
    /** Number of windows counted since the last window-based rotation (see endWindow()). */
    private int rotationCount;
    /** Optional provider of filter streams that are layered on top of the file output stream. */
    protected FilterStreamProvider filterStreamProvider;
    /** Id of the window passed to the most recent beginWindow() call. */
    protected long currentWindow;
    /**
     * Access-expiry of cached streams in milliseconds. When null, setup() derives it as
     * STREAMING_WINDOW_SIZE_MILLIS * CHECKPOINT_WINDOW_COUNT.
     */
    private Long expireStreamAfterAccessMillis;
    /** Running total of bytes handed to the output streams by processTuple(). */
    protected long totalBytesWritten = 0;

    /** Replication passed to fs.create(); a value <= 0 is replaced by the file system default in setup(). */
    @Min(0)
    protected int replication = 0;

    /** Maximum size of the stream cache. */
    @Min(1)
    protected int maxOpenFiles = 100;

    /** Length threshold that triggers rotation; Long.MAX_VALUE means no length-based rotation. */
    @Min(1)
    protected Long maxLength = Long.MAX_VALUE;

    /** Number of windows after which non-empty files are rotated; 0 disables window-based rotation. */
    @Min(0)
    protected int rotationWindows = 0;
    /** Computed in setup(): true when maxLength < Long.MAX_VALUE or rotationWindows > 0. */
    protected transient boolean rollingFile = false;
    /** Permission bits applied to newly created files; 511 decimal is 0777 octal. */
    protected short filePermission = 511;
    /** Counters published to the operator context at the end of each window. */
    protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class);
    /** When true, data is written to a temporary file whose name is tracked in fileNameToTmpName. */
    protected boolean alwaysWriteToTmp = true;
    /**
     * Input port. Each tuple is forwarded to processTuple(); the port uses the operator's
     * streamCodec when one is set and the port default otherwise.
     */
    public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() { // from class: com.datatorrent.lib.io.fs.AbstractFileOutputOperator.1
        /** Delegates the tuple to the enclosing operator. */
        public void process(INPUT input) {
            AbstractFileOutputOperator.this.processTuple(input);
        }

        /** Returns the operator's codec if configured, else the superclass codec. */
        public StreamCodec<INPUT> getStreamCodec() {
            return AbstractFileOutputOperator.this.streamCodec == null ? super.getStreamCodec() : AbstractFileOutputOperator.this.streamCodec;
        }
    };
    /** Per file name: byte offset up to which data has been written (advanced in processTuple()). */
    protected Map<String, MutableLong> endOffsets = Maps.newHashMap();
    /** Per file name: number of tuples written; the entry is removed when the file is rotated. */
    protected Map<String, MutableLong> counts = Maps.newHashMap();
    /** Per file name: index of the part currently being written (rolling files only). */
    protected Map<String, MutableInt> openPart = Maps.newHashMap();

    /** Per file name: rotation flags, created on demand by getRotationState(). */
    @NotNull
    protected Map<String, RotationState> rotationStates = Maps.newHashMap();
    /** Maps a part file name to the temporary file name it is currently written under. */
    private final Map<String, String> fileNameToTmpName = Maps.newHashMap();
    /** Window id to the set of file names for which finalization was requested in that window. */
    private final Map<Long, Set<String>> finalizedFiles = Maps.newTreeMap();
    /** Per file name: highest part index for which finalization has been requested. */
    protected final Map<String, MutableInt> finalizedPart = Maps.newHashMap();
    /** Names of files that currently have an open stream in the cache. */
    private final Set<String> filesWithOpenStreams = Sets.newHashSet();

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperator$Counters.class */
    /** Keys of the counters this operator registers in fileCounters. */
    public enum Counters {
        /** Total number of bytes written; updated from totalBytesWritten. */
        TOTAL_BYTES_WRITTEN,
        /** Total time spent writing, in milliseconds; updated from totalWritingTime. */
        TOTAL_TIME_WRITING_MILLISECONDS
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperator$FSFilterStreamContext.class */
    /**
     * Stream context that pairs a file output stream with an optional filter stream obtained
     * from the operator's {@code filterStreamProvider}.
     *
     * <p>The filter stream is layered on a {@link NonCloseableFilterOutputStream} so that closing
     * the filter does not close the underlying file stream.
     */
    public class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream> {
        private FSDataOutputStream outputStream;
        private FilterStreamContext filterContext;
        private NonCloseableFilterOutputStream outputWrapper;

        /**
         * Wraps the given file stream and immediately initializes the filter context.
         *
         * @param fSDataOutputStream the open file stream to write to
         * @throws IOException if the filter context cannot be created
         */
        public FSFilterStreamContext(FSDataOutputStream fSDataOutputStream) throws IOException {
            this.outputStream = fSDataOutputStream;
            this.outputWrapper = new NonCloseableFilterOutputStream(fSDataOutputStream);
            initializeContext();
        }

        /**
         * Returns the stream tuples should be written to: the filter stream when a filter
         * context exists, otherwise the raw file stream.
         */
        @Override // com.datatorrent.lib.io.fs.FilterStreamContext
        public FilterOutputStream getFilterStream() {
            return this.filterContext != null ? this.filterContext.getFilterStream() : this.outputStream;
        }

        /**
         * Finalizes the filter context (if any), flushes the wrapper, hflushes the file stream
         * and hands the filter context back to the provider.
         *
         * @throws IOException if finalizing or flushing fails
         */
        @Override // com.datatorrent.lib.io.fs.FilterStreamContext
        public void finalizeContext() throws IOException {
            if (this.filterContext != null) {
                this.filterContext.finalizeContext();
                this.outputWrapper.flush();
            }
            this.outputStream.hflush();
            if (AbstractFileOutputOperator.this.filterStreamProvider != null) {
                AbstractFileOutputOperator.this.filterStreamProvider.reclaimFilterStreamContext(this.filterContext);
            }
        }

        /**
         * Obtains a filter context from the provider when one is configured; otherwise leaves
         * the current filter context untouched.
         *
         * @throws IOException if the provider fails to create the context
         */
        public void initializeContext() throws IOException {
            if (AbstractFileOutputOperator.this.filterStreamProvider != null) {
                this.filterContext = AbstractFileOutputOperator.this.filterStreamProvider.getFilterStreamContext(this.outputWrapper);
            }
        }

        /**
         * Closes the filter stream (if any) and then the underlying file stream.
         *
         * <p>The file stream is closed in a {@code finally} block so that a failure while
         * closing the filter stream cannot leak the open file handle.
         *
         * @throws IOException if closing either stream fails
         */
        public void close() throws IOException {
            try {
                if (this.filterContext != null) {
                    this.filterContext.getFilterStream().close();
                }
            } finally {
                this.outputStream.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperator$NonCloseableFilterOutputStream.class */
    /**
     * FilterOutputStream whose close() does nothing, so that code closing this wrapper does
     * not close the wrapped stream.
     */
    public static class NonCloseableFilterOutputStream extends FilterOutputStream {
        /** Wraps the given stream. */
        public NonCloseableFilterOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        /** Intentionally a no-op: the wrapped stream is neither flushed nor closed here. */
        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperator$RotationState.class */
    /** Per-file rotation flags stored in the rotationStates map. */
    public static class RotationState {
        /** Set by processTuple() when window-based rotation is enabled; cleared in endWindow(). */
        boolean notEmpty;
        /** Set by rotate(); cleared by the cache loader when it opens the next part. */
        boolean rotated;

        /** Instances are created only from within the enclosing operator (getRotationState()). */
        private RotationState() {
        }
    }

    /** Creates the operator; all state comes from the field initializers. */
    public AbstractFileOutputOperator() {
    }

    /**
     * Creates a new file system instance for the URI of {@code filePath} using a default
     * {@link Configuration}. A {@link LocalFileSystem} is replaced by its raw file system.
     *
     * @return the file system to write to
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getFSInstance() throws IOException {
        final Path target = new Path(this.filePath);
        final FileSystem created = FileSystem.newInstance(target.toUri(), new Configuration());
        if (!(created instanceof LocalFileSystem)) {
            return created;
        }
        return ((LocalFileSystem) created).getRaw();
    }

    /**
     * Initializes the operator and recovers state left behind by a previous run.
     *
     * <p>Steps, in order: derive the stream expiry if unset, compute {@code rollingFile},
     * create the file system and the stream cache, recover every file recorded in
     * {@code endOffsets} whose active path exists, and (for rolling files) delete part files
     * beyond the open part and rotate the open part if it already exceeds {@code maxLength}.
     * Finally the context is stored and both counters are reset.
     *
     * @param operatorContext the context supplied by the platform
     * @throws RuntimeException wrapping any IOException or ExecutionException raised on the way
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        Path path;
        LOG.debug("setup initiated");
        if (this.expireStreamAfterAccessMillis == null) {
            // Default expiry: window size (ms) multiplied by the checkpoint window count.
            this.expireStreamAfterAccessMillis = Long.valueOf(((Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue() * ((Integer) operatorContext.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)).intValue());
        }
        // Rolling is active when either a length limit or a window-based rotation is configured.
        this.rollingFile = this.maxLength.longValue() < Long.MAX_VALUE || this.rotationWindows > 0;
        try {
            this.fs = getFSInstance();
            if (this.replication <= 0) {
                // No explicit replication configured: use the file system default for filePath.
                this.replication = this.fs.getDefaultReplication(new Path(this.filePath));
            }
            LOG.debug("FS class {}", this.fs.getClass());
            this.streamsCache = CacheBuilder.newBuilder().maximumSize(this.maxOpenFiles).expireAfterAccess(this.expireStreamAfterAccessMillis.longValue(), TimeUnit.MILLISECONDS).removalListener(createCacheRemoveListener()).build(createCacheLoader());
            LOG.debug("File system class: {}", this.fs.getClass());
            LOG.debug("end-offsets {}", this.endOffsets);
            try {
                if (this.fs.exists(new Path(this.filePath))) {
                    // Recover every known file whose active (tmp or final) path still exists.
                    for (String str : this.endOffsets.keySet()) {
                        String partFileNamePri = getPartFileNamePri(str);
                        LOG.debug("seenFileNamePart: {}", partFileNamePri);
                        Path path2 = this.alwaysWriteToTmp ? new Path(this.filePath + "/" + this.fileNameToTmpName.get(partFileNamePri)) : new Path(this.filePath + "/" + partFileNamePri);
                        if (this.fs.exists(path2)) {
                            recoverFile(str, partFileNamePri, path2);
                        }
                    }
                }
                if (this.rollingFile) {
                    for (String str2 : this.endOffsets.keySet()) {
                        try {
                            Integer value = this.openPart.get(str2).getValue();
                            int intValue = value.intValue() + 1;
                            // Delete part files with an index above the open part until the
                            // first index with no (known) file.
                            while (true) {
                                String partFileName = getPartFileName(str2, intValue);
                                if (this.alwaysWriteToTmp) {
                                    String str3 = this.fileNameToTmpName.get(partFileName);
                                    path = str3 != null ? new Path(this.filePath + "/" + str3) : null;
                                } else {
                                    path = new Path(this.filePath + "/" + partFileName);
                                }
                                if (path == null || !this.fs.exists(path)) {
                                    break;
                                }
                                this.fs.delete(path, true);
                                intValue++;
                            }
                            // Resolve the active path of the open part; null when writing to tmp
                            // files and no tmp name is recorded for it.
                            String partFileName2 = getPartFileName(str2, value.intValue());
                            Path path3 = null;
                            if (!this.alwaysWriteToTmp) {
                                path3 = new Path(this.filePath + "/" + partFileName2);
                            } else if (this.fileNameToTmpName.get(partFileName2) != null) {
                                path3 = new Path(this.filePath + "/" + this.fileNameToTmpName.get(partFileName2));
                            }
                            // Rotate right away if the open part is already over the length limit.
                            if (path3 != null && this.fs.getFileStatus(path3).getLen() > this.maxLength.longValue()) {
                                LOG.debug("rotating file at setup.");
                                rotate(str2);
                            }
                        } catch (IOException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                LOG.debug("setup completed");
                this.context = operatorContext;
                this.fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
                this.fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong());
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        } catch (IOException e3) {
            throw new RuntimeException(e3);
        }
    }

    /**
     * Brings an existing file back in line with the checkpointed end offset.
     *
     * <p>If the file length equals the checkpointed offset the file is intact. In that case,
     * when writing to tmp files and the file had an open stream, its content is copied into a
     * fresh tmp file whose stream is put into the cache. Otherwise the first
     * {@code offset} bytes are copied into a new recovery file, which then either becomes the
     * tracked tmp file or is renamed over the original path.
     *
     * <p>All streams are closed on every path, including failures, and the source stream is
     * closed before the rename is attempted.
     *
     * @param str  file name as used as key in {@code endOffsets}
     * @param str2 part file name derived from {@code str}
     * @param path path of the existing active file
     * @throws IOException if reading, writing or renaming fails, or if the file ends before
     *                     the checkpointed offset is reached
     */
    private void recoverFile(String str, String str2, Path path) throws IOException {
        LOG.debug("path exists {}", path);
        final long checkpointedOffset = this.endOffsets.get(str).longValue();
        FileStatus fileStatus;
        String recoveryName;
        Path recoveryPath;
        try (FSDataInputStream source = this.fs.open(path)) {
            fileStatus = this.fs.getFileStatus(path);
            if (fileStatus.getLen() == checkpointedOffset) {
                if (this.alwaysWriteToTmp && this.filesWithOpenStreams.contains(str)) {
                    String tmpName = str2 + '.' + System.currentTimeMillis() + TMP_EXTENSION;
                    FSDataOutputStream tmpStream = openStream(new Path(this.filePath + "/" + tmpName), false);
                    boolean handedToCache = false;
                    try {
                        IOUtils.copy(source, tmpStream);
                        this.streamsCache.put(str, new FSFilterStreamContext(tmpStream));
                        handedToCache = true;
                    } finally {
                        if (!handedToCache) {
                            // The cache never took ownership, so nobody else will close it.
                            IOUtils.closeQuietly(tmpStream);
                        }
                    }
                    this.fileNameToTmpName.put(str2, tmpName);
                }
                return;
            }
            LOG.info("path corrupted {} {} {}", new Object[]{path, Long.valueOf(checkpointedOffset), Long.valueOf(fileStatus.getLen())});
            recoveryName = str2 + '.' + System.currentTimeMillis() + TMP_EXTENSION;
            recoveryPath = new Path(this.filePath + "/" + recoveryName);
            try (FSDataOutputStream recoveryStream = openStream(recoveryPath, false)) {
                copyPrefix(source, recoveryStream, checkpointedOffset, path);
                flush(recoveryStream);
            }
        }
        LOG.debug("active {} recovery {} ", path, recoveryPath);
        if (this.alwaysWriteToTmp) {
            this.fileNameToTmpName.put(str2, recoveryName);
        } else {
            LOG.debug("recovery path {} actual path {} ", recoveryPath, fileStatus.getPath());
            rename(recoveryPath, fileStatus.getPath());
        }
    }

    /**
     * Copies bytes from the current position of {@code source} up to absolute position
     * {@code length} into {@code target}, writing exactly the bytes that were read.
     *
     * @throws IOException if the source ends before {@code length} bytes are available
     */
    private static void copyPrefix(FSDataInputStream source, FSDataOutputStream target, long length, Path sourcePath) throws IOException {
        final byte[] buffer = new byte[COPY_BUFFER_SIZE];
        long remaining = length - source.getPos();
        while (remaining > 0) {
            final int wanted = (int) Math.min(remaining, (long) buffer.length);
            // read() may return fewer bytes than requested; only those bytes are valid.
            final int read = source.read(buffer, 0, wanted);
            if (read < 0) {
                throw new IOException("Unexpected end of file " + sourcePath + " during recovery: expected "
                        + length + " bytes but " + remaining + " are missing");
            }
            target.write(buffer, 0, read);
            remaining -= read;
        }
    }

    /**
     * Creates the loader the stream cache uses to open a stream for a file name that is not
     * cached. The loader decides between creating a new file, appending to an existing one and
     * deleting stale files before creating, and records the name in filesWithOpenStreams.
     */
    private CacheLoader<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext> createCacheLoader() {
        return new CacheLoader<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext>() { // from class: com.datatorrent.lib.io.fs.AbstractFileOutputOperator.2
            /**
             * Opens the stream for the given file name.
             *
             * @param str file name (cache key)
             * @return a new stream context around the opened stream
             * @throws RuntimeException wrapping any IOException from the file system
             */
            public AbstractFileOutputOperator<INPUT>.FSFilterStreamContext load(@Nonnull String str) {
                Path path;
                FSDataOutputStream openStream;
                if (AbstractFileOutputOperator.this.rollingFile) {
                    RotationState rotationState = AbstractFileOutputOperator.this.getRotationState(str);
                    if (AbstractFileOutputOperator.this.rollingFile && rotationState.rotated) {
                        // A rotation happened since the last open: move on to the next part
                        // and restart its offset at zero.
                        AbstractFileOutputOperator.this.openPart.get(str).add(1);
                        rotationState.rotated = false;
                        AbstractFileOutputOperator.this.endOffsets.get(str).setValue(0L);
                    }
                }
                String partFileNamePri = AbstractFileOutputOperator.this.getPartFileNamePri(str);
                // path2 is the final location; path is where data is actually written.
                Path path2 = new Path(AbstractFileOutputOperator.this.filePath + "/" + partFileNamePri);
                if (AbstractFileOutputOperator.this.alwaysWriteToTmp) {
                    String str2 = (String) AbstractFileOutputOperator.this.fileNameToTmpName.get(partFileNamePri);
                    if (str2 == null) {
                        // First time this part is opened: generate and remember a tmp name.
                        str2 = partFileNamePri + '.' + System.currentTimeMillis() + AbstractFileOutputOperator.TMP_EXTENSION;
                        AbstractFileOutputOperator.this.fileNameToTmpName.put(partFileNamePri, str2);
                    }
                    path = new Path(AbstractFileOutputOperator.this.filePath + "/" + str2);
                } else {
                    path = path2;
                }
                boolean containsKey = AbstractFileOutputOperator.this.endOffsets.containsKey(str);
                try {
                    if (!AbstractFileOutputOperator.this.fs.exists(path2) && (!AbstractFileOutputOperator.this.alwaysWriteToTmp || !AbstractFileOutputOperator.this.fs.exists(path))) {
                        // Neither the final nor the tmp file exists: create a new file.
                        openStream = AbstractFileOutputOperator.this.openStream(path, false);
                    } else if (containsKey) {
                        // Known file: sync the recorded offset with the on-disk length and append.
                        FileStatus fileStatus = AbstractFileOutputOperator.this.fs.getFileStatus(path);
                        MutableLong mutableLong = AbstractFileOutputOperator.this.endOffsets.get(str);
                        if (mutableLong != null) {
                            mutableLong.setValue(fileStatus.getLen());
                        } else {
                            AbstractFileOutputOperator.this.endOffsets.put(str, new MutableLong(fileStatus.getLen()));
                        }
                        openStream = AbstractFileOutputOperator.this.openStream(path, true);
                        AbstractFileOutputOperator.LOG.debug("appending to {}", path);
                    } else if (AbstractFileOutputOperator.this.rollingFile) {
                        // Unknown rolling file: delete consecutive existing parts starting at 0.
                        int i = 0;
                        while (true) {
                            Path path3 = new Path(AbstractFileOutputOperator.this.filePath + "/" + AbstractFileOutputOperator.this.getPartFileName(str, i));
                            if (!AbstractFileOutputOperator.this.fs.exists(path3)) {
                                break;
                            }
                            AbstractFileOutputOperator.this.fs.delete(path3, true);
                            i++;
                        }
                        openStream = AbstractFileOutputOperator.this.openStream(path, false);
                    } else {
                        // Unknown non-rolling file: remove what is there and start over.
                        AbstractFileOutputOperator.this.fs.delete(path, true);
                        if (AbstractFileOutputOperator.this.alwaysWriteToTmp && AbstractFileOutputOperator.this.fs.exists(path2)) {
                            AbstractFileOutputOperator.this.fs.delete(path2, true);
                        }
                        openStream = AbstractFileOutputOperator.this.openStream(path, false);
                    }
                    AbstractFileOutputOperator.this.filesWithOpenStreams.add(str);
                    AbstractFileOutputOperator.LOG.info("opened {}, active {}", partFileNamePri, path);
                    return new FSFilterStreamContext(openStream);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Creates the listener that closes a stream when its entry leaves the stream cache.
     *
     * <p>For every removed entry with a non-null value the listener closes the stream context,
     * removes the file name from {@code filesWithOpenStreams} and adds the time spent closing
     * to {@code totalWritingTime}.
     */
    private RemovalListener<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext> createCacheRemoveListener() {
        return new RemovalListener<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext>() { // from class: com.datatorrent.lib.io.fs.AbstractFileOutputOperator.3
            /**
             * Closes the stream belonging to the removed cache entry.
             *
             * @param notification the removal notification carrying file name and stream context
             * @throws RuntimeException wrapping the IOException if closing the stream fails
             */
            @Override
            public void onRemoval(@Nonnull com.google.common.cache.RemovalNotification<String, AbstractFileOutputOperator<INPUT>.FSFilterStreamContext> notification) {
                FSFilterStreamContext streamContext = notification.getValue();
                if (streamContext == null) {
                    return;
                }
                try {
                    String fileName = notification.getKey();
                    String partFileName = AbstractFileOutputOperator.this.getPartFileNamePri(fileName);
                    AbstractFileOutputOperator.LOG.info("closing {}", partFileName);
                    long start = System.currentTimeMillis();
                    AbstractFileOutputOperator.this.closeStream(streamContext);
                    AbstractFileOutputOperator.this.filesWithOpenStreams.remove(fileName);
                    AbstractFileOutputOperator.this.totalWritingTime += System.currentTimeMillis() - start;
                } catch (IOException e) {
                    AbstractFileOutputOperator.LOG.error("removing {}", notification.getValue(), e);
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Opens an output stream for the given path.
     *
     * @param path the file to open
     * @param z    {@code true} to append to the existing file; {@code false} to create it with
     *             the configured replication and then apply {@code filePermission}
     * @return the opened stream
     * @throws IOException if the file system operation fails
     */
    protected FSDataOutputStream openStream(Path path, boolean z) throws IOException {
        if (z) {
            return this.fs.append(path);
        }
        final FSDataOutputStream created = this.fs.create(path, (short) this.replication);
        final FsPermission permission = FsPermission.createImmutable(this.filePermission);
        this.fs.setPermission(path, permission);
        return created;
    }

    /**
     * Closes the given stream context by calling its close() method.
     *
     * @param fSFilterStreamContext the stream context to close
     * @throws IOException if closing fails
     */
    protected void closeStream(AbstractFileOutputOperator<INPUT>.FSFilterStreamContext fSFilterStreamContext) throws IOException {
        fSFilterStreamContext.close();
    }

    /**
     * Renames {@code path} to {@code path2}, overwriting an existing destination. The
     * {@link FileContext} used for the rename is created from the file system URI on first use.
     *
     * @param path  source path
     * @param path2 destination path
     * @throws IOException if the file context cannot be created or the rename fails
     */
    protected void rename(Path path, Path path2) throws IOException {
        FileContext renamer = this.fileContext;
        if (renamer == null) {
            renamer = FileContext.getFileContext(this.fs.getUri());
            this.fileContext = renamer;
        }
        renamer.rename(path, path2, Options.Rename.OVERWRITE);
    }

    /**
     * Registers the given file for finalization under the current window.
     *
     * <p>For non-rolling files the name itself is recorded. For rolling files every part from
     * the one after the last requested part up to the open part is recorded, and the last
     * requested part is advanced to the open part.
     *
     * @param str the file name to finalize
     */
    protected void requestFinalize(String str) {
        final Long windowKey = Long.valueOf(this.currentWindow);
        Set<String> pending = this.finalizedFiles.get(windowKey);
        if (pending == null) {
            pending = Sets.newHashSet();
            this.finalizedFiles.put(windowKey, pending);
        }
        if (!this.rollingFile) {
            pending.add(str);
            return;
        }

        MutableInt lastRequested = this.finalizedPart.get(str);
        if (lastRequested == null) {
            // -1 so that the loop below starts at part 0.
            lastRequested = new MutableInt(-1);
            this.finalizedPart.put(str, lastRequested);
        }
        final MutableInt open = this.openPart.get(str);
        int part = lastRequested.getValue().intValue() + 1;
        while (part <= open.getValue().intValue()) {
            final String partName = getPartFileName(str, part);
            LOG.debug("request finalize {}", partName);
            pending.add(partName);
            part++;
        }
        final String currentPartName = getPartFileNamePri(str);
        lastRequested.setValue(open.getValue());
        pending.add(currentPartName);
    }

    /**
     * Closes every cached stream and then the file system.
     *
     * <p>Failures do not stop the shutdown: all streams are attempted, the file system is
     * closed regardless, and a single exception summarizing the failures is thrown at the end.
     *
     * @throws RuntimeException if closing any stream or the file system failed; the message
     *                          lists up to {@code MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION} file
     *                          names and the cause is the last recorded IOException
     */
    public void teardown() {
        final ArrayList<String> failedFiles = new ArrayList<>();
        int failureCount = 0;
        IOException lastFailure = null;

        // Iterate over entries so that key and value are read together; a separate get() per
        // key could return null for an entry that left the cache in the meantime.
        for (Map.Entry<String, FSFilterStreamContext> entry : this.streamsCache.asMap().entrySet()) {
            final String fileName = entry.getKey();
            try {
                final long start = System.currentTimeMillis();
                closeStream(entry.getValue());
                this.filesWithOpenStreams.remove(fileName);
                this.totalWritingTime += System.currentTimeMillis() - start;
            } catch (IOException e) {
                failureCount++;
                // Only the first N failures are remembered by name.
                if (failedFiles.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
                    failedFiles.add(fileName);
                    lastFailure = e;
                }
            }
        }

        boolean fsCloseFailed = false;
        try {
            this.fs.close();
        } catch (IOException e) {
            lastFailure = e;
            fsCloseFailed = true;
        }

        if (lastFailure == null) {
            return;
        }

        final StringBuilder message = new StringBuilder();
        if (fsCloseFailed) {
            message.append("Closing the fileSystem failed. ");
        }
        if (!failedFiles.isEmpty()) {
            message.append("The following files failed closing: ");
        }
        for (String failedFile : failedFiles) {
            message.append(failedFile).append(", ");
        }
        if (failureCount > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
            message.append(failureCount - MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION).append(" more files failed.");
        }
        throw new RuntimeException(message.toString(), lastFailure);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes one tuple to the file chosen by {@code getFileName(input)}.
     *
     * <p>Tuples whose file name is null or empty are dropped. After the write the method
     * updates the time and byte totals, advances the file's end offset, marks the file as
     * non-empty for window-based rotation, rotates the file when its offset exceeds
     * {@code maxLength}, and increments the file's tuple count.
     *
     * @param input the tuple to write
     * @throws RuntimeException wrapping any IOException or ExecutionException
     */
    public void processTuple(INPUT input) {
        final String target = getFileName(input);
        if (Strings.isNullOrEmpty(target)) {
            return;
        }
        try {
            final FSFilterStreamContext streamContext = this.streamsCache.get(target);
            final FilterOutputStream sink = streamContext.getFilterStream();
            final byte[] payload = getBytesForTuple(input);

            final long writeStart = System.currentTimeMillis();
            sink.write(payload);
            this.totalWritingTime += System.currentTimeMillis() - writeStart;
            this.totalBytesWritten += payload.length;

            MutableLong offset = this.endOffsets.get(target);
            if (offset == null) {
                offset = new MutableLong(0L);
                this.endOffsets.put(target, offset);
            }
            offset.add(payload.length);

            if (this.rotationWindows > 0) {
                getRotationState(target).notEmpty = true;
            }

            final boolean overLimit = this.rollingFile && offset.longValue() > this.maxLength.longValue();
            if (overLimit) {
                LOG.debug("Rotating file {} {} {}", new Object[]{target, this.openPart.get(target), Long.valueOf(offset.longValue())});
                rotate(target);
            }

            // rotate() drops the count entry, so the lookup has to happen after it.
            MutableLong tupleCount = this.counts.get(target);
            if (tupleCount == null) {
                tupleCount = new MutableLong(0L);
                this.counts.put(target, tupleCount);
            }
            tupleCount.add(1L);
        } catch (IOException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Rotates the given file: requests finalization, drops its tuple count, invalidates its
     * cached stream, calls {@code rotateHook} with the name of the part that was open and
     * marks the file as rotated.
     *
     * @param str the file name to rotate
     * @throws IllegalArgumentException declared for callers; not thrown directly here
     * @throws IOException              declared for callers; not thrown directly here
     * @throws ExecutionException       declared for callers; not thrown directly here
     */
    public void rotate(String str) throws IllegalArgumentException, IOException, ExecutionException {
        requestFinalize(str);
        this.counts.remove(str);
        this.streamsCache.invalidate(str);

        final MutableInt part = this.openPart.get(str);
        final Integer closedPart = part.getValue();
        LOG.debug("Part file rotated {} : {}", str, closedPart);

        final String closedPartName = getPartFileName(str, part.getValue().intValue());
        rotateHook(closedPartName);

        final RotationState state = getRotationState(str);
        state.rotated = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the rotation state of the given file, creating and registering a new one when
     * none exists yet.
     *
     * @param str the file name
     * @return the existing or newly created state, never null
     */
    public RotationState getRotationState(String str) {
        final RotationState existing = this.rotationStates.get(str);
        if (existing != null) {
            return existing;
        }
        final RotationState created = new RotationState();
        this.rotationStates.put(str, created);
        return created;
    }

    /**
     * Extension point called by rotate() with the name of the part file that was just closed.
     * The default implementation does nothing.
     *
     * @param str name of the part file that was rotated out
     */
    @Deprecated
    protected void rotateHook(String str) {
    }

    /**
     * Flushes the given stream: {@code flush()} when the operator's file system is a
     * {@link LocalFileSystem} or {@link RawLocalFileSystem}, {@code hflush()} otherwise.
     *
     * @param fSDataOutputStream the stream to flush
     * @throws IOException if flushing fails
     */
    protected void flush(FSDataOutputStream fSDataOutputStream) throws IOException {
        final boolean local = this.fs instanceof LocalFileSystem || this.fs instanceof RawLocalFileSystem;
        if (!local) {
            fSDataOutputStream.hflush();
            return;
        }
        fSDataOutputStream.flush();
    }

    /**
     * Returns the name of the file currently written for {@code str}: the name itself for
     * non-rolling files, otherwise the name of the open part. The open part is initialized
     * to 0 when the file has none yet.
     *
     * @param str the file name
     * @return the active file or part file name
     */
    protected String getPartFileNamePri(String str) {
        if (this.rollingFile) {
            MutableInt part = this.openPart.get(str);
            if (part == null) {
                part = new MutableInt(0);
                this.openPart.put(str, part);
                LOG.debug("First file part number {}", part);
            }
            return getPartFileName(str, part.intValue());
        }
        return str;
    }

    /**
     * Builds the name of a part file as {@code <name>.<part>}.
     *
     * @param str the base file name
     * @param i   the part index
     * @return the part file name
     */
    protected String getPartFileName(String str, int i) {
        final StringBuilder name = new StringBuilder();
        name.append(str);
        name.append(".");
        name.append(i);
        return name.toString();
    }

    /**
     * Re-initializes the filter context of every cached stream and records the window id.
     *
     * @param j id of the window that is starting
     * @throws RuntimeException wrapping the IOException if a context cannot be initialized
     */
    public void beginWindow(long j) {
        try {
            for (FSFilterStreamContext streamContext : this.streamsCache.asMap().values()) {
                streamContext.initializeContext();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.currentWindow = j;
    }

    /**
     * Finishes the current window.
     *
     * <ol>
     *   <li>Calls {@code finalizeContext()} on every cached stream and adds the elapsed
     *       wall-clock time to {@code totalWritingTime}.</li>
     *   <li>When window-based rotation is enabled ({@code rotationWindows > 0}), counts the
     *       window; once {@code rotationWindows} windows have elapsed the counter is reset and
     *       every file in {@code openPart} that has a rotation state, was not already rotated
     *       and is flagged non-empty gets rotated. The non-empty flag is cleared for every
     *       file that has a rotation state.</li>
     *   <li>Publishes the total writing time and total bytes written through
     *       {@code fileCounters} and hands the counters to the operator context.</li>
     * </ol>
     *
     * @throws RuntimeException wrapping any {@link IOException} or {@link ExecutionException}
     *         raised while finalizing streams or rotating files
     */
    public void endWindow() {
        try {
            for (FSFilterStreamContext streamContext : this.streamsCache.asMap().values()) {
                final long startedAt = System.currentTimeMillis();
                streamContext.finalizeContext();
                this.totalWritingTime += System.currentTimeMillis() - startedAt;
            }

            if (this.rotationWindows > 0) {
                this.rotationCount++;
                if (this.rotationCount == this.rotationWindows) {
                    this.rotationCount = 0;
                    for (String fileName : this.openPart.keySet()) {
                        final RotationState state = this.rotationStates.get(fileName);
                        if (state == null) {
                            // No rotation state recorded for this file: nothing to rotate.
                            continue;
                        }
                        final boolean rotationDue = !state.rotated && state.notEmpty;
                        state.notEmpty = false;
                        if (rotationDue) {
                            try {
                                rotate(fileName);
                            } catch (IOException | ExecutionException rotationFailure) {
                                throw new RuntimeException(rotationFailure);
                            }
                        }
                    }
                }
            }

            this.fileCounters.getCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS).setValue(this.totalWritingTime);
            this.fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(this.totalBytesWritten);
            this.context.setCounters(this.fileCounters);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Maps an input tuple to a file name; implemented by subclasses.
     * NOTE(review): presumably this is the name of the output file the tuple is written to -
     * confirm against the caller, which is outside this part of the file.
     *
     * @param input the tuple
     * @return the file name for the tuple
     */
    protected abstract String getFileName(INPUT input);

    /**
     * Converts an input tuple to bytes; implemented by subclasses.
     * NOTE(review): presumably these are the bytes written to the tuple's output file -
     * confirm against the caller, which is outside this part of the file.
     *
     * @param input the tuple
     * @return the byte representation of the tuple
     */
    protected abstract byte[] getBytesForTuple(INPUT input);

    /**
     * Sets the base path; file names are resolved against it as {@code filePath + "/" + name}.
     *
     * @param str the new base path; annotated as non-null
     */
    public void setFilePath(@Nonnull String str) {
        filePath = str;
    }

    /**
     * Returns the base path against which file names are resolved.
     *
     * @return the current value of {@code filePath}
     */
    public String getFilePath() {
        return filePath;
    }

    /**
     * Sets the {@code maxLength} property.
     *
     * @param j the new value, stored boxed
     */
    public void setMaxLength(long j) {
        maxLength = j;
    }

    /**
     * Returns the {@code maxLength} property as a primitive.
     *
     * @return the unboxed value of {@code maxLength}
     */
    public long getMaxLength() {
        return maxLength.longValue();
    }

    /**
     * Returns the number of windows after which window-based rotation is triggered; values of
     * zero or less disable that rotation (see {@code endWindow}).
     *
     * @return the current value of {@code rotationWindows}
     */
    public int getRotationWindows() {
        return rotationWindows;
    }

    /**
     * Sets the number of windows after which window-based rotation is triggered; values of
     * zero or less disable that rotation (see {@code endWindow}).
     *
     * @param i the new value of {@code rotationWindows}
     */
    public void setRotationWindows(int i) {
        rotationWindows = i;
    }

    /**
     * Sets the {@code maxOpenFiles} property.
     *
     * @param i the new value
     */
    public void setMaxOpenFiles(int i) {
        maxOpenFiles = i;
    }

    /**
     * Returns the {@code maxOpenFiles} property.
     *
     * @return the current value of {@code maxOpenFiles}
     */
    public int getMaxOpenFiles() {
        return maxOpenFiles;
    }

    /**
     * Returns the {@code filePermission} property.
     *
     * @return the current value of {@code filePermission}
     */
    public short getFilePermission() {
        return filePermission;
    }

    /**
     * Sets the {@code filePermission} property.
     *
     * @param s the new value
     */
    public void setFilePermission(short s) {
        filePermission = s;
    }

    /**
     * Returns the configured filter stream provider.
     *
     * @return the current value of {@code filterStreamProvider}
     */
    public FilterStreamProvider getFilterStreamProvider() {
        return filterStreamProvider;
    }

    /**
     * Sets the filter stream provider.
     *
     * @param filterStreamProvider the new provider; stored as-is
     */
    public void setFilterStreamProvider(FilterStreamProvider filterStreamProvider) {
        this.filterStreamProvider = filterStreamProvider;
    }

    /**
     * Empty implementation; the argument is ignored.
     *
     * @param j unused
     */
    public void checkpointed(long j) {
    }

    /**
     * Finalizes the files recorded in {@code finalizedFiles} for keys up to and including
     * {@code j}. Does nothing unless {@code alwaysWriteToTmp} is set.
     *
     * <p>Entries are visited in the map's iteration order and processing stops at the first key
     * greater than {@code j}. Each file of a processed entry is passed to
     * {@link #finalizeFile(String)}, after which the entry is removed from the map.
     * NOTE(review): stopping at the first larger key presumably relies on
     * {@code finalizedFiles} being ordered by key - confirm against its declaration.
     *
     * @param j upper bound (inclusive) on the keys whose files are finalized
     * @throws RuntimeException with message {@code "failed to commit"} wrapping any
     *         {@link IOException} raised while finalizing a file
     */
    public void committed(long j) {
        if (!this.alwaysWriteToTmp) {
            return;
        }
        try {
            final Iterator<Map.Entry<Long, Set<String>>> pending = this.finalizedFiles.entrySet().iterator();
            while (pending.hasNext()) {
                final Map.Entry<Long, Set<String>> windowFiles = pending.next();
                if (windowFiles.getKey() > j) {
                    break;
                }
                for (String fileName : windowFiles.getValue()) {
                    finalizeFile(fileName);
                }
                pending.remove();
            }
        } catch (IOException ioFailure) {
            throw new RuntimeException("failed to commit", ioFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Moves a file from its temporary name to its final name and cleans up leftovers.
     *
     * <ol>
     *   <li>If the final file does not exist, the temporary file (looked up in
     *       {@code fileNameToTmpName}) is renamed to it. If the final file already exists and
     *       the temporary file is still present, the temporary file is deleted instead.</li>
     *   <li>The bookkeeping entries for the file are removed from {@code endOffsets} and
     *       {@code fileNameToTmpName}.</li>
     *   <li>The final file's parent directory is scanned for stray temporary files: entries
     *       whose name ends with {@code TMP_EXTENSION}, starts with the final file's name and,
     *       once the last two dot-separated suffixes are stripped, equals {@code str}. Those
     *       are deleted.</li>
     * </ol>
     *
     * <p>Directory entries that match the prefix/suffix test but contain fewer than two dots
     * (for example {@code foo.tmp} next to {@code foo}) cannot be temporary files of the
     * expected shape and are skipped; previously they caused a
     * {@link StringIndexOutOfBoundsException}.
     *
     * <p>NOTE(review): the stray-file comparison uses {@code str} itself rather than the last
     * path component, so it can only match when {@code str} has no directory part - confirm
     * whether that is intended.
     *
     * @param str final file name, relative to {@code filePath}
     * @throws IOException if a file system operation fails
     */
    public void finalizeFile(String str) throws IOException {
        String tmpName = this.fileNameToTmpName.get(str);
        Path tmpPath = new Path(this.filePath + "/" + tmpName);
        Path finalPath = new Path(this.filePath + "/" + str);
        if (!this.fs.exists(finalPath)) {
            LOG.debug("rename from tmp {} actual {} ", tmpName, str);
            rename(tmpPath, finalPath);
        } else if (this.fs.exists(tmpPath)) {
            // Both files exist: keep the final file and drop the temporary one.
            LOG.debug("deleting tmp {}", tmpName);
            this.fs.delete(tmpPath, true);
        }
        this.endOffsets.remove(str);
        this.fileNameToTmpName.remove(str);

        String finalName = finalPath.getName();
        for (FileStatus fileStatus : this.fs.listStatus(finalPath.getParent())) {
            String name = fileStatus.getPath().getName();
            if (!name.endsWith(TMP_EXTENSION) || !name.startsWith(finalName)) {
                continue;
            }
            // Locate the second-to-last dot: everything before it is the candidate base name.
            int baseNameEnd = name.lastIndexOf('.', name.lastIndexOf('.') - 1);
            if (baseNameEnd < 0) {
                // Fewer than two dots: not of the expected "<name>.<suffix>.<ext>" shape.
                continue;
            }
            if (str.equals(name.substring(0, baseNameEnd))) {
                LOG.debug("deleting stray file {}", name);
                this.fs.delete(fileStatus.getPath(), true);
            }
        }
    }

    /**
     * Returns the {@code alwaysWriteToTmp} flag; when set, {@code committed} finalizes the
     * recorded files.
     *
     * @return the current value of {@code alwaysWriteToTmp}
     */
    public boolean isAlwaysWriteToTmp() {
        return alwaysWriteToTmp;
    }

    /**
     * Sets the {@code alwaysWriteToTmp} flag; when set, {@code committed} finalizes the
     * recorded files.
     *
     * @param z the new value of the flag
     */
    public void setAlwaysWriteToTmp(boolean z) {
        alwaysWriteToTmp = z;
    }

    /**
     * Exposes the mapping from final file names to temporary file names for tests.
     *
     * @return the live {@code fileNameToTmpName} map itself, not a copy
     */
    @VisibleForTesting
    protected Map<String, String> getFileNameToTmpName() {
        return fileNameToTmpName;
    }

    /**
     * Exposes the finalized-files bookkeeping for tests.
     *
     * @return the live {@code finalizedFiles} map itself, not a copy
     */
    @VisibleForTesting
    protected Map<Long, Set<String>> getFinalizedFiles() {
        return finalizedFiles;
    }

    /**
     * Returns the {@code expireStreamAfterAccessMillis} property.
     *
     * @return the current value, which may be {@code null} since the field is a boxed
     *         {@code Long}
     */
    public Long getExpireStreamAfterAccessMillis() {
        return expireStreamAfterAccessMillis;
    }

    /**
     * Sets the {@code expireStreamAfterAccessMillis} property.
     *
     * @param l the new value; stored as-is
     */
    public void setExpireStreamAfterAccessMillis(Long l) {
        expireStreamAfterAccessMillis = l;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0007: MOVE_MULTI, method: com.datatorrent.lib.io.fs.AbstractFileOutputOperator.access$414(com.datatorrent.lib.io.fs.AbstractFileOutputOperator, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * Compiler-generated accessor that adds {@code r7} to the {@code totalWritingTime} of the
     * given operator.
     *
     * <p>The decompiler could not decode this method and emitted a stub that always threw
     * {@link UnsupportedOperationException}. The body below is reconstructed from the
     * register-level listing the decompiler left behind: load {@code totalWritingTime}, add
     * the argument, store the sum back into the field, return. The value returned is the
     * updated total (the sum is duplicated on the stack before the store).
     *
     * @param r6 operator whose {@code totalWritingTime} is increased
     * @param r7 amount to add
     * @return the new value of {@code totalWritingTime}
     */
    static /* synthetic */ long access$414(com.datatorrent.lib.io.fs.AbstractFileOutputOperator r6, long r7) {
        long updatedTotal = r6.totalWritingTime + r7;
        r6.totalWritingTime = updatedTotal;
        return updatedTotal;
    }

    // Empty static initializer: no class-level initialization is performed here.
    // NOTE(review): presumably a decompilation artifact - confirm before removing.
    static {
    }
}
