/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.producer;

import com.amazonaws.services.kinesis.producer.Daemon;
import com.amazonaws.services.kinesis.producer.FileAgeManager;
import com.amazonaws.services.kinesis.producer.IrrecoverableError;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.Metric;
import com.amazonaws.services.kinesis.producer.ProcessFailureBehavior;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.amazonaws.services.kinesis.producer.protobuf.Messages;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KinesisProducer {
    private static final Logger log = LoggerFactory.getLogger(KinesisProducer.class);
    // 16 repetitions of "FF" parsed as hex, i.e. 2^128 - 1: the largest explicit hash key addUserRecord accepts.
    private static final BigInteger UINT_128_MAX = new BigInteger(StringUtils.repeat((String)"FF", (int)16), 16);
    // Class-wide lock held for the whole of extractBinaries(), so instances do not extract concurrently.
    private static final Object EXTRACT_BIN_MUTEX = new Object();
    // Gives each producer instance's callback pool a distinct number in its thread names.
    private static final AtomicInteger callbackCompletionPoolNumber = new AtomicInteger(0);
    // Configuration handed to the child Daemon; null when the pipe-based constructor is used.
    private final KinesisProducerConfiguration config;
    // Environment map passed to the child Daemon; null when the pipe-based constructor is used.
    private final Map<String, String> env;
    // Source of message ids sent to the child; starts at 1 and is shared by records, metrics requests and flushes.
    private final AtomicLong messageNumber = new AtomicLong(1L);
    // Pending futures keyed by message id. Entries are removed when the matching response arrives
    // (MessageHandler.getFuture) or when the child reports an error (MessageHandler.onError).
    private final Map<Long, SettableFuture<?>> futures = new ConcurrentHashMap();
    // Executor on which futures are completed, using daemon threads named "kpl-callback-pool-<n>-thread-<k>".
    // NOTE(review): per the ThreadPoolExecutor documentation an unbounded LinkedBlockingQueue presumably keeps
    // the pool at its core size of 1, so the maximum of availableProcessors * 4 looks unused - confirm before
    // relying on callbacks running in parallel (or before changing it, since callers may rely on ordering).
    private final ExecutorService callbackCompletionExecutor = new ThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors() * 4, 5L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("kpl-callback-pool-" + callbackCompletionPoolNumber.getAndIncrement() + "-thread-%d").build(), new RejectedExecutionHandler(){

        // A rejected task is not dropped: it is run synchronously right here inside the handler.
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            r.run();
        }
    });
    // The three paths below are assigned by extractBinaries(); the pipe-based constructor leaves them null.
    private String pathToExecutable;
    private String pathToLibDir;
    private String pathToTmpDir;
    // Current child process wrapper; replaced by MessageHandler.onError when the child is restarted.
    private volatile Daemon child;
    // System.nanoTime() reading used by MessageHandler.onError to require 3 seconds between throttled restarts.
    private volatile long lastChild = System.nanoTime();
    // Set to true by destroy(); read by MessageHandler.onError.
    private volatile boolean destroyed = false;
    // Restart policy consulted by MessageHandler.onError; never reassigned within this class.
    private ProcessFailureBehavior processFailureBehavior = ProcessFailureBehavior.AutoRestart;

    public KinesisProducer(KinesisProducerConfiguration config) {
        this.config = config;
        this.extractBinaries();
        this.env = new ImmutableMap.Builder().put((Object)"LD_LIBRARY_PATH", (Object)this.pathToLibDir).put((Object)"DYLD_LIBRARY_PATH", (Object)this.pathToLibDir).put((Object)"CA_DIR", (Object)this.pathToTmpDir).build();
        this.child = new Daemon(this.pathToExecutable, new MessageHandler(), this.pathToTmpDir, config, this.env);
    }

    public KinesisProducer() {
        this(new KinesisProducerConfiguration());
    }

    protected KinesisProducer(File inPipe, File outPipe) {
        this.config = null;
        this.env = null;
        this.child = new Daemon(inPipe, outPipe, new MessageHandler());
    }

    /**
     * Queues a record for the given stream without an explicit hash key.
     *
     * @param stream       stream name; must not be null or blank
     * @param partitionKey partition key; 1 to 256 characters
     * @param data         payload of at most 1 MiB; {@code null} is sent as an empty payload
     * @return a future completed with the result, or failed with
     *         {@link UserRecordFailedException} when the put is reported as unsuccessful
     * @throws IllegalArgumentException if any argument fails validation
     */
    public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data) {
        return this.addUserRecord(stream, partitionKey, null, data);
    }

    /**
     * Validates the arguments, registers a pending future under a fresh message id and
     * hands a {@code PutRecord} message to the child process.
     *
     * @param stream          stream name; trimmed before use, must not be null or blank
     * @param partitionKey    partition key; 1 to 256 characters as counted by {@link String#length()}
     * @param explicitHashKey optional decimal integer in the range [0, 2^128 - 1]; trimmed
     *                        before parsing, {@code null} to omit it
     * @param data            payload of at most 0x100000 (1 MiB) remaining bytes; {@code null}
     *                        is sent as an empty payload
     * @return a future completed with the result, or failed with
     *         {@link UserRecordFailedException} when the put is reported as unsuccessful
     * @throws IllegalArgumentException if any argument fails validation
     */
    public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) {
        if (stream == null) {
            throw new IllegalArgumentException("Stream name cannot be null");
        }
        stream = stream.trim();
        if (stream.length() == 0) {
            throw new IllegalArgumentException("Stream name cannot be empty");
        }
        if (partitionKey == null) {
            throw new IllegalArgumentException("partitionKey cannot be null");
        }
        if (partitionKey.length() < 1 || partitionKey.length() > 256) {
            throw new IllegalArgumentException("Invalid parition key. Length must be at least 1 and at most 256, got " + partitionKey.length());
        }
        try {
            partitionKey.getBytes("UTF-8");
        }
        catch (Exception e) {
            // Keep the underlying failure attached so it is not lost to the caller.
            throw new IllegalArgumentException("Partition key must be valid UTF-8", e);
        }
        BigInteger hashKey = null;
        if (explicitHashKey != null) {
            explicitHashKey = explicitHashKey.trim();
            try {
                hashKey = new BigInteger(explicitHashKey);
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid explicitHashKey, must be an integer, got " + explicitHashKey, e);
            }
            // hashKey is necessarily non-null here: the constructor either returned or threw.
            if (hashKey.compareTo(UINT_128_MAX) > 0 || hashKey.compareTo(BigInteger.ZERO) < 0) {
                throw new IllegalArgumentException("Invalid explicitHashKey, must be greater or equal to zero and less than or equal to (2^128 - 1), got " + explicitHashKey);
            }
        }
        if (data != null && data.remaining() > 0x100000) {
            throw new IllegalArgumentException("Data must be less than or equal to 1MB in size, got " + data.remaining() + " bytes");
        }

        // Build the message completely before registering the future, so a failure while
        // assembling it cannot leave an orphaned entry in the pending map.
        long id = this.messageNumber.getAndIncrement();
        Messages.PutRecord.Builder pr = Messages.PutRecord.newBuilder().setStreamName(stream).setPartitionKey(partitionKey).setData(data != null ? ByteString.copyFrom((ByteBuffer)data) : ByteString.EMPTY);
        if (hashKey != null) {
            pr.setExplicitHashKey(hashKey.toString(10));
        }
        Messages.Message m = Messages.Message.newBuilder().setId(id).setPutRecord(pr.build()).build();

        SettableFuture<UserRecordResult> f = SettableFuture.create();
        this.futures.put(id, f);
        boolean enqueued = false;
        try {
            this.child.add(m);
            enqueued = true;
        }
        finally {
            if (!enqueued) {
                // The message never reached the child, so no response can ever complete this
                // future; drop it so the outstanding count (and flushSync) is not stuck on it.
                this.futures.remove(id);
            }
        }
        return f;
    }

    /**
     * Reports how many futures are still pending.
     *
     * <p>The count is the size of the pending-future map, which holds both user
     * records and metrics requests that have not been answered yet.
     *
     * @return number of entries currently awaiting a response from the child
     */
    public int getOutstandingRecordsCount() {
        return futures.size();
    }

    /**
     * Sends a metrics request to the child process and blocks until the response arrives.
     *
     * @param metricName    name to filter on, or {@code null} to leave the name unset
     * @param windowSeconds time window in seconds; values of zero or below leave it unset
     * @return the metrics contained in the child's response
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the pending future is failed, e.g. by a child process error
     */
    public List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException {
        Messages.MetricsRequest.Builder request = Messages.MetricsRequest.newBuilder();
        if (metricName != null) {
            request.setName(metricName);
        }
        if (windowSeconds > 0) {
            request.setSeconds(windowSeconds);
        }

        long requestId = messageNumber.getAndIncrement();
        SettableFuture<List<Metric>> pending = SettableFuture.create();
        futures.put(requestId, pending);

        Messages.Message envelope = Messages.Message.newBuilder()
                .setId(requestId)
                .setMetricsRequest(request.build())
                .build();
        child.add(envelope);
        return pending.get();
    }

    /**
     * Fetches metrics with the given name, leaving the time window unset.
     *
     * @param metricName name to filter on, or {@code null} to leave the name unset
     * @return the metrics contained in the child's response
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the pending future is failed
     */
    public List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException {
        return getMetrics(metricName, -1);
    }

    /**
     * Fetches metrics with neither a name filter nor a time window set.
     *
     * @return the metrics contained in the child's response
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the pending future is failed
     */
    public List<Metric> getMetrics() throws InterruptedException, ExecutionException {
        return getMetrics((String) null);
    }

    /**
     * Fetches metrics for the given time window without a name filter.
     *
     * @param windowSeconds time window in seconds; values of zero or below leave it unset
     * @return the metrics contained in the child's response
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the pending future is failed
     */
    public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException {
        return getMetrics(null, windowSeconds);
    }

    /**
     * Shuts the producer down.
     *
     * <p>The {@code destroyed} flag is raised before the child is told to stop, so the
     * error handler already sees it when the child's termination is reported.
     */
    public void destroy() {
        destroyed = true;
        child.destroy();
    }

    /**
     * Sends a flush message to the child process and returns without waiting.
     *
     * @param stream stream name to put on the flush message, or {@code null} to leave it unset
     */
    public void flush(String stream) {
        Messages.Flush.Builder flushRequest = Messages.Flush.newBuilder();
        if (stream != null) {
            flushRequest.setStreamName(stream);
        }
        long id = messageNumber.getAndIncrement();
        Messages.Message envelope = Messages.Message.newBuilder()
                .setId(id)
                .setFlush(flushRequest.build())
                .build();
        child.add(envelope);
    }

    /**
     * Sends a flush message with no stream name set.
     */
    public void flush() {
        flush(null);
    }

    /**
     * Blocks until nothing is outstanding, sending a flush every 500 ms while waiting.
     *
     * <p>"Outstanding" is {@link #getOutstandingRecordsCount()}, so a pending metrics
     * request also keeps this method waiting.
     *
     * <p>An interrupt does not cut the wait short, but it is no longer lost: if the
     * thread is interrupted while sleeping, its interrupt status is restored before
     * this method returns so the caller can still observe it.
     */
    public void flushSync() {
        boolean interrupted = false;
        try {
            while (this.getOutstandingRecordsCount() > 0) {
                this.flush();
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    // sleep() cleared the flag when it threw; remember it and re-assert it
                    // on exit. Re-asserting it here would make every later sleep() throw
                    // at once and turn this loop into a busy spin.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Resolves the native executable and its support files, extracting them from the
     * classpath when no custom executable is configured.
     *
     * <p>Runs entirely under the class-wide {@code EXTRACT_BIN_MUTEX}. On return
     * {@code pathToTmpDir}, {@code pathToExecutable} and {@code pathToLibDir} are set.
     *
     * <ul>
     *   <li>If the configuration names a native executable, it is used as is and the
     *       library directory is the empty string.</li>
     *   <li>Otherwise the bundled executable for the current OS is written to
     *       {@code <tmp>/amazon-kinesis-producer-native-binaries/kinesis_producer_<sha1>[.exe]}.
     *       A file already present under that name must match the bundled bytes exactly.
     *       The certificate file {@code b204d74a.0} is written next to it if absent, and
     *       both files are registered with the {@link FileAgeManager}.</li>
     * </ul>
     *
     * @throws RuntimeException if the OS is unsupported, or wrapping any failure while
     *         extracting (including the {@link SecurityException} raised for a content mismatch)
     */
    private void extractBinaries() {
        synchronized (EXTRACT_BIN_MUTEX) {
            final List<File> watchFiles = new ArrayList<File>(2);

            final String os;
            if (SystemUtils.IS_OS_WINDOWS) {
                os = "windows";
            } else if (SystemUtils.IS_OS_LINUX) {
                os = "linux";
            } else if (SystemUtils.IS_OS_MAC_OSX) {
                os = "osx";
            } else {
                throw new RuntimeException("Your operation system is not supported (" + SystemUtils.OS_NAME + "), the KPL only supports Linux, OSX and Windows");
            }

            final String root = "amazon-kinesis-producer-native-binaries";
            String tmpDir = this.config.getTempDirectory();
            // Treat an unset (null) temp directory like a blank one instead of failing with an NPE.
            if (tmpDir == null || tmpDir.trim().length() == 0) {
                tmpDir = System.getProperty("java.io.tmpdir");
            }
            tmpDir = Paths.get(tmpDir, root).toString();
            this.pathToTmpDir = tmpDir;

            String binPath = this.config.getNativeExecutable();
            if (binPath != null && !binPath.trim().isEmpty()) {
                this.pathToExecutable = binPath.trim();
                log.warn("Using non-default native binary at " + this.pathToExecutable);
                this.pathToLibDir = "";
                return;
            }

            log.info("Extracting binaries to " + tmpDir);
            try {
                File tmpDirFile = new File(tmpDir);
                if (!tmpDirFile.exists() && !tmpDirFile.mkdirs()) {
                    throw new IOException("Could not create tmp dir " + tmpDir);
                }

                String extension = os.equals("windows") ? ".exe" : "";
                String executableName = "kinesis_producer" + extension;
                byte[] bin = this.readClasspathResource(root + "/" + os + "/" + executableName);

                // The digest only makes the file name unique per binary version; an existing
                // file is trusted solely on the byte-for-byte comparison below.
                MessageDigest md = MessageDigest.getInstance("SHA1");
                String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase();
                this.pathToExecutable = Paths.get(this.pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString();
                File extracted = new File(this.pathToExecutable);
                watchFiles.add(extracted);

                if (extracted.exists()) {
                    // Shared lock: wait out a concurrent writer, then verify the contents.
                    try (FileInputStream fis = new FileInputStream(extracted);
                         FileLock lock = fis.getChannel().lock(0L, Long.MAX_VALUE, true)) {
                        boolean contentEqual = false;
                        if (extracted.length() == (long) bin.length) {
                            // Read through the stream that is already open (and closed by this
                            // try) rather than opening a second one that nobody closes.
                            contentEqual = Arrays.equals(bin, IOUtils.toByteArray((InputStream) fis));
                        }
                        if (!contentEqual) {
                            throw new SecurityException("The contents of the binary " + extracted.getAbsolutePath() + " is not what it's expected to be.");
                        }
                    }
                } else {
                    try (FileOutputStream fos = new FileOutputStream(extracted);
                         FileLock lock = fos.getChannel().lock()) {
                        IOUtils.write(bin, (OutputStream) fos);
                    }
                    if (!extracted.setExecutable(true)) {
                        log.warn("Could not mark " + extracted.getAbsolutePath() + " as executable");
                    }
                }

                String certFileName = "b204d74a.0";
                File certFile = new File(this.pathToTmpDir, certFileName);
                if (!certFile.exists()) {
                    // Load the bytes before creating the file: if the resource cannot be read,
                    // no empty cert file is left behind for later runs to accept as valid.
                    byte[] certs = this.readClasspathResource("cacerts/" + certFileName);
                    try (FileOutputStream fos = new FileOutputStream(certFile);
                         FileLock lock = fos.getChannel().lock()) {
                        IOUtils.write(certs, (OutputStream) fos);
                    }
                }
                watchFiles.add(certFile);

                this.pathToLibDir = this.pathToTmpDir;
                FileAgeManager.instance().registerFiles(watchFiles);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not copy native binaries to temp directory " + tmpDir, e);
            }
        }
    }

    /**
     * Reads a classpath resource fully into memory and closes its stream.
     *
     * @param resourcePath path handed to the class loader of this object's class
     * @return the complete contents of the resource
     * @throws IOException if the resource does not exist or cannot be read
     */
    private byte[] readClasspathResource(String resourcePath) throws IOException {
        try (InputStream in = this.getClass().getClassLoader().getResourceAsStream(resourcePath)) {
            if (in == null) {
                throw new IOException("Classpath resource not found: " + resourcePath);
            }
            return IOUtils.toByteArray(in);
        }
    }

    private class MessageHandler
    implements Daemon.MessageHandler {
        private MessageHandler() {
        }

        @Override
        public void onMessage(final Messages.Message m) {
            KinesisProducer.this.callbackCompletionExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    if (m.hasPutRecordResult()) {
                        MessageHandler.this.onPutRecordResult(m);
                    } else if (m.hasMetricsResponse()) {
                        MessageHandler.this.onMetricsResponse(m);
                    } else {
                        log.error("Unexpected message type from child process");
                    }
                }
            });
        }

        @Override
        public void onError(final Throwable t) {
            if (!KinesisProducer.this.destroyed) {
                log.error("Error in child process", t);
            }
            for (final Map.Entry entry : KinesisProducer.this.futures.entrySet()) {
                KinesisProducer.this.callbackCompletionExecutor.submit(new Runnable(){

                    @Override
                    public void run() {
                        ((SettableFuture)entry.getValue()).setException(t);
                    }
                });
            }
            KinesisProducer.this.futures.clear();
            if (KinesisProducer.this.processFailureBehavior == ProcessFailureBehavior.AutoRestart && !KinesisProducer.this.destroyed) {
                log.info("Restarting native producer process.");
                KinesisProducer.this.child = new Daemon(KinesisProducer.this.pathToExecutable, new MessageHandler(), KinesisProducer.this.pathToTmpDir, KinesisProducer.this.config, KinesisProducer.this.env);
            } else if (!(t instanceof IrrecoverableError) && (double)(System.nanoTime() - KinesisProducer.this.lastChild) > 3.0E9) {
                KinesisProducer.this.lastChild = System.nanoTime();
                KinesisProducer.this.child = new Daemon(KinesisProducer.this.pathToExecutable, new MessageHandler(), KinesisProducer.this.pathToTmpDir, KinesisProducer.this.config, KinesisProducer.this.env);
            }
        }

        private void onPutRecordResult(Messages.Message msg) {
            SettableFuture f = this.getFuture(msg);
            UserRecordResult result = UserRecordResult.fromProtobufMessage(msg.getPutRecordResult());
            if (result.isSuccessful()) {
                f.set((Object)result);
            } else {
                f.setException((Throwable)new UserRecordFailedException(result));
            }
        }

        private void onMetricsResponse(Messages.Message msg) {
            SettableFuture f = this.getFuture(msg);
            ArrayList<Metric> userMetrics = new ArrayList<Metric>();
            Messages.MetricsResponse res = msg.getMetricsResponse();
            for (Messages.Metric metric : res.getMetricsList()) {
                userMetrics.add(new Metric(metric));
            }
            f.set(userMetrics);
        }

        private <T> SettableFuture<T> getFuture(Messages.Message msg) {
            long id = msg.getSourceId();
            SettableFuture f = (SettableFuture)KinesisProducer.this.futures.remove(id);
            if (f == null) {
                throw new RuntimeException("Future for message id " + id + " not found");
            }
            return f;
        }
    }
}

