/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.mq.provider.kafka;

import com.dtyunxi.huieryun.mq.api.AbstractConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.kafka.impl.KafkaMessageManager;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka-backed consumer exposed as a process-wide singleton.
 *
 * <p>The underlying Kafka client is held in a static field and is created by
 * {@link #createConsumer(MessageRegistryVo)}; the subscribe/unsubscribe methods
 * report {@link MessageResponse#ERROR} until that has happened.
 *
 * <p>Kafka has no routing-key or queue concept in this adapter: every
 * {@code routingKey}/{@code queueName} argument is accepted for interface
 * compatibility and ignored.
 */
public class KafkaConsumer extends AbstractConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private static final KafkaConsumer single = new KafkaConsumer();

    /** Group id used when the registry does not supply one. */
    private static final String DEFAULT_GROUP_ID = "default_group";
    /** Deserializer used for keys and values when the registry does not supply one. */
    private static final String DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

    /** Shared Kafka client; {@code null} until {@link #createConsumer(MessageRegistryVo)} succeeds. */
    private static Consumer<String, String> kconsumer;
    /** Manager created by the most recent successful {@code subscribe} call. */
    private KafkaMessageManager kafkaMessageManager;

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code KafkaConsumer}
     */
    public static KafkaConsumer getInstance() {
        return single;
    }

    /**
     * Equivalent to {@link #subscribe(String, String, IMessageProcessor)}; {@code queueName}
     * is passed through as the (ignored) routing key.
     *
     * @param topic     topic to consume from
     * @param queueName ignored
     * @param processor callback handed to the message manager
     * @return {@link MessageResponse#SUCCESS} or {@link MessageResponse#ERROR}
     */
    @Override
    public <T> MessageResponse receiveMessage(String topic, String queueName, IMessageProcessor<T> processor) {
        return this.subscribe(topic, queueName, processor);
    }

    /**
     * Subscribes the shared Kafka client to {@code topic}, then creates and starts a
     * {@link KafkaMessageManager} bound to that client and {@code processor}.
     *
     * <p>NOTE(review): every call creates and starts a new manager over the same shared
     * client without stopping the previous one, and the new subscription replaces the
     * old one. Kafka documents its consumer as not safe for multi-threaded access, so
     * confirm whether calling this more than once per process is expected.
     *
     * @param topic      topic to consume from
     * @param routingKey ignored
     * @param processor  callback handed to the message manager
     * @return {@link MessageResponse#SUCCESS} on success; {@link MessageResponse#ERROR} if the
     *         consumer has not been created or any exception is raised
     */
    @Override
    public <T> MessageResponse subscribe(String topic, String routingKey, IMessageProcessor<T> processor) {
        Consumer<String, String> consumer = kconsumer;
        if (consumer == null) {
            // Previously surfaced as a caught NullPointerException logged as "error:null".
            logger.error("Failed to subribe \uff0ctopicName\uff1a{} error:{}", topic, "kafka consumer has not been created, call createConsumer first");
            return MessageResponse.ERROR;
        }
        try {
            consumer.subscribe(Arrays.asList(topic));
            logger.info("Subsribe Successfully! topic:{}", topic);
            this.kafkaMessageManager = new KafkaMessageManager(consumer, processor);
            this.kafkaMessageManager.start();
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            // Trailing throwable makes SLF4J print the stack trace in addition to the message.
            logger.error("Failed to subribe \uff0ctopicName\uff1a{} error:{}", topic, e.getMessage(), e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Delegates to the two-argument {@code subscribe(topic, processor)}, which is not declared
     * in this class (presumably inherited from {@link AbstractConsumer} — verify there).
     *
     * @param topic       topic to consume from
     * @param routingKeys ignored
     * @param processor   callback for received messages
     * @return the result of the delegated call
     */
    @Override
    public <T> MessageResponse subscribe(String topic, String[] routingKeys, IMessageProcessor<T> processor) {
        return this.subscribe(topic, processor);
    }

    /**
     * Delegates to the two-argument {@code subscribe(topic, processor)}, which is not declared
     * in this class (presumably inherited from {@link AbstractConsumer} — verify there).
     *
     * @param topic      topic to consume from
     * @param queueName  ignored
     * @param routingKey ignored
     * @param processor  callback for received messages
     * @return the result of the delegated call
     */
    @Override
    public <T> MessageResponse subscribe(String topic, String queueName, String routingKey, IMessageProcessor<T> processor) {
        return this.subscribe(topic, processor);
    }

    /**
     * Calls the no-argument {@code unsubscribe()} on the shared Kafka client. Both parameters
     * are ignored, so this is not limited to {@code topic}. The message manager started by
     * {@code subscribe} is not touched here.
     *
     * @param topic      ignored
     * @param routingKey ignored
     * @return {@link MessageResponse#SUCCESS} on success; {@link MessageResponse#ERROR} if the
     *         consumer has not been created or any exception is raised
     */
    @Override
    public MessageResponse unsubscribe(String topic, String routingKey) {
        Consumer<String, String> consumer = kconsumer;
        if (consumer == null) {
            logger.error("Failed to unsubribe error:{}", "kafka consumer has not been created, call createConsumer first");
            return MessageResponse.ERROR;
        }
        try {
            consumer.unsubscribe();
            logger.info("Unsubscribe Successfully!");
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("Failed to unsubribe error:{}", e.getMessage(), e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Stores {@code messageRegistryVo} on this instance and builds the shared Kafka client
     * from it.
     *
     * <p>A failure while constructing the client is logged and not rethrown; the shared
     * client then keeps whatever value it had before the call. Building the properties
     * happens outside the try block, so an exception raised there (for example a
     * {@code null} bootstrap-servers value rejected by {@link Properties#put}) propagates
     * to the caller.
     *
     * @param messageRegistryVo connection settings (bootstrap servers, group id, deserializers)
     */
    public void createConsumer(MessageRegistryVo messageRegistryVo) {
        this.messageRegistryVo = messageRegistryVo;
        Properties properties = KafkaConsumer.initConsumerProperties(messageRegistryVo);
        try {
            kconsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
            logger.info("Create kafka consumer Successfully! bootstrapServers:{},groupId:{}", properties.get("bootstrap.servers"), properties.get("group.id"));
        }
        catch (Exception e) {
            logger.error("Failed to create kafka consumer :{}", e.getMessage(), e);
        }
    }

    /**
     * Builds the Kafka client configuration from the registry, substituting
     * {@code "default_group"} for a missing group id and Kafka's {@code StringDeserializer}
     * for a missing key or value deserializer.
     *
     * @param messageRegistryVo source of the configuration values
     * @return a new {@link Properties} holding the four client settings
     */
    private static Properties initConsumerProperties(MessageRegistryVo messageRegistryVo) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", messageRegistryVo.getBootstrapServers());
        properties.put("group.id", KafkaConsumer.orDefault(messageRegistryVo.getConsumerGroupId(), DEFAULT_GROUP_ID));
        properties.put("key.deserializer", KafkaConsumer.orDefault(messageRegistryVo.getKeyDeserializer(), DEFAULT_DESERIALIZER));
        properties.put("value.deserializer", KafkaConsumer.orDefault(messageRegistryVo.getValueDeserializer(), DEFAULT_DESERIALIZER));
        return properties;
    }

    /** Returns {@code value} unless it is {@code null}, in which case {@code fallback}. */
    private static Object orDefault(Object value, Object fallback) {
        return value != null ? value : fallback;
    }
}

