package com.dtyunxi.huieryun.mq.provider.rocket;

import com.alibaba.fastjson.JSON;
import com.dtyunxi.huieryun.mq.api.AbstractConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.rocket.impl.factory.RocketConsumerFactory;
import com.dtyunxi.huieryun.mq.provider.rocket.util.RocketMQConsumerHelper;
import com.dtyunxi.huieryun.mq.util.SerializeCode;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/provider/rocket/RocketConsumer.class */
public class RocketConsumer extends AbstractConsumer {
    private static Logger logger = LoggerFactory.getLogger(RocketConsumer.class);
    private static volatile RocketConsumer single = null;
    private static final String CONSUMER_GROUP_FORMATER = "%s_%s_%s";

    private RocketConsumer() {
    }

    public static RocketConsumer getInstance() {
        if (single == null) {
            synchronized (RocketConsumer.class) {
                single = new RocketConsumer();
            }
        }
        return single;
    }

    public void createConsumer(MessageRegistryVo messageRegistryVo) {
        ((AbstractConsumer) this).messageRegistryVo = messageRegistryVo;
    }

    public <T> MessageResponse receiveMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        logger.info("receiveMessage,topicName:{},routingKey:{}", str, str2);
        return receiveMessage(str, this.messageRegistryVo.getConsumerId(), str2, iMessageProcessor);
    }

    public <T> MessageResponse receiveMessage(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribeMessage(str, str2, str3, iMessageProcessor, null);
    }

    public MessageResponse receiveMessage(String str, String str2, Map<String, IMessageProcessor> map) {
        try {
            for (Map.Entry<String, IMessageProcessor> entry : map.entrySet()) {
                receiveMessage(str, str2, entry.getKey(), entry.getValue());
            }
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName:{},consumer:{}", new Object[]{str, str2, e});
            throw new BusinessRuntimeException("接收消息出错，topicName:" + str + ",consumer:" + str2, e);
        }
    }

    public <T> MessageResponse subscribe(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        return subscribe(str, this.messageRegistryVo.getConsumerId(), str2, iMessageProcessor);
    }

    public <T> MessageResponse subscribe(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor) {
        logger.info("subscribe,topicName:{},consumer:{},routingKey:{}", new Object[]{str, str2, str3});
        return toSubscribeMessage(str, str2, str3, iMessageProcessor, MessageModel.CLUSTERING);
    }

    private <T> MessageResponse toSubscribeMessage(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor, MessageModel messageModel) {
        String consumerId = str2 != null ? str2 : this.messageRegistryVo.getConsumerId();
        try {
            DefaultMQPushConsumer rocketConsumer = RocketConsumerFactory.getRocketConsumer(StringUtils.isNotBlank(str3) ? String.format(CONSUMER_GROUP_FORMATER, consumerId, str, str3) : String.format(CONSUMER_GROUP_FORMATER, consumerId, str, "ALL"), this.messageRegistryVo);
            rocketConsumer.subscribe(str, str3);
            if (messageModel != null) {
                rocketConsumer.setMessageModel(messageModel);
            } else {
                rocketConsumer.setMessageModel(MessageModel.CLUSTERING);
            }
            if (messageModel == null) {
                rocketConsumer.registerMessageListener(createOrderlyMessageListener(iMessageProcessor));
            } else {
                rocketConsumer.registerMessageListener(createMessageConcurrentlyListener(iMessageProcessor));
            }
            if (rocketConsumer.getDefaultMQPushConsumerImpl().getServiceState().equals(ServiceState.CREATE_JUST)) {
                rocketConsumer.start();
            }
            logger.info("ConsumerStarted.");
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName:{},consumer:{},routingKey:{}", new Object[]{str, consumerId, str3, e});
            throw new BusinessRuntimeException("接收消息出错，topicName:" + str + ",consumer:" + consumerId + ",routingKey:" + str3, e);
        }
    }

    private <T> MessageListenerConcurrently createMessageConcurrentlyListener(final IMessageProcessor<T> iMessageProcessor) {
        return new MessageListenerConcurrently() { // from class: com.dtyunxi.huieryun.mq.provider.rocket.RocketConsumer.1
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = list.get(0);
                try {
                    Object deSerialize = SerializeCode.deSerialize(messageExt.getBody());
                    try {
                        try {
                            RocketMQConsumerHelper.setMsgfo(messageExt);
                            RocketMQConsumerHelper.setMsgId(messageExt.getKeys());
                            if (MessageResponse.SUCCESS.equals(iMessageProcessor.process(deSerialize))) {
                                if (RocketConsumer.logger.isInfoEnabled()) {
                                    RocketConsumer.logger.info("消息 {} 处理成功", JSON.toJSONString(deSerialize));
                                }
                                ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                RocketMQConsumerHelper.clearThreadInfo();
                                return consumeConcurrentlyStatus;
                            }
                            if (RocketConsumer.logger.isInfoEnabled()) {
                                RocketConsumer.logger.info("消息 {} 处理失败", JSON.toJSONString(deSerialize));
                            }
                            ConsumeConcurrentlyStatus consumeConcurrentlyStatus2 = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            RocketMQConsumerHelper.clearThreadInfo();
                            return consumeConcurrentlyStatus2;
                        } catch (Exception e) {
                            RocketConsumer.logger.error("", e);
                            ConsumeConcurrentlyStatus consumeConcurrentlyStatus3 = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            RocketMQConsumerHelper.clearThreadInfo();
                            return consumeConcurrentlyStatus3;
                        }
                    } catch (Throwable th) {
                        RocketMQConsumerHelper.clearThreadInfo();
                        throw th;
                    }
                } catch (BusinessRuntimeException e2) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            }
        };
    }

    private <T> MessageListenerOrderly createOrderlyMessageListener(final IMessageProcessor<T> iMessageProcessor) {
        return new MessageListenerOrderly() { // from class: com.dtyunxi.huieryun.mq.provider.rocket.RocketConsumer.2
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                consumeOrderlyContext.setAutoCommit(true);
                MessageExt messageExt = list.get(0);
                try {
                    try {
                        Object deSerialize = SerializeCode.deSerialize(messageExt.getBody());
                        try {
                            RocketMQConsumerHelper.setMsgId(messageExt.getKeys());
                            RocketMQConsumerHelper.setMsgfo(messageExt);
                            if (MessageResponse.SUCCESS.equals(iMessageProcessor.process(deSerialize))) {
                                if (RocketConsumer.logger.isInfoEnabled()) {
                                    RocketConsumer.logger.info("消息 {} 处理成功", JSON.toJSONString(deSerialize));
                                }
                                ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
                                RocketMQConsumerHelper.clearThreadInfo();
                                return consumeOrderlyStatus;
                            }
                            if (RocketConsumer.logger.isInfoEnabled()) {
                                RocketConsumer.logger.info("消息 {} 处理失败", JSON.toJSONString(deSerialize));
                            }
                            ConsumeOrderlyStatus consumeOrderlyStatus2 = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            RocketMQConsumerHelper.clearThreadInfo();
                            return consumeOrderlyStatus2;
                        } catch (Exception e) {
                            RocketConsumer.logger.error("", e);
                            RocketMQConsumerHelper.clearThreadInfo();
                            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                    } catch (Throwable th) {
                        RocketMQConsumerHelper.clearThreadInfo();
                        throw th;
                    }
                } catch (BusinessRuntimeException e2) {
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            }
        };
    }

    public <T> MessageResponse subscribe(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        logger.info("subscribe,topicName:{},routingKeys:{}", str, strArr);
        return subscribe(str, getSubExpression(strArr), iMessageProcessor);
    }

    public <T> MessageResponse subscribeBroadcastMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribeMessage(str, this.messageRegistryVo.getConsumerId(), str2, iMessageProcessor, MessageModel.BROADCASTING);
    }

    public <T> MessageResponse subscribeBroadcastMessage(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribeMessage(str, this.messageRegistryVo.getConsumerId(), getSubExpression(strArr), iMessageProcessor, MessageModel.BROADCASTING);
    }

    public MessageResponse unsubscribe(String str, String str2) {
        try {
            RocketConsumerFactory.getRocketConsumer(String.format(CONSUMER_GROUP_FORMATER, this.messageRegistryVo.getConsumerId() != null ? this.messageRegistryVo.getConsumerId() : "", str, str2), this.messageRegistryVo).unsubscribe(str);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("取消订阅失败，topicName：{}", str, e);
            throw new BusinessRuntimeException(e.getMessage());
        }
    }
}
