package com.dtyunxi.cube.plugin.mq.impl;

import com.alibaba.fastjson.JSON;
import com.dtyunxi.cube.plugin.mq.ICommonsMqService;
import com.dtyunxi.cube.plugin.mq.TopicRegistryVo;
import com.dtyunxi.huieryun.mq.api.IConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.api.IProducer;
import com.dtyunxi.huieryun.mq.constant.MessageType;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;

/* loaded from: input_file:com/dtyunxi/cube/plugin/mq/impl/CommonsMqServiceImpl.class */
/**
 * {@link ICommonsMqService} implementation that delegates sending to an {@link IProducer} and
 * listening to an {@link IConsumer}.
 *
 * <p>Every topic and consumer name is rewritten by {@link #parseStringWithProfile(String)} before
 * it reaches the producer/consumer. Tags are rewritten the same way only when the configured
 * message type is {@code MessageType.RABBIT}.
 *
 * <p>Method families:
 * <ul>
 *   <li>{@code send*} methods call {@code IProducer.sendSingleMessage}; {@code publish*} methods
 *       call {@code IProducer.sendMessage}.</li>
 *   <li>{@code *WithThrownException} methods let producer exceptions propagate; the plain variants
 *       log the exception and return {@code MessageResponse.ERROR}.</li>
 *   <li>Overloads without a topic argument use the default topic from {@link TopicRegistryVo}.</li>
 * </ul>
 *
 * <p>NOTE(review): Spring documents that {@code @Async} methods must return {@code void} or a
 * {@code Future}. The {@code *Async*} methods here return {@link MessageResponse}, so a call made
 * through the async proxy presumably hands {@code null} back to the caller -- confirm against the
 * Spring version in use before relying on the returned value.
 */
public class CommonsMqServiceImpl implements ICommonsMqService {
    private static final Logger logger = LoggerFactory.getLogger(CommonsMqServiceImpl.class);

    private final IProducer commonsProducer;
    private final IConsumer commonsConsumer;
    private final MessageRegistryVo messageRegistryVo;
    private final String profile;
    private final TopicRegistryVo topicRegistryVo;

    /**
     * Creates the service.
     *
     * @param iProducer producer used by all send/publish methods
     * @param iConsumer consumer used by {@link #start(String)}
     * @param messageRegistryVo registry whose {@code getType()} is compared against {@link MessageType}
     * @param str profile name appended to topics, consumers and (for RABBIT) tags
     * @param topicRegistryVo source of the default single and publish topics
     */
    public CommonsMqServiceImpl(IProducer iProducer, IConsumer iConsumer, MessageRegistryVo messageRegistryVo, String str, TopicRegistryVo topicRegistryVo) {
        this.commonsProducer = iProducer;
        this.commonsConsumer = iConsumer;
        this.messageRegistryVo = messageRegistryVo;
        this.profile = str;
        this.topicRegistryVo = topicRegistryVo;
    }

    // ------------------------------------------------------------------
    // Point-to-point messages, default single topic
    // ------------------------------------------------------------------

    /**
     * Sends a delayed point-to-point message on the default single topic.
     *
     * @param str tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse sendDelaySingleMessage(String str, Object obj, Long l) {
        try {
            return sendDelaySingleMessageWithThrownException(str, obj, l);
        } catch (Exception e) {
            logger.error("发送点对点延迟MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Sends a point-to-point message on the default single topic.
     *
     * @param str tag
     * @param obj message payload
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse sendSingleMessage(String str, Object obj) {
        try {
            return sendSingleMessageWithThrownException(str, obj);
        } catch (Exception e) {
            logger.error("发送MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Same as {@link #sendDelaySingleMessage(String, Object, Long)} but lets exceptions propagate.
     *
     * @param str tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response
     */
    @Override
    public MessageResponse sendDelaySingleMessageWithThrownException(String str, Object obj, Long l) {
        return sendDelaySingleMessageWithThrownException(this.topicRegistryVo.getSingleTopic(), str, obj, l);
    }

    /**
     * Same as {@link #sendSingleMessage(String, Object)} but lets exceptions propagate.
     *
     * @param str tag
     * @param obj message payload
     * @return the producer's response
     */
    @Override
    public MessageResponse sendSingleMessageWithThrownException(String str, Object obj) {
        return sendSingleMessageWithThrownException(this.topicRegistryVo.getSingleTopic(), str, obj);
    }

