/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.mq.provider.kafka;

import com.dtyunxi.huieryun.mq.api.AbstractProducer;
import com.dtyunxi.huieryun.mq.api.MQConstants;
import com.dtyunxi.huieryun.mq.api.SendCallback;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.util.JacksonUtil;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducer
extends AbstractProducer {
    private static final KafkaProducer single = new KafkaProducer();
    private static Producer<String, String> kproducer;
    private static Logger logger;

    public static KafkaProducer getInstance() {
        return single;
    }

    @Override
    public MessageResponse sendSingleMessage(String topic, String queueName, Object message, long delayTime) {
        try {
            this.sendMessage(topic, null, message, delayTime);
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            this.saveMqLog(topic, queueName, queueName, message, MQConstants.MQ_SEDN_TYPE_SINGLE, e.getMessage(), message.getClass().getName(), "sendSingleMessage");
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public MessageResponse sendMessage(String topic, String routingKey, Object message, long delayTime) {
        try {
            kproducer.send(new ProducerRecord(topic, (Object)JacksonUtil.toJson((Object)message)));
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            this.saveMqLog(topic, null, routingKey, message, MQConstants.MQ_SEDN_TYPE_PUB, e.getMessage(), message.getClass().getName(), "sendMessage");
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public MessageResponse sendAsyncMessage(String routingKey, Object message, SendCallback callback) {
        return this.sendAsyncMessage(this.getTopicName(), routingKey, message, 0L, callback);
    }

    @Override
    public MessageResponse sendAsyncMessage(String routingKey, Object message, long delayTime, SendCallback callback) {
        return this.sendAsyncMessage(this.getTopicName(), routingKey, message, delayTime, callback);
    }

    @Override
    public MessageResponse sendAsyncMessage(String topic, String routingKey, Object message, long delayTime, final SendCallback callback) {
        kproducer.send(new ProducerRecord(topic, (Object)JacksonUtil.toJson((Object)message)), new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                callback.onSuccess(metadata.offset() + "");
            }
        });
        return MessageResponse.SUCCESS;
    }

    public void createProducer(MessageRegistryVo messageRegistryVo) {
        this.messageRegistryVo = messageRegistryVo;
        try {
            kproducer = new org.apache.kafka.clients.producer.KafkaProducer(KafkaProducer.initProducerProperties(messageRegistryVo));
            logger.info("Create kafka producer Successfully! bootstrap.servers:{},topic:{}", (Object)messageRegistryVo.getBootstrapServers(), (Object)messageRegistryVo.getTopicName());
        }
        catch (Exception e) {
            logger.error("Failed to create kafka producer :" + e.getMessage());
        }
    }

    private static Properties initProducerProperties(MessageRegistryVo messageRegistryVo) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", messageRegistryVo.getBootstrapServers());
        properties.put("acks", messageRegistryVo.getAcks() == null ? "all" : messageRegistryVo.getAcks());
        properties.put("retries", messageRegistryVo.getRetries() == null ? Integer.valueOf(0) : messageRegistryVo.getRetries());
        properties.put("batch.size", messageRegistryVo.getBatchSize() == null ? Integer.valueOf(16384) : messageRegistryVo.getBatchSize());
        properties.put("linger.ms", messageRegistryVo.getLinger() == null ? Integer.valueOf(1) : messageRegistryVo.getLinger());
        properties.put("buffer.memory", messageRegistryVo.getBufferMemory() == null ? Integer.valueOf(0x2000000) : messageRegistryVo.getBufferMemory());
        properties.put("key.serializer", messageRegistryVo.getKeySerializer() == null ? "org.apache.kafka.common.serialization.StringSerializer" : messageRegistryVo.getKeySerializer());
        properties.put("value.serializer", messageRegistryVo.getValueSerializer() == null ? "org.apache.kafka.common.serialization.StringSerializer" : messageRegistryVo.getValueSerializer());
        return properties;
    }

    static {
        logger = LoggerFactory.getLogger(KafkaProducer.class);
    }
}

