/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto.ingest;

import com.azure.core.http.HttpClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.ExponentialRetry;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.data.exceptions.OneApiError;
import com.microsoft.azure.kusto.ingest.AzureStorageClient;
import com.microsoft.azure.kusto.ingest.IngestClientBase;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionResourceManager;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl;
import com.microsoft.azure.kusto.ingest.ResourceManager;
import com.microsoft.azure.kusto.ingest.StreamingIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.SourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.impl.client.CloseableHttpClient;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedStreamingIngestClient
extends IngestClientBase
implements QueuedIngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final int ATTEMPT_COUNT = 3;
    public static final int MAX_STREAMING_SIZE_BYTES = 0x400000;
    public static final String CLASS_NAME = ManagedStreamingIngestClient.class.getSimpleName();
    final QueuedIngestClientImpl queuedIngestClient;
    final StreamingIngestClient streamingIngestClient;
    private final ExponentialRetry exponentialRetryTemplate;
    private CloseableHttpClient httpClient = null;

    public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder dmConnectionString) throws URISyntaxException {
        return ManagedStreamingIngestClient.fromDmConnectionString(dmConnectionString, null);
    }

    public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder dmConnectionString, @Nullable HttpClientProperties properties) throws URISyntaxException {
        ConnectionStringBuilder engineConnectionString = new ConnectionStringBuilder(dmConnectionString);
        engineConnectionString.setClusterUrl(IngestClientBase.getQueryEndpoint(engineConnectionString.getClusterUrl()));
        return new ManagedStreamingIngestClient(dmConnectionString, engineConnectionString, properties);
    }

    public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder engineConnectionString) throws URISyntaxException {
        return ManagedStreamingIngestClient.fromEngineConnectionString(engineConnectionString, null);
    }

    public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder engineConnectionString, @Nullable HttpClientProperties properties) throws URISyntaxException {
        ConnectionStringBuilder dmConnectionString = new ConnectionStringBuilder(engineConnectionString);
        dmConnectionString.setClusterUrl(IngestClientBase.getIngestionEndpoint(engineConnectionString.getClusterUrl()));
        return new ManagedStreamingIngestClient(dmConnectionString, engineConnectionString, properties);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, ConnectionStringBuilder queryEndpointConnectionStringBuilder) throws URISyntaxException {
        this(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, ConnectionStringBuilder queryEndpointConnectionStringBuilder, boolean autoCorrectEndpoint) throws URISyntaxException {
        this(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null, autoCorrectEndpoint);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, ConnectionStringBuilder queryEndpointConnectionStringBuilder, @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(ingestionEndpointConnectionStringBuilder, properties, autoCorrectEndpoint);
        this.streamingIngestClient = new StreamingIngestClient(queryEndpointConnectionStringBuilder, properties, autoCorrectEndpoint);
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties properties, boolean autoCorrectEndpoint) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, properties, autoCorrectEndpoint);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, properties, autoCorrectEndpoint);
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable CloseableHttpClient httpClient, boolean autoCorrectEndpoint) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClient, autoCorrectEndpoint);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, httpClient, autoCorrectEndpoint);
        this.httpClient = httpClient;
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder, ConnectionStringBuilder queryEndpointConnectionStringBuilder, @Nullable HttpClientProperties properties) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(ingestionEndpointConnectionStringBuilder, properties, true);
        this.streamingIngestClient = new StreamingIngestClient(queryEndpointConnectionStringBuilder, properties, true);
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties properties) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, properties, true);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, properties, true);
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable CloseableHttpClient httpClient) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClient, true);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, httpClient, true);
        this.httpClient = httpClient;
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient storageClient, StreamingClient streamingClient) {
        log.info("Creating a new ManagedStreamingIngestClient from raw parts");
        this.queuedIngestClient = new QueuedIngestClientImpl(resourceManager, storageClient);
        this.streamingIngestClient = new StreamingIngestClient(streamingClient);
        this.exponentialRetryTemplate = new ExponentialRetry(3);
    }

    public ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient storageClient, StreamingClient streamingClient, ExponentialRetry retryTemplate) {
        log.info("Creating a new ManagedStreamingIngestClient from raw parts");
        this.queuedIngestClient = new QueuedIngestClientImpl(resourceManager, storageClient);
        this.streamingIngestClient = new StreamingIngestClient(streamingClient);
        this.exponentialRetryTemplate = retryTemplate;
    }

    @Override
    protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull((Object)fileSourceInfo, (String)"fileSourceInfo");
        Ensure.argIsNotNull((Object)ingestionProperties, (String)"ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, true);
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", (Throwable)e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    @Override
    protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull((Object)blobSourceInfo, (String)"blobSourceInfo");
        Ensure.argIsNotNull((Object)ingestionProperties, (String)"ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath());
        if (this.httpClient != null) {
            blobClientBuilder.httpClient((HttpClient)this.httpClient);
        }
        BlobClient blobClient = blobClientBuilder.buildClient();
        if (blobSourceInfo.getRawSizeInBytes() <= 0L) {
            try {
                blobSourceInfo.setRawSizeInBytes(blobClient.getProperties().getBlobSize());
            }
            catch (BlobStorageException e) {
                throw new IngestionServiceException(blobSourceInfo.getBlobPath(), "Failed getting blob properties: " + e.getMessage(), (Exception)((Object)e));
            }
        }
        if (blobSourceInfo.getRawSizeInBytes() > 0x400000L) {
            log.info("Blob size is greater than max streaming size ({} bytes). Falling back to queued.", (Object)blobSourceInfo.getRawSizeInBytes());
            return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
        }
        IngestionResult result = this.streamWithRetries(blobSourceInfo, ingestionProperties, blobClient);
        if (result != null) {
            return result;
        }
        return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
    }

    private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient) throws IngestionClientException, IngestionServiceException {
        ExponentialRetry retry = new ExponentialRetry(this.exponentialRetryTemplate);
        return (IngestionResult)retry.execute(currentAttempt -> {
            try {
                if (blobClient != null) {
                    String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt);
                    return this.streamingIngestClient.ingestFromBlob((BlobSourceInfo)sourceInfo, ingestionProperties, blobClient, clientRequestId);
                }
                String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(), currentAttempt);
                return this.streamingIngestClient.ingestFromStream((StreamSourceInfo)sourceInfo, ingestionProperties, clientRequestId);
            }
            catch (Exception e) {
                DataWebException webException;
                OneApiError oneApiError;
                if (e instanceof IngestionServiceException && e.getCause() != null && e.getCause() instanceof DataServiceException && e.getCause().getCause() != null && e.getCause().getCause() instanceof DataWebException && (oneApiError = (webException = (DataWebException)e.getCause().getCause()).getApiError()).isPermanent()) {
                    throw e;
                }
                log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), (Throwable)e);
                if (sourceInfo instanceof StreamSourceInfo) {
                    try {
                        ((StreamSourceInfo)sourceInfo).getStream().reset();
                    }
                    catch (IOException ioException) {
                        throw new IngestionClientException("Failed to reset stream", ioException);
                    }
                }
                return null;
            }
        });
    }

    @Override
    protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull((Object)resultSetSourceInfo, (String)"resultSetSourceInfo");
        Ensure.argIsNotNull((Object)ingestionProperties, (String)"ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validateResultSetProperties();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return this.ingestFromStream(streamSourceInfo, ingestionProperties);
        }
        catch (IOException ex) {
            String msg = "Failed to read from ResultSet.";
            log.error(msg, (Throwable)ex);
            throw new IngestionClientException(msg, ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        byte[] streamingBytes;
        Ensure.argIsNotNull((Object)streamSourceInfo, (String)"streamSourceInfo");
        Ensure.argIsNotNull((Object)ingestionProperties, (String)"ingestionProperties");
        streamSourceInfo.validate();
        ingestionProperties.validate();
        UUID sourceId = streamSourceInfo.getSourceId();
        if (sourceId == null) {
            sourceId = UUID.randomUUID();
        }
        try {
            streamingBytes = IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(), 0x400001);
        }
        catch (IOException e) {
            throw new IngestionClientException("Failed to read from stream.", e);
        }
        ByteArrayInputStream byteArrayStream = new ByteArrayInputStream(streamingBytes);
        if (streamingBytes.length > 0x400000) {
            log.info("Stream size is greater than max streaming size ({} bytes). Falling back to queued.", (Object)streamingBytes.length);
            StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()), streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType());
            return this.queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
        }
        if (!streamSourceInfo.isLeaveOpen()) {
            try {
                streamSourceInfo.getStream().close();
            }
            catch (IOException e) {
                log.warn("Failed to close stream", (Throwable)e);
            }
        }
        StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType());
        try {
            IngestionResult result = this.streamWithRetries(managedSourceInfo, ingestionProperties, null);
            if (result != null) {
                IngestionResult ingestionResult = result;
                return ingestionResult;
            }
            IngestionResult ingestionResult = this.queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
            return ingestionResult;
        }
        finally {
            try {
                managedSourceInfo.getStream().close();
            }
            catch (IOException e) {
                log.warn("Failed to close byte stream", (Throwable)e);
            }
        }
    }

    @Override
    protected String getClientType() {
        return CLASS_NAME;
    }

    @Override
    public void close() throws IOException {
        this.queuedIngestClient.close();
        this.streamingIngestClient.close();
    }

    @Override
    public void setQueueRequestOptions(RequestRetryOptions queueRequestOptions) {
        this.queuedIngestClient.setQueueRequestOptions(queueRequestOptions);
    }

    @Override
    public IngestionResourceManager getResourceManager() {
        return this.queuedIngestClient.getResourceManager();
    }
}