    /** {@code @Async} variant of {@link #sendSingleMessage(String, Object)}. */
    @Override
    @Async
    public MessageResponse sendSingleMessageAsync(String str, Object obj) {
        return sendSingleMessage(str, obj);
    }

    /** {@code @Async} variant of {@link #sendDelaySingleMessage(String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse sendDelaySingleMessageAsync(String str, Object obj, Long l) {
        return sendDelaySingleMessage(str, obj, l);
    }

    /** {@code @Async} variant of {@link #sendSingleMessageWithThrownException(String, Object)}. */
    @Override
    @Async
    public MessageResponse sendSingleMessageAsyncWithThrownException(String str, Object obj) {
        return sendSingleMessageWithThrownException(str, obj);
    }

    /** {@code @Async} variant of {@link #sendDelaySingleMessageWithThrownException(String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse sendDelaySingleMessageAsyncWithThrownException(String str, Object obj, Long l) {
        return sendDelaySingleMessageWithThrownException(str, obj, l);
    }

    // ------------------------------------------------------------------
    // Point-to-point messages, explicit topic
    // ------------------------------------------------------------------

    /**
     * Sends a delayed point-to-point message on the given topic.
     *
     * <p>Exceptions are logged and converted to {@code MessageResponse.ERROR}, matching the
     * default-topic overload; use the {@code WithThrownException} variant to have them propagate.
     *
     * @param str topic
     * @param str2 tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse sendDelaySingleMessage(String str, String str2, Object obj, Long l) {
        try {
            return sendDelaySingleMessageWithThrownException(str, str2, obj, l);
        } catch (Exception e) {
            logger.error("发送点对点延迟MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Sends a point-to-point message on the given topic.
     *
     * <p>Exceptions are logged and converted to {@code MessageResponse.ERROR}, matching the
     * default-topic overload; use the {@code WithThrownException} variant to have them propagate.
     *
     * @param str topic
     * @param str2 tag
     * @param obj message payload
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse sendSingleMessage(String str, String str2, Object obj) {
        try {
            return sendSingleMessageWithThrownException(str, str2, obj);
        } catch (Exception e) {
            logger.error("发送MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Sends a delayed point-to-point message; producer exceptions propagate to the caller.
     *
     * @param str topic (rewritten with the profile before sending)
     * @param str2 tag (rewritten with the profile only for RABBIT)
     * @param obj message payload
     * @param l delay passed to the producer; unboxed here, so {@code null} raises a
     *     {@link NullPointerException}
     * @return the producer's response
     */
    @Override
    public MessageResponse sendDelaySingleMessageWithThrownException(String str, String str2, Object obj, Long l) {
        String topic = parseTopicWithProfile(str);
        String tag = parseTag(str2);
        logger.info("开始发送点对点MQ消息，topic:{}, tag：{}， delayTime：{}", new Object[]{topic, tag, l});
        return this.commonsProducer.sendSingleMessage(topic, tag, obj, l.longValue());
    }

    /**
     * Sends a point-to-point message; producer exceptions propagate to the caller.
     *
     * @param str topic (rewritten with the profile before sending)
     * @param str2 tag (rewritten with the profile only for RABBIT)
     * @param obj message payload
     * @return the producer's response
     */
    @Override
    public MessageResponse sendSingleMessageWithThrownException(String str, String str2, Object obj) {
        String topic = parseTopicWithProfile(str);
        String tag = parseTag(str2);
        logger.info("开始发送点对点MQ消息，topic :{}, tag：{}", topic, tag);
        return this.commonsProducer.sendSingleMessage(topic, tag, obj);
    }

    /** {@code @Async} variant of {@link #sendSingleMessage(String, String, Object)}. */
    @Override
    @Async
    public MessageResponse sendSingleMessageAsync(String str, String str2, Object obj) {
        return sendSingleMessage(str, str2, obj);
    }

    /** {@code @Async} variant of {@link #sendDelaySingleMessage(String, String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse sendDelaySingleMessageAsync(String str, String str2, Object obj, Long l) {
        return sendDelaySingleMessage(str, str2, obj, l);
    }

    /** {@code @Async} variant of {@link #sendSingleMessageWithThrownException(String, String, Object)}. */
    @Override
    @Async
    public MessageResponse sendSingleMessageAsyncWithThrownException(String str, String str2, Object obj) {
        return sendSingleMessageWithThrownException(str, str2, obj);
    }

