package com.dtyunxi.huieryun.mq.provider.aliyun;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.dtyunxi.app.ServiceContext;
import com.dtyunxi.huieryun.mq.api.AbstractProducer;
import com.dtyunxi.huieryun.mq.api.SendCallback;
import com.dtyunxi.huieryun.mq.constant.MQConstants;
import com.dtyunxi.huieryun.mq.provider.aliyun.util.AliyunConsumerHelper;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.huieryun.retry.vo.RetryerRegistryVo;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.util.JacksonUtil;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/provider/aliyun/AliyunProducer.class */
public class AliyunProducer extends AbstractProducer {
    private static final Logger logger = LoggerFactory.getLogger(AliyunProducer.class);
    private static final ThreadLocal<Integer> messageType = new ThreadLocal<>();
    private static final AliyunProducer single = new AliyunProducer();
    private Producer producer;

    private AliyunProducer() {
    }

    public static AliyunProducer getInstance() {
        return single;
    }

    protected Boolean connectMQServer() {
        try {
            this.producer = ONSFactory.createProducer(AliyunConsumerHelper.getProperties(this.messageRegistryVo));
            this.producer.start();
            return true;
        } catch (ONSClientException e) {
            logger.error("创建Aliyun Producer客户端异常！host:{},username:{},password{}", new Object[]{this.messageRegistryVo.getHost(), this.messageRegistryVo.getUsername(), this.messageRegistryVo.getPassword(), e});
            return false;
        } catch (Exception e2) {
            logger.error("创建Aliyun Producer时出错， host:{},username:{},password{}", new Object[]{this.messageRegistryVo.getHost(), this.messageRegistryVo.getUsername(), this.messageRegistryVo.getPassword(), e2});
            throw new BusinessRuntimeException("10001", e2);
        }
    }

