package com.datatorrent.lib.io.jms;

import java.io.IOException;
import javax.jms.JMSException;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A JMS transactionable store that records the last committed window id as an
 * empty marker file on a Hadoop {@link FileSystem}.
 *
 * <p>Markers live under
 * {@code <recoveryDirectory>/<appId>/<operatorId>/DT_CMT/<windowId>}. Storing a
 * new window id creates its marker first and only then deletes the older ones,
 * so at least one marker is present once a window has been committed.
 *
 * <p>The JMS session commit and the marker write are separate operations, which
 * is why {@link #isExactlyOnce()} reports {@code false}.
 */
@InterfaceStability.Evolving
public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore {
    private static final Logger logger = LoggerFactory.getLogger(FSPsuedoTransactionableStore.class);
    public static final String DEFAULT_RECOVERY_DIRECTORY = "recovery";
    public static final String COMMITTED_WINDOW_DIR = "DT_CMT";

    /** Value returned by {@link #getCommittedWindowId} when no window has been committed. */
    private static final long NO_COMMITTED_WINDOW = -1L;

    private transient boolean connected = false;
    private transient boolean inTransaction = false;

    /** Root directory under which the per-operator marker directories are created. */
    @NotNull
    protected String recoveryDirectory = DEFAULT_RECOVERY_DIRECTORY;
    private transient FileSystem fs;

    /**
     * Creates a new file system instance for the URI of the recovery directory.
     * A {@link LocalFileSystem} is unwrapped to its raw file system.
     *
     * @return the file system used to read and write window markers
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getFSInstance() throws IOException {
        FileSystem fileSystem = FileSystem.newInstance(new Path(this.recoveryDirectory).toUri(), new Configuration());
        if (fileSystem instanceof LocalFileSystem) {
            fileSystem = ((LocalFileSystem) fileSystem).getRaw();
        }
        return fileSystem;
    }

    /**
     * Sets the root directory under which window markers are stored.
     *
     * @param recoveryDirectory the recovery root directory
     */
    public void setRecoveryDirectory(String recoveryDirectory) {
        this.recoveryDirectory = recoveryDirectory;
    }

    /**
     * @return the root directory under which window markers are stored
     */
    public String getRecoveryDirectory() {
        return this.recoveryDirectory;
    }

    /**
     * Returns the largest window id for which a marker file exists.
     *
     * @param appId      application id used in the marker path
     * @param operatorId operator id used in the marker path
     * @return the largest stored window id, or {@code -1} when the marker
     *         directory does not exist or contains no markers
     * @throws RuntimeException      wrapping any {@link IOException} from the file system
     * @throws NumberFormatException if an entry in the marker directory is not named as a long
     */
    @Override
    public long getCommittedWindowId(String appId, int operatorId) {
        Path markerDir = getOperatorRecoveryPath(appId, operatorId);
        try {
            if (!this.fs.exists(markerDir)) {
                return NO_COMMITTED_WINDOW;
            }
            FileStatus[] markers = this.fs.listStatus(markerDir);
            // An existing but empty directory means nothing was committed; report the
            // same sentinel as a missing directory instead of leaking Long.MIN_VALUE.
            if (markers == null || markers.length == 0) {
                return NO_COMMITTED_WINDOW;
            }
            long maxWindowId = Long.MIN_VALUE;
            for (FileStatus marker : markers) {
                long windowId = Long.parseLong(marker.getPath().getName());
                if (maxWindowId < windowId) {
                    maxWindowId = windowId;
                }
            }
            return maxWindowId;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Records {@code windowId} as committed by creating its marker file and then
     * deleting every other entry in the marker directory.
     *
     * @param appId      application id used in the marker path
     * @param operatorId operator id used in the marker path
     * @param windowId   the window id to record
     * @throws RuntimeException wrapping any {@link IOException} from the file system
     */
    @Override
    public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
        Path markerDir = getOperatorRecoveryPath(appId, operatorId);
        Path markerFile = getOperatorWindowRecoveryPath(appId, operatorId, windowId);
        String markerName = Long.toString(windowId);
        try {
            // create() returns an open output stream; close it right away so the
            // empty marker is finalized and the handle is not leaked.
            this.fs.create(markerFile).close();
            // Remove older markers only after the new one exists.
            for (FileStatus marker : this.fs.listStatus(markerDir)) {
                Path markerPath = marker.getPath();
                if (!markerPath.getName().equals(markerName)) {
                    this.fs.delete(markerPath, true);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Recursively deletes the operator's directory, i.e. the parent of its
     * marker directory.
     *
     * @param appId      application id used in the marker path
     * @param operatorId operator id used in the marker path
     * @throws RuntimeException wrapping any {@link IOException} from the file system
     */
    @Override
    public void removeCommittedWindowId(String appId, int operatorId) {
        try {
            this.fs.delete(getOperatorRecoveryPath(appId, operatorId).getParent(), true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Marks this store as being inside a transaction. */
    @Override
    public void beginTransaction() {
        this.inTransaction = true;
    }

    /**
     * Commits the JMS session of the base store and clears the in-transaction flag.
     *
     * @throws RuntimeException wrapping any {@link JMSException} from the session
     */
    @Override
    public void commitTransaction() {
        try {
            getBase().getSession().commit();
            this.inTransaction = false;
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Rolls back the JMS session of the base store and clears the in-transaction flag.
     *
     * @throws RuntimeException wrapping any {@link JMSException} from the session
     */
    @Override
    public void rollbackTransaction() {
        try {
            getBase().getSession().rollback();
            this.inTransaction = false;
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return {@code true} between {@link #beginTransaction()} and a successful
     *         commit or rollback
     */
    @Override
    public boolean isInTransaction() {
        return this.inTransaction;
    }

    /**
     * Opens the file system used for window markers.
     *
     * @throws IOException if the file system cannot be created
     */
    @Override
    public void connect() throws IOException {
        this.fs = getFSInstance();
        this.connected = true;
    }

    /**
     * Closes the file system opened by {@link #connect()}.
     *
     * @throws IOException if closing the file system fails
     */
    @Override
    public void disconnect() throws IOException {
        this.fs.close();
        this.connected = false;
    }

    /**
     * @return {@code true} after {@link #connect()} and before {@link #disconnect()}
     */
    @Override
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isExactlyOnce() {
        return false;
    }

    /**
     * Builds the marker directory path for an operator, rooted at the configured
     * recovery directory (which equals {@code "recovery"} unless changed).
     */
    private Path getOperatorRecoveryPath(String appId, int operatorId) {
        return new Path(this.recoveryDirectory + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR);
    }

    /** Builds the marker file path for one window inside the operator's marker directory. */
    private Path getOperatorWindowRecoveryPath(String appId, int operatorId, long windowId) {
        return new Path(getOperatorRecoveryPath(appId, operatorId), Long.toString(windowId));
    }
}