    /** {@code @Async} variant of {@link #sendDelaySingleMessageWithThrownException(String, String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse sendDelaySingleMessageAsyncWithThrownException(String str, String str2, Object obj, Long l) {
        return sendDelaySingleMessageWithThrownException(str, str2, obj, l);
    }

    // ------------------------------------------------------------------
    // Publish/subscribe messages
    // ------------------------------------------------------------------

    /**
     * Publishes a message on the default publish topic.
     *
     * @param str tag
     * @param obj message payload
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse publishMessage(String str, Object obj) {
        return publishMessage(this.topicRegistryVo.getPublishTopic(), str, obj);
    }

    /**
     * Publishes a delayed message on the default publish topic.
     *
     * @param str tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse publishDelayMessage(String str, Object obj, Long l) {
        return publishDelayMessage(this.topicRegistryVo.getPublishTopic(), str, obj, l);
    }

    /**
     * Same as {@link #publishMessage(String, Object)} but lets exceptions propagate.
     *
     * @param str tag
     * @param obj message payload
     * @return the producer's response
     */
    @Override
    public MessageResponse publishMessageWithThrownException(String str, Object obj) {
        return publishMessageWithThrownException(this.topicRegistryVo.getPublishTopic(), str, obj);
    }

    /**
     * Same as {@link #publishDelayMessage(String, Object, Long)} but lets exceptions propagate.
     *
     * @param str tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response
     */
    @Override
    public MessageResponse publishDelayMessageWithThrownException(String str, Object obj, Long l) {
        return publishDelayMessageWithThrownException(this.topicRegistryVo.getPublishTopic(), str, obj, l);
    }

    /**
     * Publishes a message on the given topic.
     *
     * @param str topic
     * @param str2 tag
     * @param obj message payload
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse publishMessage(String str, String str2, Object obj) {
        try {
            return publishMessageWithThrownException(str, str2, obj);
        } catch (Exception e) {
            // This is the non-delayed path, so the message must not mention a delay.
            logger.error("发送订阅MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Publishes a delayed message on the given topic.
     *
     * @param str topic
     * @param str2 tag
     * @param obj message payload
     * @param l delay passed to the producer; must not be {@code null}
     * @return the producer's response, or {@code MessageResponse.ERROR} if an exception occurred
     */
    @Override
    public MessageResponse publishDelayMessage(String str, String str2, Object obj, Long l) {
        try {
            // Return the real response; ERROR is reserved for the failure path below.
            return publishDelayMessageWithThrownException(str, str2, obj, l);
        } catch (Exception e) {
            logger.error("发送订阅延迟MQ消息出现异常", e);
            return MessageResponse.ERROR;
        }
    }

    /**
     * Publishes a message; producer exceptions propagate to the caller.
     *
     * @param str topic (rewritten with the profile before sending)
     * @param str2 tag (rewritten with the profile only for RABBIT)
     * @param obj message payload
     * @return the producer's response
     */
    @Override
    public MessageResponse publishMessageWithThrownException(String str, String str2, Object obj) {
        String topic = parseTopicWithProfile(str);
        String tag = parseTag(str2);
        logger.info("开始发送订阅MQ消息， topic:{}, tag：{}", topic, tag);
        return this.commonsProducer.sendMessage(topic, tag, obj);
    }

    /**
     * Publishes a delayed message; producer exceptions propagate to the caller.
     *
     * @param str topic (rewritten with the profile before sending)
     * @param str2 tag (rewritten with the profile only for RABBIT)
     * @param obj message payload
     * @param l delay passed to the producer; unboxed here, so {@code null} raises a
     *     {@link NullPointerException}
     * @return the producer's response
     */
    @Override
    public MessageResponse publishDelayMessageWithThrownException(String str, String str2, Object obj, Long l) {
        String topic = parseTopicWithProfile(str);
        String tag = parseTag(str2);
        logger.info("开始发送订阅MQ消息，topic:{}, tag：{}， delayTime:{}", new Object[]{topic, tag, l});
        return this.commonsProducer.sendMessage(topic, tag, obj, l.longValue());
    }

    /** {@code @Async} variant of {@link #publishMessage(String, Object)}. */
    @Override
    @Async
    public MessageResponse publishMessageAsync(String str, Object obj) {
        return publishMessage(str, obj);
    }

