package org.apache.apex.malhar.lib.wal;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
import org.apache.apex.malhar.lib.wal.FSWindowReplayWAL;
import org.apache.apex.malhar.lib.wal.FileSystemWAL;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/wal/FSWindowDataManager.class */
public class FSWindowDataManager implements WindowDataManager {
    private static final String DEF_STATE_PATH = "idempotentState";
    private static final String WAL_FILE_NAME = "wal";
    private Set<Integer> deletedOperators;
    private boolean repartitioned;
    private boolean relyOnCheckpoints;
    private transient String fullStatePath;
    private transient int operatorId;
    private transient FileContext fileContext;
    private static Logger LOG = LoggerFactory.getLogger(FSWindowDataManager.class);

    @NotNull
    private String statePath = DEF_STATE_PATH;
    private boolean isStatePathRelativeToAppPath = true;
    private transient long largestCompletedWindow = -1;
    private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
    private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap();
    private final transient Kryo kryo = new Kryo();

    public FSWindowDataManager() {
        this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        this.operatorId = operatorContext.getId();
        if (this.isStatePathRelativeToAppPath) {
            this.fullStatePath = ((String) operatorContext.getValue(DAG.APPLICATION_PATH)) + "/" + this.statePath;
        } else {
            this.fullStatePath = this.statePath;
        }
        try {
            this.fileContext = FileContextUtils.getFileContext(this.fullStatePath);
            setupWals(((Long) operatorContext.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void setupWals(long j) throws IOException {
        findFiles(this.wal, this.operatorId);
        configureWal(this.wal, this.operatorId, !this.relyOnCheckpoints);
        if (this.repartitioned) {
            createReadOnlyWals();
            for (Map.Entry<Integer, FSWindowReplayWAL> entry : this.readOnlyWals.entrySet()) {
                findFiles(entry.getValue(), entry.getKey().intValue());
                configureWal(entry.getValue(), entry.getKey().intValue(), true);
            }
        }
        if (this.relyOnCheckpoints) {
            this.wal.walEndPointerAfterRecovery = this.wal.getWriter().getCurrentPointer();
            this.largestCompletedWindow = this.wal.getLastCheckpointedWindow();
        } else {
            long findLargestCompletedWindow = findLargestCompletedWindow(this.wal, null);
            if (findLargestCompletedWindow > j) {
                this.largestCompletedWindow = findLargestCompletedWindow;
            }
            if (this.wal.getReader().getCurrentPointer() != null) {
                this.wal.getWriter().setCurrentPointer(this.wal.getReader().getCurrentPointer().getCopy());
            }
        }
        if (this.repartitioned && this.largestCompletedWindow > -1) {
            for (Map.Entry<Integer, FSWindowReplayWAL> entry2 : this.readOnlyWals.entrySet()) {
                long j2 = -1;
                if (this.relyOnCheckpoints) {
                    j2 = findLargestCompletedWindow(entry2.getValue(), Long.valueOf(j));
                } else {
                    long findLargestCompletedWindow2 = findLargestCompletedWindow(entry2.getValue(), null);
                    if (findLargestCompletedWindow2 > j) {
                        j2 = findLargestCompletedWindow2;
                    }
                }
                if (j2 < this.largestCompletedWindow) {
                    this.largestCompletedWindow = j2;
                }
            }
        }
        this.wal.getReader().seek2(this.wal.walStartPointer);
        for (FSWindowReplayWAL fSWindowReplayWAL : this.readOnlyWals.values()) {
            fSWindowReplayWAL.getReader().seek2(fSWindowReplayWAL.walStartPointer);
        }
        this.wal.setup();
        Iterator<FSWindowReplayWAL> it = this.readOnlyWals.values().iterator();
        while (it.hasNext()) {
            it.next().setup();
        }
    }

    protected void createReadOnlyWals() throws IOException {
        RemoteIterator listStatus = this.fileContext.listStatus(new Path(this.fullStatePath));
        while (listStatus.hasNext()) {
            int parseInt = Integer.parseInt(((FileStatus) listStatus.next()).getPath().getName());
            if (parseInt != this.operatorId) {
                this.readOnlyWals.put(Integer.valueOf(parseInt), new FSWindowReplayWAL(true));
            }
        }
    }

    private void configureWal(FSWindowReplayWAL fSWindowReplayWAL, int i, boolean z) throws IOException {
        fSWindowReplayWAL.setFilePath((this.fullStatePath + "/" + i) + "/" + WAL_FILE_NAME);
        fSWindowReplayWAL.fileContext = this.fileContext;
        if (!z || fSWindowReplayWAL.fileDescriptors.isEmpty()) {
            return;
        }
        SortedSet keySet = fSWindowReplayWAL.fileDescriptors.keySet();
        fSWindowReplayWAL.walStartPointer = new FileSystemWAL.FileSystemWALPointer(((Integer) keySet.first()).intValue(), 0L);
        FSWindowReplayWAL.FileDescriptor fileDescriptor = (FSWindowReplayWAL.FileDescriptor) fSWindowReplayWAL.fileDescriptors.get(keySet.last()).last();
        if (fileDescriptor.isTmp) {
            fSWindowReplayWAL.tempPartFiles.put(Integer.valueOf(fileDescriptor.part), fileDescriptor.filePath.toString());
        }
    }

    private void findFiles(FSWindowReplayWAL fSWindowReplayWAL, int i) throws IOException {
        Path path = new Path(this.fullStatePath + "/" + i);
        if (this.fileContext.util().exists(path)) {
            RemoteIterator listStatus = this.fileContext.listStatus(path);
            while (listStatus.hasNext()) {
                FSWindowReplayWAL.FileDescriptor create = FSWindowReplayWAL.FileDescriptor.create(((FileStatus) listStatus.next()).getPath());
                fSWindowReplayWAL.fileDescriptors.put(Integer.valueOf(create.part), create);
            }
        }
    }

    private long findLargestCompletedWindow(FSWindowReplayWAL fSWindowReplayWAL, Long l) throws IOException {
        if (fSWindowReplayWAL.fileDescriptors.isEmpty()) {
            return -1L;
        }
        FileSystemWAL.FileSystemWALReader reader = fSWindowReplayWAL.getReader();
        Iterator it = new TreeSet(fSWindowReplayWAL.fileDescriptors.keySet()).descendingSet().iterator();
        while (it.hasNext()) {
            FSWindowReplayWAL.FileDescriptor fileDescriptor = (FSWindowReplayWAL.FileDescriptor) fSWindowReplayWAL.fileDescriptors.get(Integer.valueOf(((Integer) it.next()).intValue())).last();
            reader.seek2(new FileSystemWAL.FileSystemWALPointer(fileDescriptor.part, 0L));
            long j = -1;
            long j2 = -1;
            Slice readNext = readNext(reader);
            while (true) {
                Slice slice = readNext;
                if (slice == null || !skipNext(reader)) {
                    break;
                }
                long offset = reader.getCurrentPointer().getOffset();
                long fromByteArray = Longs.fromByteArray(slice.toByteArray());
                if (l != null && fromByteArray > l.longValue()) {
                    break;
                }
                j = offset;
                j2 = fromByteArray;
                readNext = readNext(reader);
            }
            if (j != -1) {
                fSWindowReplayWAL.walEndPointerAfterRecovery = new FileSystemWAL.FileSystemWALPointer(fileDescriptor.part, j);
                fSWindowReplayWAL.windowWalParts.put(Long.valueOf(j2), Integer.valueOf(fSWindowReplayWAL.walEndPointerAfterRecovery.getPartNum()));
                return j2;
            }
        }
        return -1L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Slice readNext(FileSystemWAL.FileSystemWALReader fileSystemWALReader) {
        try {
            return fileSystemWALReader.next();
        } catch (IOException e) {
            try {
                fileSystemWALReader.close();
                return null;
            } catch (IOException e2) {
                return null;
            }
        }
    }

    private boolean skipNext(FileSystemWAL.FileSystemWALReader fileSystemWALReader) {
        try {
            fileSystemWALReader.skipNext();
            return true;
        } catch (IOException e) {
            try {
                fileSystemWALReader.close();
                return false;
            } catch (IOException e2) {
                return false;
            }
        }
    }

    private void closeReaders() throws IOException {
        this.wal.getReader().close();
        if (this.readOnlyWals.size() > 0) {
            Iterator<Map.Entry<Integer, FSWindowReplayWAL>> it = this.readOnlyWals.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, FSWindowReplayWAL> next = it.next();
                next.getValue().getReader().close();
                int intValue = next.getKey().intValue();
                if (this.deletedOperators == null || !this.deletedOperators.contains(Integer.valueOf(intValue))) {
                    it.remove();
                }
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public void save(Object obj, long j) throws IOException {
        closeReaders();
        FileSystemWAL.FileSystemWALWriter writer = this.wal.getWriter();
        writer.append(new Slice(Longs.toByteArray(j)));
        writer.append(toSlice(obj));
        this.wal.beforeCheckpoint(j);
        this.wal.windowWalParts.put(Long.valueOf(j), Integer.valueOf(writer.getCurrentPointer().getPartNum()));
        writer.rotateIfNecessary();
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public Object retrieve(long j) throws IOException {
        return retrieve(this.wal, j);
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public Map<Integer, Object> retrieveAllPartitions(long j) throws IOException {
        if (j > this.largestCompletedWindow) {
            return null;
        }
        HashMap newHashMap = Maps.newHashMap();
        Object retrieve = retrieve(this.wal, j);
        if (retrieve != null) {
            newHashMap.put(Integer.valueOf(this.operatorId), retrieve);
        }
        if (this.repartitioned) {
            for (Map.Entry<Integer, FSWindowReplayWAL> entry : this.readOnlyWals.entrySet()) {
                Object retrieve2 = retrieve(entry.getValue(), j);
                if (retrieve2 != null) {
                    newHashMap.put(entry.getKey(), retrieve2);
                }
            }
        }
        return newHashMap;
    }

    private Object retrieve(FSWindowReplayWAL fSWindowReplayWAL, long j) throws IOException {
        if (j > this.largestCompletedWindow || fSWindowReplayWAL.walEndPointerAfterRecovery == null) {
            return null;
        }
        FileSystemWAL.FileSystemWALReader reader = fSWindowReplayWAL.getReader();
        do {
            if (reader.getCurrentPointer() != null && reader.getCurrentPointer().compareTo(fSWindowReplayWAL.walEndPointerAfterRecovery) >= 0) {
                return null;
            }
            if (fSWindowReplayWAL.retrievedWindow == null) {
                fSWindowReplayWAL.retrievedWindow = readNext(reader);
                Preconditions.checkNotNull(fSWindowReplayWAL.retrievedWindow);
            }
            long fromByteArray = Longs.fromByteArray(fSWindowReplayWAL.retrievedWindow.toByteArray());
            if (j == fromByteArray) {
                Slice readNext = readNext(reader);
                Preconditions.checkNotNull(readNext, "data is null");
                fSWindowReplayWAL.windowWalParts.put(Long.valueOf(fromByteArray), Integer.valueOf(reader.getCurrentPointer().getPartNum()));
                fSWindowReplayWAL.retrievedWindow = readNext(reader);
                return fromSlice(readNext);
            }
            if (j < fromByteArray) {
                return null;
            }
            skipNext(reader);
            fSWindowReplayWAL.windowWalParts.put(Long.valueOf(fromByteArray), Integer.valueOf(reader.getCurrentPointer().getPartNum()));
            fSWindowReplayWAL.retrievedWindow = readNext(reader);
        } while (fSWindowReplayWAL.retrievedWindow != null);
        return null;
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public void committed(long j) throws IOException {
        closeReaders();
        Map.Entry<Long, Integer> entry = null;
        Iterator<Map.Entry<Long, Integer>> it = this.wal.windowWalParts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> next = it.next();
            if (next.getKey().longValue() <= j && !this.wal.tempPartFiles.containsKey(next.getValue())) {
                entry = next;
                it.remove();
            }
            if (next.getKey().longValue() > j) {
                break;
            }
        }
        if (entry != null && !this.wal.windowWalParts.containsValue(entry.getValue())) {
            int intValue = entry.getValue().intValue();
            this.wal.getWriter().delete2(new FileSystemWAL.FileSystemWALPointer(intValue + 1, 0L));
            for (Map.Entry entry2 : this.wal.fileDescriptors.entries()) {
                if (((Integer) entry2.getKey()).intValue() > intValue || !((FSWindowReplayWAL.FileDescriptor) entry2.getValue()).isTmp) {
                    if (((Integer) entry2.getKey()).intValue() > intValue) {
                        break;
                    }
                } else if (this.fileContext.util().exists(((FSWindowReplayWAL.FileDescriptor) entry2.getValue()).filePath)) {
                    this.fileContext.delete(((FSWindowReplayWAL.FileDescriptor) entry2.getValue()).filePath, true);
                }
            }
        }
        if (this.deletedOperators != null) {
            Iterator<Integer> it2 = this.deletedOperators.iterator();
            while (it2.hasNext()) {
                int intValue2 = it2.next().intValue();
                FSWindowReplayWAL fSWindowReplayWAL = this.readOnlyWals.get(Integer.valueOf(intValue2));
                if (j > this.largestCompletedWindow) {
                    Path path = new Path(this.fullStatePath + "/" + intValue2);
                    if (this.fileContext.util().exists(path)) {
                        this.fileContext.delete(path, true);
                    }
                    fSWindowReplayWAL.teardown();
                    it2.remove();
                    this.readOnlyWals.remove(Integer.valueOf(intValue2));
                }
            }
            if (this.deletedOperators.isEmpty()) {
                this.deletedOperators = null;
            }
        }
    }

    private Slice toSlice(Object obj) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        this.kryo.writeClassAndObject(output, obj);
        output.close();
        return new Slice(byteArrayOutputStream.toByteArray());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Object fromSlice(Slice slice) {
        Input input = new Input(slice.buffer, slice.offset, slice.length);
        Object readClassAndObject = this.kryo.readClassAndObject(input);
        input.close();
        return readClassAndObject;
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public long getLargestCompletedWindow() {
        return this.largestCompletedWindow;
    }

    @Override // org.apache.apex.malhar.lib.wal.WindowDataManager
    public List<WindowDataManager> partition(int i, Set<Integer> set) {
        this.repartitioned = true;
        FSWindowDataManager[] fSWindowDataManagerArr = (FSWindowDataManager[]) KryoCloneUtils.createCloneUtils(this).getClones(i);
        if (set != null && !set.isEmpty()) {
            fSWindowDataManagerArr[0].deletedOperators = set;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(Arrays.asList(fSWindowDataManagerArr));
        return arrayList;
    }

    public void teardown() {
        this.wal.teardown();
        Iterator<FSWindowReplayWAL> it = this.readOnlyWals.values().iterator();
        while (it.hasNext()) {
            it.next().teardown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setRelyOnCheckpoints(boolean z) {
        this.relyOnCheckpoints = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FSWindowReplayWAL getWal() {
        return this.wal;
    }

    @VisibleForTesting
    public Set<Integer> getDeletedOperators() {
        if (this.deletedOperators == null) {
            return null;
        }
        return ImmutableSet.copyOf(this.deletedOperators);
    }

    public String getStatePath() {
        return this.statePath;
    }

    public void setStatePath(String str) {
        this.statePath = str;
    }

    public boolean isStatePathRelativeToAppPath() {
        return this.isStatePathRelativeToAppPath;
    }

    public void setStatePathRelativeToAppPath(boolean z) {
        this.isStatePathRelativeToAppPath = z;
    }
}
