package info.batey.kafka.unit;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.admin.TopicCommand;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.VerifiableProperties;
import org.junit.ComparisonFailure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/batey/kafka/unit/KafkaUnit.class */
public class KafkaUnit {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUnit.class);
    public KafkaServerStartable broker;
    private Zookeeper zookeeper;
    private final String zookeeperString;
    private final String brokerString;
    private int zkPort;
    private int brokerPort;
    private Producer<String, String> producer = null;
    private Properties kafkaBrokerConfig = new Properties();

    public KafkaUnit(int i, int i2) {
        this.zkPort = i;
        this.brokerPort = i2;
        this.zookeeperString = "localhost:" + i;
        this.brokerString = "localhost:" + i2;
    }

    public void startup() {
        this.zookeeper = new Zookeeper(this.zkPort);
        this.zookeeper.startup();
        try {
            File file = Files.createTempDirectory("kafka", new FileAttribute[0]).toFile();
            file.deleteOnExit();
            this.kafkaBrokerConfig.setProperty("zookeeper.connect", this.zookeeperString);
            this.kafkaBrokerConfig.setProperty("broker.id", "1");
            this.kafkaBrokerConfig.setProperty("host.name", "localhost");
            this.kafkaBrokerConfig.setProperty("port", Integer.toString(this.brokerPort));
            this.kafkaBrokerConfig.setProperty("log.dir", file.getAbsolutePath());
            this.kafkaBrokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
            this.broker = new KafkaServerStartable(new KafkaConfig(this.kafkaBrokerConfig));
            this.broker.startup();
        } catch (IOException e) {
            throw new RuntimeException("Unable to start Kafka", e);
        }
    }

    public void createTopic(String str) {
        createTopic(str, 1);
    }

    public void createTopic(String str, Integer num) {
        String[] strArr = {"--create", "--zookeeper", this.zookeeperString, "--replication-factor", "1", "--partitions", "" + Integer.valueOf(num.intValue()), "--topic", str};
        LOGGER.info("Executing: CreateTopic " + Arrays.toString(strArr));
        TopicCommand.main(strArr);
    }

    public void shutdown() {
        if (this.broker != null) {
            this.broker.shutdown();
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }

    public List<String> readMessages(String str, final int i) throws TimeoutException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.zookeeperString);
        properties.put("group.id", "10");
        properties.put("socket.timeout.ms", "500");
        properties.put("consumer.id", "test");
        properties.put("auto.offset.reset", "smallest");
        properties.put("consumer.timeout.ms", "500");
        ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
        HashMap hashMap = new HashMap();
        hashMap.put(str, 1);
        final KafkaStream kafkaStream = (KafkaStream) ((List) createJavaConsumerConnector.createMessageStreams(hashMap, stringDecoder, stringDecoder).get(str)).get(0);
        try {
            try {
                List<String> list = (List) newSingleThreadExecutor.submit(new Callable<List<String>>() { // from class: info.batey.kafka.unit.KafkaUnit.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public List<String> call() throws Exception {
                        ArrayList arrayList = new ArrayList();
                        try {
                            ConsumerIterator it = kafkaStream.iterator();
                            while (it.hasNext()) {
                                String str2 = (String) ((MessageAndMetadata) it.next()).message();
                                KafkaUnit.LOGGER.info("Received message: {}", str2);
                                arrayList.add(str2);
                            }
                        } catch (ConsumerTimeoutException e) {
                        }
                        if (arrayList.size() != i) {
                            throw new ComparisonFailure("Incorrect number of messages returned", Integer.toString(i), Integer.toString(arrayList.size()));
                        }
                        return arrayList;
                    }
                }).get(3L, TimeUnit.SECONDS);
                newSingleThreadExecutor.shutdown();
                createJavaConsumerConnector.shutdown();
                return list;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e.getCause() instanceof ComparisonFailure) {
                    throw ((ComparisonFailure) e.getCause());
                }
                throw new TimeoutException("Timed out waiting for messages");
            }
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            createJavaConsumerConnector.shutdown();
            throw th;
        }
    }

    @SafeVarargs
    public final void sendMessages(KeyedMessage<String, String> keyedMessage, KeyedMessage<String, String>... keyedMessageArr) {
        if (this.producer == null) {
            Properties properties = new Properties();
            properties.put("serializer.class", StringEncoder.class.getName());
            properties.put("metadata.broker.list", this.brokerString);
            this.producer = new Producer<>(new ProducerConfig(properties));
        }
        this.producer.send(keyedMessage);
        this.producer.send(Arrays.asList(keyedMessageArr));
    }

    public final void setKafkaBrokerConfig(String str, String str2) {
        this.kafkaBrokerConfig.setProperty(str, str2);
    }
}
