/*
 * Decompiled with CFR 0.152.
 */
package com.github.charithe.kafka;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An in-process Kafka broker (backed by a Curator {@link TestingServer} ZooKeeper instance)
 * intended for tests.
 *
 * <p>{@link #start()} and {@link #stop()} check the volatile {@code brokerStarted} flag and then
 * re-check it while holding this instance's monitor, so concurrent callers start or stop the
 * broker at most once. The accessors that expose runtime details return an empty
 * {@link Optional} until the broker has been started.
 */
public class EphemeralKafkaBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralKafkaBroker.class);

    /** Sentinel port value meaning "pick a port at start-up". */
    private static final int ALLOCATE_RANDOM_PORT = -1;
    private static final String LOCALHOST = "localhost";

    private int kafkaPort;
    private int zookeeperPort;
    private final Properties overrideBrokerProperties;
    private TestingServer zookeeper;
    /** {@code true} when the ZooKeeper server was supplied by the caller; it is then never closed here. */
    private final boolean managedZk;
    private KafkaServerStartable kafkaServer;
    private Path kafkaLogDir;
    private volatile boolean brokerStarted = false;

    /**
     * Creates a broker that picks both the Kafka and the ZooKeeper port at start-up.
     *
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create() {
        return EphemeralKafkaBroker.create(ALLOCATE_RANDOM_PORT);
    }

    /**
     * Creates a broker with the given Kafka port and a ZooKeeper port picked at start-up.
     *
     * @param kafkaPort Kafka port, or {@code -1} to pick one at start-up
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int kafkaPort) {
        return EphemeralKafkaBroker.create(kafkaPort, ALLOCATE_RANDOM_PORT);
    }

    /**
     * Creates a broker with the given Kafka and ZooKeeper ports.
     *
     * @param kafkaPort     Kafka port, or {@code -1} to pick one at start-up
     * @param zookeeperPort ZooKeeper port, or {@code -1} to pick one at start-up
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int kafkaPort, int zookeeperPort) {
        return EphemeralKafkaBroker.create(kafkaPort, zookeeperPort, null);
    }

    /**
     * Creates a broker with the given ports and broker property overrides.
     *
     * @param kafkaPort                Kafka port, or {@code -1} to pick one at start-up
     * @param zookeeperPort            ZooKeeper port, or {@code -1} to pick one at start-up
     * @param overrideBrokerProperties properties applied on top of the built-in broker
     *                                 configuration; may be {@code null}
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int kafkaPort, int zookeeperPort, Properties overrideBrokerProperties) {
        return new EphemeralKafkaBroker(kafkaPort, zookeeperPort, overrideBrokerProperties);
    }

    /**
     * Creates a broker that starts (and later closes) its own ZooKeeper server.
     */
    EphemeralKafkaBroker(int kafkaPort, int zookeeperPort, Properties overrideBrokerProperties) {
        this.kafkaPort = kafkaPort;
        this.zookeeperPort = zookeeperPort;
        this.overrideBrokerProperties = overrideBrokerProperties;
        this.managedZk = false;
    }

    /**
     * Creates a broker that uses a caller-supplied ZooKeeper server. That server is neither
     * started nor closed by this class.
     */
    EphemeralKafkaBroker(TestingServer zookeeper, int kafkaPort, Properties overrideBrokerProperties) {
        this.zookeeper = zookeeper;
        this.kafkaPort = kafkaPort;
        this.overrideBrokerProperties = overrideBrokerProperties;
        this.managedZk = true;
    }

    /**
     * Starts the broker if it is not already started.
     *
     * @return a future that completes when the asynchronous Kafka start-up call returns, or an
     *         already completed future if the broker was started before
     * @throws Exception if ZooKeeper cannot be started or the broker cannot be prepared
     */
    public CompletableFuture<Void> start() throws Exception {
        if (!this.brokerStarted) {
            synchronized (this) {
                if (!this.brokerStarted) {
                    return this.startBroker();
                }
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Starts ZooKeeper when this instance owns it, prepares the Kafka server and kicks off its
     * start-up asynchronously. If preparation fails, anything created by this attempt is
     * released again and the broker stays in the "not started" state.
     */
    private CompletableFuture<Void> startBroker() throws Exception {
        if (!this.managedZk) {
            this.zookeeper = this.zookeeperPort == ALLOCATE_RANDOM_PORT
                    ? new TestingServer(true)
                    : new TestingServer(this.zookeeperPort, true);
        }
        try {
            // Read the port back from the server so that it is also known when the ZooKeeper
            // instance was supplied by the caller or bound to a port chosen at start-up.
            this.zookeeperPort = this.zookeeper.getPort();
            if (this.kafkaPort == ALLOCATE_RANDOM_PORT) {
                this.kafkaPort = InstanceSpec.getRandomPort();
            }
            KafkaConfig kafkaConfig = this.buildKafkaConfig(this.zookeeper.getConnectString());
            LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props());
            this.kafkaServer = new KafkaServerStartable(kafkaConfig);
            Integer brokerId = this.kafkaServer.staticServerConfig().getInt(KafkaConfig.BrokerIdProp());
            if (brokerId != null) {
                // Pre-create meta.properties in the log dir with the configured broker id.
                Files.write(
                        this.kafkaLogDir.resolve("meta.properties"),
                        ("version=0\nbroker.id=" + brokerId).getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            // stop() is a no-op while brokerStarted is false, so release resources here.
            this.closeOwnedZookeeperQuietly();
            this.deleteLogDirQuietly();
            throw e;
        }
        // Flag the broker as started only once everything above succeeded.
        this.brokerStarted = true;
        return CompletableFuture.runAsync(() -> this.kafkaServer.startup());
    }

    /**
     * Stops the broker if it is started and blocks until the clean-up has finished.
     *
     * @throws ExecutionException   if the clean-up task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws ExecutionException, InterruptedException {
        if (this.brokerStarted) {
            synchronized (this) {
                if (this.brokerStarted) {
                    this.stopBroker();
                    this.brokerStarted = false;
                }
            }
        }
    }

    private void stopBroker() throws ExecutionException, InterruptedException {
        this.stopBrokerAsync().get();
    }

    /**
     * Asynchronously shuts down Kafka, closes ZooKeeper (only when owned by this instance) and
     * deletes the Kafka log directory. Each step is attempted independently: a failure is
     * logged and does not prevent the remaining steps from running.
     *
     * @return a future that completes when all clean-up steps have been attempted
     */
    CompletableFuture<Void> stopBrokerAsync() {
        return CompletableFuture.runAsync(() -> {
            this.shutdownKafkaQuietly();
            this.closeOwnedZookeeperQuietly();
            this.deleteLogDirQuietly();
        });
    }

    /** Shuts down the Kafka server, if any, and waits for it; failures are logged. */
    private void shutdownKafkaQuietly() {
        if (this.kafkaServer == null) {
            return;
        }
        try {
            LOGGER.info("Shutting down Kafka Server");
            this.kafkaServer.shutdown();
            this.kafkaServer.awaitShutdown();
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }
    }

    /** Closes the ZooKeeper server unless it was supplied by the caller; failures are logged. */
    private void closeOwnedZookeeperQuietly() {
        if (this.zookeeper == null || this.managedZk) {
            return;
        }
        try {
            LOGGER.info("Shutting down Zookeeper");
            this.zookeeper.close();
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }
    }

    /** Recursively deletes the Kafka log directory if it was created and still exists; failures are logged. */
    private void deleteLogDirQuietly() {
        if (this.kafkaLogDir == null) {
            return;
        }
        try {
            if (Files.exists(this.kafkaLogDir)) {
                LOGGER.info("Deleting the log dir:  {}", this.kafkaLogDir);
                Files.walkFileTree(this.kafkaLogDir, new SimpleFileVisitor<Path>() {

                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        Files.deleteIfExists(file);
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                        // Directories are removed after their contents have been visited.
                        Files.deleteIfExists(dir);
                        return FileVisitResult.CONTINUE;
                    }
                });
            }
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }
    }

    /**
     * Creates a fresh temporary log directory and builds the broker configuration, applying
     * {@code overrideBrokerProperties} last so that they win over the built-in values.
     *
     * @param zookeeperQuorum ZooKeeper connection string for {@code zookeeper.connect}
     * @return the broker configuration
     * @throws IOException if the temporary log directory cannot be created
     */
    private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
        this.kafkaLogDir = Files.createTempDirectory("kafka_junit");
        Properties props = new Properties();
        props.put("advertised.listeners", "PLAINTEXT://" + LOCALHOST + ":" + this.kafkaPort);
        props.put("listeners", "PLAINTEXT://0.0.0.0:" + this.kafkaPort);
        props.put("port", String.valueOf(this.kafkaPort));
        props.put("broker.id", "1");
        props.put("log.dirs", this.kafkaLogDir.toAbsolutePath().toString());
        props.put("zookeeper.connect", zookeeperQuorum);
        props.put("leader.imbalance.check.interval.seconds", "1");
        props.put("offsets.topic.num.partitions", "1");
        props.put("offsets.topic.replication.factor", "1");
        props.put("default.replication.factor", "1");
        props.put("num.partitions", "1");
        props.put("group.min.session.timeout.ms", "100");
        props.put("transaction.state.log.replication.factor", "1");
        props.put("transaction.state.log.min.isr", "1");
        props.put("transaction.state.log.num.partitions", "1");
        props.put("transaction.timeout.ms", "500");
        if (this.overrideBrokerProperties != null) {
            props.putAll(this.overrideBrokerProperties);
        }
        return new KafkaConfig(props);
    }

    /**
     * Builds the default producer configuration pointing at this broker.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public Properties producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", LOCALHOST + ":" + this.kafkaPort);
        props.put("acks", "1");
        props.put("batch.size", "10");
        props.put("client.id", "kafka-junit");
        props.put("request.timeout.ms", "500");
        return props;
    }

    /**
     * Builds the default consumer configuration with auto-commit enabled.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public Properties consumerConfig() {
        return this.consumerConfig(true);
    }

    /**
     * Builds the default consumer configuration pointing at this broker.
     *
     * @param enableAutoCommit value for {@code enable.auto.commit}
     * @return a new, mutable {@link Properties} instance
     */
    public Properties consumerConfig(boolean enableAutoCommit) {
        Properties props = new Properties();
        props.put("bootstrap.servers", LOCALHOST + ":" + this.kafkaPort);
        props.put("group.id", "kafka-junit-consumer");
        props.put("enable.auto.commit", String.valueOf(enableAutoCommit));
        props.put("auto.commit.interval.ms", "10");
        props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "100");
        props.put("session.timeout.ms", "200");
        props.put("fetch.max.wait.ms", "200");
        props.put("metadata.max.age.ms", "100");
        return props;
    }

    /**
     * Creates a producer for this broker. Both serializers are configured with the effective
     * configuration before the producer is created.
     *
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @param overrideConfig  properties applied on top of {@link #producerConfig()}; may be
     *                        {@code null}
     * @param <K>             key type
     * @param <V>             value type
     * @return a new producer
     */
    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer, Properties overrideConfig) {
        Properties conf = this.producerConfig();
        if (overrideConfig != null) {
            conf.putAll(overrideConfig);
        }
        // Immutable snapshot, so it is safe to hand the same map to both serializers.
        Map<String, String> confMap = Maps.fromProperties(conf);
        keySerializer.configure(confMap, true);
        valueSerializer.configure(confMap, false);
        return new KafkaProducer<>(conf, keySerializer, valueSerializer);
    }

    /**
     * Creates a consumer for this broker. Both deserializers are configured with the effective
     * configuration before the consumer is created.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param overrideConfig    properties applied on top of {@link #consumerConfig()}; may be
     *                          {@code null}
     * @param <K>               key type
     * @param <V>               value type
     * @return a new consumer
     */
    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Properties overrideConfig) {
        Properties conf = this.consumerConfig();
        if (overrideConfig != null) {
            conf.putAll(overrideConfig);
        }
        // Immutable snapshot, so it is safe to hand the same map to both deserializers.
        Map<String, String> confMap = Maps.fromProperties(conf);
        keyDeserializer.configure(confMap, true);
        valueDeserializer.configure(confMap, false);
        return new KafkaConsumer<>(conf, keyDeserializer, valueDeserializer);
    }

    /**
     * @return the Kafka port, or empty if the broker is not started
     */
    public Optional<Integer> getKafkaPort() {
        return this.brokerStarted ? Optional.of(this.kafkaPort) : Optional.empty();
    }

    /**
     * @return the ZooKeeper port, or empty if the broker is not started
     */
    public Optional<Integer> getZookeeperPort() {
        return this.brokerStarted ? Optional.of(this.zookeeperPort) : Optional.empty();
    }

    /**
     * @return the Kafka log directory path, or empty if the broker is not started
     */
    public Optional<String> getLogDir() {
        return this.brokerStarted ? Optional.of(this.kafkaLogDir.toString()) : Optional.empty();
    }

    /**
     * @return the ZooKeeper connection string, or empty if the broker is not started
     */
    public Optional<String> getZookeeperConnectString() {
        return this.brokerStarted ? Optional.of(this.zookeeper.getConnectString()) : Optional.empty();
    }

    /**
     * @return the broker list in {@code host:port} form, or empty if the broker is not started
     */
    public Optional<String> getBrokerList() {
        return this.brokerStarted ? Optional.of(LOCALHOST + ":" + this.kafkaPort) : Optional.empty();
    }

    /**
     * @return {@code true} once {@link #start()} has succeeded and until {@link #stop()} completes
     */
    public boolean isRunning() {
        return this.brokerStarted;
    }
}

