package org.apache.atlas.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Properties;
import javax.inject.Inject;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import scala.Option;

@Component
@Order(3)
/* loaded from: input_file:org/apache/atlas/kafka/EmbeddedKafkaServer.class */
public class EmbeddedKafkaServer implements Service {
    public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    public static final String PROPERTY_PREFIX = "atlas.kafka";
    private static final String ATLAS_KAFKA_DATA = "data";
    public static final String PROPERTY_EMBEDDED = "atlas.notification.embedded";
    private final boolean isEmbedded;
    private final Properties properties;
    private KafkaServer kafkaServer;
    private ServerCnxnFactory factory;

    @Inject
    public EmbeddedKafkaServer(Configuration configuration) throws AtlasException {
        Configuration subsetConfiguration = ApplicationProperties.getSubsetConfiguration(configuration, "atlas.kafka");
        this.isEmbedded = configuration.getBoolean(PROPERTY_EMBEDDED, false);
        this.properties = ConfigurationConverter.getProperties(subsetConfiguration);
    }

    public void start() throws AtlasException {
        LOG.info("==> EmbeddedKafkaServer.start(isEmbedded={})", Boolean.valueOf(this.isEmbedded));
        if (this.isEmbedded) {
            try {
                startZk();
                startKafka();
            } catch (Exception e) {
                throw new AtlasException("Failed to start embedded kafka", e);
            }
        } else {
            LOG.info("==> EmbeddedKafkaServer.start(): not embedded..nothing todo");
        }
        LOG.info("<== EmbeddedKafkaServer.start(isEmbedded={})", Boolean.valueOf(this.isEmbedded));
    }

    public void stop() {
        LOG.info("==> EmbeddedKafkaServer.stop(isEmbedded={})", Boolean.valueOf(this.isEmbedded));
        if (this.kafkaServer != null) {
            this.kafkaServer.shutdown();
        }
        if (this.factory != null) {
            this.factory.shutdown();
        }
        LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", Boolean.valueOf(this.isEmbedded));
    }

    private String startZk() throws IOException, InterruptedException, URISyntaxException {
        String property = this.properties.getProperty("zookeeper.connect");
        LOG.info("Starting zookeeper at {}", property);
        URL url = getURL(property);
        File constructDir = constructDir("zk/txn");
        File constructDir2 = constructDir("zk/snap");
        this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(url.getHost(), url.getPort()), 1024);
        this.factory.startup(new ZooKeeperServer(constructDir, constructDir2, IndexRecord.RECORD_SIZE));
        String inetAddress = this.factory.getLocalAddress().getAddress().toString();
        LOG.info("Embedded zookeeper for Kafka started at {}", inetAddress);
        return inetAddress;
    }

    private void startKafka() throws IOException, URISyntaxException {
        String property = this.properties.getProperty("bootstrap.servers");
        LOG.info("Starting kafka at {}", property);
        URL url = getURL(property);
        Properties properties = this.properties;
        properties.setProperty("broker.id", "1");
        properties.setProperty("host.name", url.getHost());
        properties.setProperty("port", String.valueOf(url.getPort()));
        properties.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
        properties.setProperty("log.flush.interval.messages", String.valueOf(1));
        this.kafkaServer = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.apply(getClass().getName()), false);
        this.kafkaServer.startup();
        LOG.info("Embedded kafka server started with broker config {}", properties);
    }

    private File constructDir(String str) {
        File file = new File(this.properties.getProperty(ATLAS_KAFKA_DATA), str);
        if (file.exists() || file.mkdirs()) {
            return file;
        }
        throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
    }

    private URL getURL(String str) throws MalformedURLException {
        try {
            return new URL(str);
        } catch (MalformedURLException e) {
            return new URL("http://" + str);
        }
    }
}