    /** {@code @Async} variant of {@link #publishDelayMessage(String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse publishDelayMessageAsync(String str, Object obj, Long l) {
        return publishDelayMessage(str, obj, l);
    }

    /** {@code @Async} variant of {@link #publishMessage(String, String, Object)}. */
    @Override
    @Async
    public MessageResponse publishMessageAsync(String str, String str2, Object obj) {
        return publishMessage(str, str2, obj);
    }

    /** {@code @Async} variant of {@link #publishDelayMessage(String, String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse publishDelayMessageAsync(String str, String str2, Object obj, Long l) {
        return publishDelayMessage(str, str2, obj, l);
    }

    /** {@code @Async} variant of {@link #publishMessageWithThrownException(String, Object)}. */
    @Override
    @Async
    public MessageResponse publishMessageAsyncWithThrownException(String str, Object obj) {
        return publishMessageWithThrownException(str, obj);
    }

    /** {@code @Async} variant of {@link #publishDelayMessageWithThrownException(String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse publishDelayMessageAsyncWithThrownException(String str, Object obj, Long l) {
        return publishDelayMessageWithThrownException(str, obj, l);
    }

    /** {@code @Async} variant of {@link #publishMessageWithThrownException(String, String, Object)}. */
    @Override
    @Async
    public MessageResponse publishMessageAsyncWithThrownException(String str, String str2, Object obj) {
        return publishMessageWithThrownException(str, str2, obj);
    }

    /** {@code @Async} variant of {@link #publishDelayMessageWithThrownException(String, String, Object, Long)}. */
    @Override
    @Async
    public MessageResponse publishDelayMessageAsyncWithThrownException(String str, String str2, Object obj, Long l) {
        return publishDelayMessageWithThrownException(str, str2, obj, l);
    }

    // ------------------------------------------------------------------
    // Deprecated listener registration (no-ops) and consumer start-up
    // ------------------------------------------------------------------

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void receiveSingleMessage(Map<String, IMessageProcessor> map) {
    }

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void receiveSingleMessage(String str, String str2, Map<String, IMessageProcessor> map) {
    }

    /**
     * Registers point-to-point processors with the consumer via {@code receiveMessage}.
     *
     * @param consumer raw consumer name (rewritten with the profile)
     * @param processorsByTopic topic -> (tag -> processor)
     */
    private void receiveSingleMessage(String consumer, Map<String, Map<String, IMessageProcessor>> processorsByTopic) {
        Map<String, Map<String, IMessageProcessor>> resolved = parseTopicMap(processorsByTopic);
        String consumerName = parseConsumerWithProfile(consumer);
        applyTagProfile(resolved);
        logger.info("监听 consumer:{} processor:{}", consumerName, JSON.toJSONString(resolved));
        this.commonsConsumer.receiveMessage(consumerName, resolved);
    }

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void subscribeMessage(Map<String, IMessageProcessor> map) {
    }

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void subscribeMessage(String str, String str2, Map<String, IMessageProcessor> map) {
    }

    /**
     * Registers processors with the consumer via {@code subscribe}.
     *
     * @param consumer raw consumer name (rewritten with the profile)
     * @param processorsByTopic topic -> (tag -> processor)
     */
    private void subscribeMessage(String consumer, Map<String, Map<String, IMessageProcessor>> processorsByTopic) {
        Map<String, Map<String, IMessageProcessor>> resolved = parseTopicMap(processorsByTopic);
        String consumerName = parseConsumerWithProfile(consumer);
        applyTagProfile(resolved);
        logger.info("监听 consumer:{} processor:{}", consumerName, JSON.toJSONString(resolved));
        this.commonsConsumer.subscribe(consumerName, resolved);
    }

    /** Returns whether the configured message type matches {@code type} (case-insensitive). */
    private boolean isMessageType(MessageType type) {
        return type.getName().equalsIgnoreCase(this.messageRegistryVo.getType());
    }

    /** Replaces each topic's tag map in place with its profile-aware version. */
    private void applyTagProfile(Map<String, Map<String, IMessageProcessor>> processorsByTopic) {
        for (Map.Entry<String, Map<String, IMessageProcessor>> entry : processorsByTopic.entrySet()) {
            entry.setValue(parseTagMap(entry.getValue()));
        }
    }

    /** Rewrites the tag with the profile for RABBIT; returns it unchanged for other types. */
    private String parseTag(String tag) {
        return isMessageType(MessageType.RABBIT) ? parseTagWithProfile(tag) : tag;
    }

