/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.flink.writer.internal.committer;

import com.microsoft.azure.flink.common.KustoClientUtil;
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

@Internal
@PublicEvolving
public class KustoCommitter
extends CheckpointCommitter {
    private static final long serialVersionUID = 1L;
    private final KustoConnectionOptions connectionOptions;
    private final KustoWriteOptions kustoWriteOptions;
    private IngestClient streamingIngestClient;
    private transient Client queryClient;
    private final String table = "flink_checkpoints";
    private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<Integer, Long>();

    public KustoCommitter(KustoConnectionOptions connectionOptions, KustoWriteOptions kustoWriteOptions) {
        this.connectionOptions = connectionOptions;
        this.kustoWriteOptions = kustoWriteOptions;
    }

    public void setJobId(String id) throws Exception {
        super.setJobId(id);
    }

    public void createResource() throws Exception {
        long startTime = Instant.now(Clock.systemUTC()).toEpochMilli();
        LOG.debug("Creating resources for KustoCommitter. Creating table {} in database {} and applying policies", (Object)this.table, (Object)this.kustoWriteOptions.getDatabase());
        String createCheckpointTable = String.format(".create-merge table %s (job_id:string, sink_id:string, sub_id:int, checkpoint_id:long) with (hidden=true,folder='Flink',docstring='Checkpointing table in Flink')", this.table);
        String enableStreaming = String.format(".alter-merge table %s policy streamingingestion '{\"IsEnabled\": true}'", this.table);
        String retentionPolicy = String.format(".alter-merge table %s policy retention softdelete = 1d recoverability = disabled", this.table);
        try {
            if (this.queryClient == null) {
                this.queryClient = KustoClientUtil.createClient(this.connectionOptions, KustoCommitter.class.getSimpleName());
                LOG.info("Initialized queryClient in createResource and query client is null");
            }
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), createCheckpointTable);
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), enableStreaming);
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), retentionPolicy);
        }
        catch (Exception e) {
            LOG.error("Error while creating resources. To use the KustoCommitter you need to have admin privileges on the database {}", (Object)this.kustoWriteOptions.getDatabase(), (Object)e);
            throw e;
        }
        LOG.info("Created resources for KustoCommitter. Table {} in database {} took {} ms", new Object[]{this.table, this.kustoWriteOptions.getDatabase(), Instant.now().toEpochMilli() - startTime});
    }

    public void open() throws Exception {
        LOG.debug("Opening KustoCommitter");
        if (this.connectionOptions == null || this.kustoWriteOptions == null) {
            throw new IllegalArgumentException("No Connection options were provided or WriteOptions were null");
        }
        if (StringUtils.isEmpty((CharSequence)this.kustoWriteOptions.getDatabase())) {
            throw new IllegalArgumentException("Database provided was empty for KustoCommitter");
        }
        String thisClassName = KustoCommitter.class.getSimpleName();
        this.streamingIngestClient = KustoClientUtil.createMangedIngestClient(this.connectionOptions, thisClassName);
        if (this.queryClient == null) {
            this.queryClient = KustoClientUtil.createClient(this.connectionOptions, thisClassName);
            LOG.info("Initialized queryClient in open and query client is null");
        }
        LOG.debug("Opened KustoCommitter");
    }

    public void close() throws Exception {
        LOG.debug("Closing KustoCommitter");
        this.lastCommittedCheckpoints.clear();
        try {
            this.streamingIngestClient.close();
            this.queryClient.close();
        }
        catch (Exception e) {
            LOG.warn("Error while closing resources.", (Throwable)e);
            throw e;
        }
        LOG.debug("Closed KustoCommitter");
    }

    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        long startTime = Instant.now().toEpochMilli();
        LOG.info("Starting checkpoint {} for subtask {} at {}", new Object[]{checkpointId, subtaskIdx, Instant.now().toEpochMilli()});
        String payload = String.format("%s,%s,%d,%d", StringEscapeUtils.escapeCsv((String)this.jobId), StringEscapeUtils.escapeCsv((String)this.operatorId), subtaskIdx, checkpointId);
        IngestionProperties properties = new IngestionProperties(this.kustoWriteOptions.getDatabase(), this.table);
        properties.setFlushImmediately(true);
        properties.setDataFormat(IngestionProperties.DataFormat.CSV);
        try {
            this.streamingIngestClient.ingestFromStream(new StreamSourceInfo((InputStream)new ByteArrayInputStream(payload.getBytes())), properties);
        }
        catch (IngestionClientException | IngestionServiceException e) {
            LOG.warn("Error performing checkpoint ingestion on table {} in database {}.", new Object[]{this.table, this.kustoWriteOptions.getDatabase(), e});
            throw new RuntimeException(e);
        }
        LOG.info("Committed checkpoint {} for subtask {} in {} ms", new Object[]{checkpointId, subtaskIdx, Instant.now().toEpochMilli() - startTime});
        this.lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
    }

    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        long startTime = Instant.now().toEpochMilli();
        Long lastCommittedCheckpoint = this.lastCommittedCheckpoints.get(subtaskIdx);
        if (lastCommittedCheckpoint == null) {
            String statement = String.format("%s | where job_id == '%s' and sink_id == '%s' and sub_id == %d and checkpoint_id >= %d | top 1 by checkpoint_id desc", this.table, this.jobId, this.operatorId, subtaskIdx, checkpointId);
            try {
                KustoOperationResult checkpoints = this.queryClient.execute(this.kustoWriteOptions.getDatabase(), statement);
                if (checkpoints != null && checkpoints.getPrimaryResults() != null && !checkpoints.getPrimaryResults().getData().isEmpty()) {
                    lastCommittedCheckpoint = Long.parseLong(((List)checkpoints.getPrimaryResults().getData().get(0)).toString());
                    this.lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint);
                    return true;
                }
            }
            catch (DataClientException | DataServiceException e) {
                throw new RuntimeException(e);
            }
        }
        LOG.info("Checkpoint query took {} ms", (Object)(Instant.now().toEpochMilli() - startTime));
        return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
    }
}

