package com.dtyunxi.huieryun.mq.provider.aliyun;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.dtyunxi.huieryun.mq.api.AbstractConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.aliyun.util.AliyunConsumerHelper;
import com.dtyunxi.huieryun.mq.util.SerializeCode;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/provider/aliyun/AliyunConsumer.class */
/**
 * Aliyun ONS implementation of {@link AbstractConsumer}.
 *
 * <p>The class is a singleton (see {@link #getInstance()}). Underlying ONS {@link Consumer}
 * instances are cached in a static map keyed by the consumer/group id, so each id gets at most
 * one cached consumer, to which further topic subscriptions are added.
 *
 * <p>NOTE(review): the registry configuration is stored in a mutable field set by
 * {@link #createConsumer(MessageRegistryVo)}; callers must invoke that method before subscribing.
 */
public class AliyunConsumer extends AbstractConsumer {
    private static final Logger logger = LoggerFactory.getLogger(AliyunConsumer.class);
    private static final AliyunConsumer single = new AliyunConsumer();
    /** Cache of ONS consumers, keyed by the consumer/group id they were created with. */
    private static final ConcurrentHashMap<String, Consumer> consumerMap = new ConcurrentHashMap<>();

    /** Value passed as the "MessageModel" property for the regular subscribe methods. */
    private static final String MODEL_CLUSTERING = "CLUSTERING";
    /** Value passed as the "MessageModel" property for the broadcast subscribe methods. */
    private static final String MODEL_BROADCASTING = "BROADCASTING";

    private AliyunConsumer() {
    }

    /**
     * Returns the shared singleton instance.
     *
     * @return the only {@code AliyunConsumer} instance
     */
    public static AliyunConsumer getInstance() {
        return single;
    }

    /**
     * Stores the registry configuration used by all later subscribe/unsubscribe calls.
     * No ONS consumer is created here; that happens lazily on the first subscription.
     *
     * @param messageRegistryVo registry configuration to remember
     */
    public void createConsumer(MessageRegistryVo messageRegistryVo) {
        this.messageRegistryVo = messageRegistryVo;
    }

    /**
     * Logs the request and delegates to {@link #subscribe(String, String, IMessageProcessor)}.
     *
     * @param str topic name
     * @param str2 routing key (used as the message tag)
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse receiveMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        logger.info("receiveMessage,topicName:{},routingKey:{}", str, str2);
        return subscribe(str, str2, iMessageProcessor);
    }

    /**
     * Delegates to {@link #subscribe(String, String, String, IMessageProcessor)}.
     *
     * @param str topic name
     * @param str2 consumer/group id
     * @param str3 routing key (used as the message tag)
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse receiveMessage(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor) {
        return subscribe(str, str2, str3, iMessageProcessor);
    }

    /**
     * Delegates to {@link #subscribe(String, String, Map)}.
     *
     * @param str topic name
     * @param str2 consumer/group id
     * @param map processors keyed by message tag
     * @return {@link MessageResponse#SUCCESS}
     */
    public MessageResponse receiveMessage(String str, String str2, Map<String, IMessageProcessor> map) {
        return subscribe(str, str2, map);
    }

    /**
     * Subscribes one processor to one tag of a topic, using an explicit consumer/group id and the
     * clustering message model.
     *
     * @param str topic name
     * @param str2 consumer/group id
     * @param str3 routing key (used as the message tag)
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     * @throws BusinessRuntimeException if an exception escapes the subscription helper
     */
    public <T> MessageResponse subscribe(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor) {
        logger.info("subscribeMessage,topicName:{},consumer:{},routingKey:{}", new Object[]{str, str2, str3});
        try {
            toSubscribe(this.messageRegistryVo, str, str2, str3, iMessageProcessor, MODEL_CLUSTERING);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            // str2 is the consumer id (see the info log above); the trailing Throwable is logged as the cause.
            logger.error("接收消息出错，topicName：{} consumer:{}", new Object[]{str, str2, e});
            throw new BusinessRuntimeException(e.getMessage());
        }
    }

    /**
     * Subscribes one processor to one tag of a topic, using the consumer id from the stored
     * registry configuration and the clustering message model.
     *
     * @param str topic name
     * @param str2 routing key (used as the message tag)
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse subscribe(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribe(this.messageRegistryVo, str, str2, iMessageProcessor, MODEL_CLUSTERING);
    }

    /**
     * Same as {@link #subscribe(String, String, IMessageProcessor)} but takes several routing keys,
     * which are combined by the inherited {@code getSubExpression} before subscribing.
     *
     * @param str topic name
     * @param strArr routing keys
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse subscribe(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribe(this.messageRegistryVo, str, getSubExpression(strArr), iMessageProcessor, MODEL_CLUSTERING);
    }

    /**
     * Subscribes one consumer to several topics at once with the clustering message model.
     *
     * @param str consumer/group id
     * @param map outer key is the topic name, inner map holds processors keyed by message tag
     * @return {@link MessageResponse#SUCCESS}; subscription failures are logged, not thrown
     */
    public MessageResponse subscribe(String str, Map<String, Map<String, IMessageProcessor>> map) {
        logger.info("consumer={}", str);
        toSubscribe(this.messageRegistryVo, str, map, MODEL_CLUSTERING);
        return MessageResponse.SUCCESS;
    }