    /**
     * For RABBIT, returns a new map whose keys are the profile-rewritten tags; for other message
     * types returns the given map itself.
     */
    private Map<String, IMessageProcessor> parseTagMap(Map<String, IMessageProcessor> processorsByTag) {
        if (!isMessageType(MessageType.RABBIT)) {
            return processorsByTag;
        }
        Map<String, IMessageProcessor> rewritten = new HashMap<String, IMessageProcessor>();
        for (Map.Entry<String, IMessageProcessor> entry : processorsByTag.entrySet()) {
            rewritten.put(parseTagWithProfile(entry.getKey()), entry.getValue());
        }
        return rewritten;
    }

    /** Returns a new map whose keys are the profile-rewritten topics; values are shared, not copied. */
    private Map<String, Map<String, IMessageProcessor>> parseTopicMap(Map<String, Map<String, IMessageProcessor>> processorsByTopic) {
        Map<String, Map<String, IMessageProcessor>> rewritten = new HashMap<String, Map<String, IMessageProcessor>>();
        for (Map.Entry<String, Map<String, IMessageProcessor>> entry : processorsByTopic.entrySet()) {
            rewritten.put(parseTopicWithProfile(entry.getKey()), entry.getValue());
        }
        return rewritten;
    }

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void subscribeMessage() {
    }

    /** Deprecated; this implementation does nothing. */
    @Override
    @Deprecated
    public void subscribeMessage(String str, String str2) {
    }

    /**
     * Starts listening for the given consumer using processors looked up from
     * {@code CommonsMqHelper}.
     *
     * <ul>
     *   <li>RABBIT: single processors are registered via {@code receiveMessage} and publish
     *       processors via {@code subscribe}; either lookup may return {@code null} and is then
     *       skipped.</li>
     *   <li>ALIYUN / ROCKET: all processors are registered via {@code subscribe}.</li>
     *   <li>Any other type: only an info message is logged.</li>
     * </ul>
     *
     * @param str raw consumer name used for the processor lookup and, profile-rewritten, for the
     *     consumer registration
     */
    @Override
    public void start(String str) {
        if (isMessageType(MessageType.RABBIT)) {
            Map<String, Map<String, IMessageProcessor>> singleProcessors = CommonsMqHelper.getSingleProcessors(str);
            if (singleProcessors != null) {
                receiveSingleMessage(str, singleProcessors);
            }
            Map<String, Map<String, IMessageProcessor>> publishProcessors = CommonsMqHelper.getPublishProcessors(str);
            if (publishProcessors != null) {
                subscribeMessage(str, publishProcessors);
            }
            return;
        }

        boolean supported = isMessageType(MessageType.ALIYUN) || isMessageType(MessageType.ROCKET);
        if (!supported) {
            logger.info("消息类型暂不支持, 请联系中台同学支持");
            return;
        }

        Map<String, Map<String, IMessageProcessor>> allProcessors = CommonsMqHelper.getAllProcessors(str);
        if (allProcessors == null) {
            logger.info("不需要监听消息");
            return;
        }
        subscribeMessage(str, allProcessors);
    }

    /** Returns the topic rewritten by {@link #parseStringWithProfile(String)}. */
    @Override
    public String parseTopicWithProfile(String str) {
        return parseStringWithProfile(str);
    }

    /** Returns the tag rewritten by {@link #parseStringWithProfile(String)}. */
    @Override
    public String parseTagWithProfile(String str) {
        return parseStringWithProfile(str);
    }

    /** Returns the consumer name rewritten by {@link #parseStringWithProfile(String)}. */
    @Override
    public String parseConsumerWithProfile(String str) {
        return parseStringWithProfile(str);
    }

    /**
     * Appends {@code "_" + profile} to the name, replaces every {@code '-'} with {@code '_'} and
     * upper-cases the result.
     *
     * <p>Upper-casing uses {@link Locale#ROOT} so the generated names do not depend on the JVM's
     * default locale (e.g. the Turkish dotted/dotless {@code i}).
     *
     * <p>NOTE(review): a decompiler annotation on this method said its access modifier was changed
     * from {@code protected}; it is kept {@code public} here to match the decompiled signature.
     *
     * @param str raw topic, tag or consumer name
     * @return the profile-qualified, upper-cased name
     */
    public String parseStringWithProfile(String str) {
        return (str + "_" + this.profile).replace("-", "_").toUpperCase(Locale.ROOT);
    }
}