    public MessageResponse sendSingleMessage(String str, String str2, Object obj, long j) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_SINGLE);
        return executeMessageSend(str, str2, obj, j);
    }

    public MessageResponse sendMessage(String str, String str2, Object obj, long j) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_PUB);
        return executeMessageSend(str, str2, obj, j);
    }

    private MessageResponse executeMessageSend(String str, String str2, Object obj, long j) {
        if (this.producer == null) {
            throw new BusinessRuntimeException("阿里云MQ发送者尚未创建，无法发送消息！");
        }
        try {
            Message packMessage = packMessage(str, str2, obj);
            packMessage.setKey("ORDERID_" + getMessageId());
            if (j > 0) {
                packMessage.setStartDeliverTime(System.currentTimeMillis() + (j * 1000));
            }
            SendResult sendMessage = sendMessage(packMessage);
            logger.info("messageKey:{}  messageId:{}", packMessage.getKey(), sendMessage.getMessageId());
            return new MessageResponse(sendMessage.getMessageId());
        } catch (Exception e) {
            logger.error("发送消息出错，topicName：{} routingKey:{}", new Object[]{str, str2, e});
            throw new BusinessRuntimeException("10001", e);
        } catch (ONSClientException e2) {
            logger.error("发送消息客户端异常，请参考阿里给出指引链接处理！topicName：{} routingKey:{}", new Object[]{str, str2, e2});
            throw new BusinessRuntimeException("10001", e2);
        }
    }

    public MessageResponse sendAsyncSingleMessage(String str, String str2, Object obj, long j, SendCallback sendCallback) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_SINGLE);
        return executeAsyncMessageSend(str, str2, obj, j, sendCallback);
    }

    public MessageResponse sendAsyncMessage(String str, Object obj, SendCallback sendCallback) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_PUB);
        return executeAsyncMessageSend(this.messageRegistryVo.getTopicName(), str, obj, 0L, sendCallback);
    }

    public MessageResponse sendAsyncMessage(String str, Object obj, long j, SendCallback sendCallback) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_PUB);
        return executeAsyncMessageSend(this.messageRegistryVo.getTopicName(), str, obj, j, sendCallback);
    }

    public MessageResponse sendAsyncMessage(String str, String str2, Object obj, long j, SendCallback sendCallback) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_PUB);
        return executeAsyncMessageSend(str, str2, obj, j, sendCallback);
    }

    private MessageResponse executeAsyncMessageSend(String str, String str2, Object obj, long j, SendCallback sendCallback) {
        if (this.producer == null) {
            throw new BusinessRuntimeException("阿里云MQ发送者尚未创建，无法发送消息！");
        }
        try {
            Message packMessage = packMessage(str, str2, obj);
            packMessage.setKey("ORDERID_" + getMessageId());
            if (j > 0) {
                packMessage.setStartDeliverTime(System.currentTimeMillis() + (j * 1000));
            }
            logger.info("开始发送异步消息,topic:{}, tag:{},key:{}", new Object[]{packMessage.getTopic(), packMessage.getTag(), packMessage.getKey()});
            sendAsync(packMessage, sendCallback);
            return MessageResponse.SUCCESS;
        } catch (ONSClientException e) {
            logger.error("发送消息客户端异常，请参考阿里给出指引链接处理！topicName：{} routingKey:{}", new Object[]{str, str2, e});
            throw new BusinessRuntimeException("10001", e);
        } catch (Exception e2) {
            logger.error("发送消息出错，topicName：{} routingKey:{}", new Object[]{str, str2, e2});
            throw new BusinessRuntimeException("10001", e2);
        }
    }

    private Message packMessage(String str, String str2, Object obj) {
        Message message = new Message(str, str2, this.serializeCode.serialize(obj));
        if (this.serializeCode.isJdkSerializer()) {
            return message;
        }
        message.putUserProperties("mqMessageBodyClassInfo", JacksonUtil.toJson(obj.getClass()));
        return message;
    }

    public MessageResponse sendBroadcastMessage(String str, String str2, Object obj, long j) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_BROADCAST);
        return executeMessageSend(str, str2, obj, j);
    }

    public MessageResponse sendAsyncBroadcastMessage(String str, String str2, Object obj, long j, SendCallback sendCallback) {
        messageType.set(MQConstants.MQ_SEDN_TYPE_BROADCAST);
        return executeAsyncMessageSend(str, str2, obj, 0L, sendCallback);
    }

    private SendResult sendMessage(Message message) throws Exception {
        return createSendMessageCallable(message).call();
    }

    private Callable<SendResult> createSendMessageCallable(final Message message) {
        addServiceContext(message);
        return new Callable<SendResult>() { // from class: com.dtyunxi.huieryun.mq.provider.aliyun.AliyunProducer.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public SendResult call() throws Exception {
                try {
                    if (!AliyunProducer.this.producer.isStarted()) {
                        AliyunProducer.this.producer.start();
                    }
                    return AliyunProducer.this.producer.send(message);
                } catch (ONSClientException e) {
                    AliyunProducer.logger.error("发送消息客户端异常，请参考阿里给出指引链接处理！topicName：{} routingKey:{}", new Object[]{message.getTopic(), message.getTag(), e});
                    throw e;
                } catch (Exception e2) {
                    AliyunProducer.logger.error("发送消息出错，topicName：{} routingKey:{}", new Object[]{message.getTopic(), message.getTag(), e2});
                    throw e2;
                }
            }
        };
    }

    private void addServiceContext(Message message) {
        if (this.messageRegistryVo.isPassSvcContext()) {
            ServiceContext.getContext().getKeys().forEach((str, obj) -> {
                if (str == null || obj == null) {
                    return;
                }
                message.putUserProperties("_context-" + str, obj.toString());
            });
        }
    }

    private Callable<Boolean> createAsyncMessageCallable(final Message message, final SendCallback sendCallback) {
        addServiceContext(message);
        return new Callable<Boolean>() { // from class: com.dtyunxi.huieryun.mq.provider.aliyun.AliyunProducer.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                try {
                    if (!AliyunProducer.this.producer.isStarted()) {
                        AliyunProducer.this.producer.start();
                    }
                    AliyunProducer.this.producer.sendAsync(message, AliyunProducer.this.createMessageSendCallback(message, sendCallback));
                    return true;
                } catch (ONSClientException e) {
                    AliyunProducer.logger.error("发送消息客户端异常，请参考阿里给出指引链接处理！topicName：{} routingKey:{}", new Object[]{message.getTopic(), message.getTag(), e});
                    return false;
                } catch (Exception e2) {
                    AliyunProducer.logger.error("发送消息出错，topicName：{} routingKey:{}", new Object[]{message.getTopic(), message.getTag(), e2});
                    return false;
                }
            }
        };
    }

    private void sendAsync(Message message, SendCallback sendCallback) throws Exception {
        RetryerRegistryVo retryerRegistryVo = getRetryerRegistryVo();
        Callable<Boolean> createAsyncMessageCallable = createAsyncMessageCallable(message, sendCallback);
        if (retryerRegistryVo.isRetryEnable()) {
            getRetryService().retryWithResultAndIncreaseStrategy(true, retryerRegistryVo.getInitialSleepTime(), retryerRegistryVo.getIncrementSleepTime(), retryerRegistryVo.getAttemptNumber(), createAsyncMessageCallable);
        } else {
            createAsyncMessageCallable.call();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public com.aliyun.openservices.ons.api.SendCallback createMessageSendCallback(final Message message, final SendCallback sendCallback) {
        return new com.aliyun.openservices.ons.api.SendCallback() { // from class: com.dtyunxi.huieryun.mq.provider.aliyun.AliyunProducer.3
            public void onSuccess(SendResult sendResult) {
                AliyunProducer.logger.info("发送异步消息成功! Topic is {}, tag:{},msgId is: {}, key:{}", new Object[]{sendResult.getTopic(), message.getTag(), sendResult.getMessageId(), message.getKey()});
                sendCallback.onSuccess(sendResult.getMessageId());
            }

            public void onException(OnExceptionContext onExceptionContext) {
                AliyunProducer.logger.info("发送消息出错! Topic is {}, msgId is: {}", onExceptionContext.getTopic(), onExceptionContext.getMessageId());
                sendCallback.onException(onExceptionContext.getException(), onExceptionContext.getMessageId());
            }
        };
    }

    public void shutdown() {
        if (this.producer != null) {
            this.producer.shutdown();
        }
    }
}