    /**
     * Subscribes one consumer to a single topic with a processor per tag, using the clustering
     * message model.
     *
     * @param str topic name
     * @param str2 consumer/group id
     * @param map processors keyed by message tag
     * @return {@link MessageResponse#SUCCESS}
     * @throws BusinessRuntimeException if an exception escapes the subscription helper
     */
    public MessageResponse subscribe(String str, String str2, Map<String, IMessageProcessor> map) {
        try {
            toSingleTopicSubscribe(this.messageRegistryVo, str, str2, map, MODEL_CLUSTERING);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} ", str, e);
            throw new BusinessRuntimeException(e.getMessage());
        }
    }

    /**
     * Subscribes the consumer identified by {@code consumerGroup} to every topic in
     * {@code topicProcessors} and starts it if it is not running yet.
     *
     * <p>Failures are logged and swallowed, so callers cannot observe them.
     */
    private void toSubscribe(MessageRegistryVo messageRegistryVo, String consumerGroup, Map<String, Map<String, IMessageProcessor>> topicProcessors, String messageModel) {
        logger.info("consumer={}", consumerGroup);
        try {
            Consumer consumer = getOrCreateConsumer(messageRegistryVo, consumerGroup, messageModel);
            for (Map.Entry<String, Map<String, IMessageProcessor>> entry : topicProcessors.entrySet()) {
                Map<String, IMessageProcessor> tagProcessors = entry.getValue();
                consumer.subscribe(entry.getKey(), getSubExpression(tagProcessors.keySet().toArray()), consumerMessageListener(tagProcessors));
            }
            startIfNeeded(consumer);
            logger.info("Connected to {}", messageRegistryVo.getHost());
        } catch (Exception e) {
            // A failed subscription is an error condition, so it is logged at error level.
            logger.error("Failed to connect to {}", messageRegistryVo.getHost(), e);
        }
    }

    /** Returns the cached consumer for {@code consumerGroup}, creating and caching it first if absent. */
    private Consumer getOrCreateConsumer(MessageRegistryVo messageRegistryVo, String consumerGroup, String messageModel) {
        if (!consumerMap.containsKey(consumerGroup)) {
            createConsumer(messageRegistryVo, consumerGroup, messageModel);
        }
        return consumerMap.get(consumerGroup);
    }

    /** Starts the consumer unless it reports that it is already started. */
    private static void startIfNeeded(Consumer consumer) {
        if (!consumer.isStarted()) {
            consumer.start();
        }
    }

    /**
     * Builds an ONS consumer from the registry configuration and stores it in the cache under
     * {@code consumerGroup}. The id is written to both the "GROUP_ID" and "ConsumerId" properties.
     */
    private void createConsumer(MessageRegistryVo messageRegistryVo, String consumerGroup, String messageModel) {
        logger.info("consumer={}", consumerGroup);
        Properties properties = AliyunConsumerHelper.getProperties(messageRegistryVo);
        properties.setProperty("GROUP_ID", consumerGroup);
        properties.setProperty("ConsumerId", consumerGroup);
        properties.put("MessageModel", messageModel);
        properties.setProperty("ConsumeThreadNums", String.valueOf(messageRegistryVo.getConsumeThreadNums()));
        consumerMap.put(consumerGroup, ONSFactory.createConsumer(properties));
    }

    /**
     * Creates the listener that dispatches each received message to the processor registered for
     * the message's tag.
     *
     * <p>Outcome mapping:
     * <ul>
     *   <li>no processor for the tag: commit (message is dropped with a warning);</li>
     *   <li>processor returns {@code null} or a response whose result message is "error": redeliver later;</li>
     *   <li>processor throws {@link BusinessRuntimeException}: commit, no redelivery;</li>
     *   <li>processor throws any other exception: redeliver later;</li>
     *   <li>otherwise: commit.</li>
     * </ul>
     * Thread-bound message info set through {@link AliyunConsumerHelper} is always cleared afterwards.
     */
    private MessageListener consumerMessageListener(final Map<String, IMessageProcessor> map) {
        return new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext consumeContext) {
                if (logger.isInfoEnabled()) {
                    logger.info("Receive message success! message:{}", message);
                }
                AliyunConsumerHelper.setMsgfo(message);
                AliyunConsumerHelper.setMsgId(message.getMsgID());
                try {
                    IMessageProcessor processor = map.get(message.getTag());
                    if (processor == null) {
                        logger.warn("无消息订阅者，tag={}", message.getTag());
                        return Action.CommitMessage;
                    }
                    MessageResponse response = processor.process(SerializeCode.deSerialize(message.getBody()));
                    if (response == null || "error".equals(response.getResultMsg())) {
                        return Action.ReconsumeLater;
                    }
                    return Action.CommitMessage;
                } catch (BusinessRuntimeException e) {
                    // Must be caught before the generic handler, otherwise this branch is unreachable.
                    logger.error("Business exception while processing message, msgId={} tag={}; committing without retry",
                            new Object[]{message.getMsgID(), message.getTag(), e});
                    return Action.CommitMessage;
                } catch (Exception e) {
                    logger.error("Unexpected exception while processing message, msgId={} tag={}; will reconsume later",
                            new Object[]{message.getMsgID(), message.getTag(), e});
                    return Action.ReconsumeLater;
                } finally {
                    AliyunConsumerHelper.clearThreadInfo();
                }
            }
        };
    }

    /**
     * Subscribes the consumer identified by {@code consumerGroup} to a single topic, with one
     * processor per tag, and starts it if it is not running yet.
     *
     * <p>Failures are logged and swallowed, so callers cannot observe them.
     */
    private void toSingleTopicSubscribe(MessageRegistryVo messageRegistryVo, String topic, String consumerGroup, Map<String, IMessageProcessor> tagProcessors, String messageModel) {
        try {
            Consumer consumer = getOrCreateConsumer(messageRegistryVo, consumerGroup, messageModel);
            consumer.subscribe(topic, getSubExpression(tagProcessors.keySet().toArray()), consumerMessageListener(tagProcessors));
            startIfNeeded(consumer);
            logger.info("Connected to {}", messageRegistryVo.getHost());
        } catch (Exception e) {
            // A failed subscription is an error condition, so it is logged at error level.
            logger.error("Failed to connect to {}", messageRegistryVo.getHost(), e);
        }
    }

    /**
     * Removes the subscription to a topic from the consumer registered under the consumer id of
     * the stored registry configuration.
     *
     * @param str topic name
     * @param str2 not used by this implementation
     * @return {@link MessageResponse#SUCCESS}
     * @throws BusinessRuntimeException if no such consumer exists or unsubscribing fails
     */
    public MessageResponse unsubscribe(String str, String str2) {
        try {
            String consumerId = this.messageRegistryVo.getConsumerId();
            Consumer consumer = consumerMap.get(consumerId);
            if (consumer == null) {
                // Fail with a meaningful message instead of a NullPointerException whose message is null.
                throw new BusinessRuntimeException("No consumer registered for consumerId: " + consumerId);
            }
            consumer.unsubscribe(str);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("取消订阅失败，topicName：{}", str, e);
            throw new BusinessRuntimeException(e.getMessage());
        }
    }

    /** Wraps a single tag/processor pair in a map and delegates to the single-topic subscription. */
    private void toSubscribe(MessageRegistryVo messageRegistryVo, String topic, String consumerGroup, String tag, IMessageProcessor iMessageProcessor, String messageModel) {
        Map<String, IMessageProcessor> tagProcessors = new HashMap<>();
        tagProcessors.put(tag, iMessageProcessor);
        toSingleTopicSubscribe(messageRegistryVo, topic, consumerGroup, tagProcessors, messageModel);
    }

    /**
     * Subscribes using the consumer id taken from {@code messageRegistryVo}.
     *
     * @throws BusinessRuntimeException if an exception escapes the delegated call
     */
    private <T> MessageResponse toSubscribe(MessageRegistryVo messageRegistryVo, String topic, String routingKey, IMessageProcessor<T> iMessageProcessor, String messageModel) {
        try {
            toSubscribe(messageRegistryVo, topic, messageRegistryVo.getConsumerId(), routingKey, iMessageProcessor, messageModel);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            // The second value is the routing key, not the consumer id, so it is labelled accordingly.
            logger.error("接收消息出错，topicName：{} routingKey:{}", new Object[]{topic, routingKey, e});
            throw new BusinessRuntimeException(e.getMessage());
        }
    }

    /**
     * Subscribes one processor to one tag of a topic with the broadcasting message model, using
     * the consumer id from the stored registry configuration.
     *
     * @param str topic name
     * @param str2 routing key (used as the message tag)
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse subscribeBroadcastMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribe(this.messageRegistryVo, str, str2, iMessageProcessor, MODEL_BROADCASTING);
    }

    /**
     * Same as {@link #subscribeBroadcastMessage(String, String, IMessageProcessor)} but takes
     * several routing keys, combined by the inherited {@code getSubExpression}.
     *
     * @param str topic name
     * @param strArr routing keys
     * @param iMessageProcessor processor invoked for matching messages
     * @return {@link MessageResponse#SUCCESS}
     */
    public <T> MessageResponse subscribeBroadcastMessage(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        return toSubscribe(this.messageRegistryVo, str, getSubExpression(strArr), iMessageProcessor, MODEL_BROADCASTING);
    }
}
