package com.dtyunxi.huieryun.mq.api;

import com.dtyunxi.app.ServiceContext;
import com.dtyunxi.huieryun.cache.api.ICacheService;
import com.dtyunxi.huieryun.mq.constant.MQConstants;
import com.dtyunxi.huieryun.mq.vo.MQMessageVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.util.JacksonUtil;
import com.dtyunxi.util.SpringBeanUtil;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/api/IMessageProcessor.class */
public interface IMessageProcessor<T> {
    public static final Logger LOG = LoggerFactory.getLogger(IMessageProcessor.class);
    public static final Map<String, Type> CLAZZ_GENERIC_TYPES = new ConcurrentHashMap();

    MessageResponse process(T t);

    default boolean isDuplicationMessage(String str, T t) {
        ICacheService iCacheService = (ICacheService) SpringBeanUtil.getBean(ICacheService.class);
        String str2 = "_dtmqconsume/" + str;
        String andSet = iCacheService.getAndSet(str2, "1");
        iCacheService.expire(str2, 28800);
        return andSet != null && "1".equals(andSet);
    }

    default void cleanMessageConsumeFlag(String str) {
        ((ICacheService) SpringBeanUtil.getBean(ICacheService.class)).delCache(("_dtmqconsumed/" + str) + str);
    }

    default MessageResponse process(MQMessageVo mQMessageVo) {
        try {
            try {
                try {
                    try {
                        addServiceContext(mQMessageVo.getHeaderList());
                        T unpackMessage = unpackMessage(mQMessageVo);
                        if (isDuplicationMessage(mQMessageVo.getMessageKey(), unpackMessage)) {
                            LOG.info("重复消息，msgId:{},msgKey:{}", mQMessageVo.getMessageId(), mQMessageVo.getMessageKey());
                            MessageResponse messageResponse = MessageResponse.SUCCESS;
                            if (mQMessageVo.isPassSvcContext()) {
                                ServiceContext.removeContext();
                            }
                            return messageResponse;
                        }
                        MessageResponse process = process((IMessageProcessor<T>) unpackMessage);
                        if (process == null || MQConstants.ERROR.equals(process.getResultMsg())) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("没有正确返回处理结果(response is {})，消息重新回到队列！", process == null ? null : process.getResultMsg());
                            }
                            MessageResponse messageResponse2 = MessageResponse.ERROR;
                            if (mQMessageVo.isPassSvcContext()) {
                                ServiceContext.removeContext();
                            }
                            return messageResponse2;
                        }
                        if (LOG.isDebugEnabled() && LOG.isDebugEnabled()) {
                            LOG.debug("处理成功，正常消费！");
                        }
                        if (mQMessageVo.isPassSvcContext()) {
                            ServiceContext.removeContext();
                        }
                        return process;
                    } catch (Throwable th) {
                        if (LOG.isDebugEnabled()) {
                            LOG.error("其他未知异常，消息重新回到队列,异常消息：", th);
                        } else {
                            LOG.error("其他未知异常，消息重新回到队列,异常消息：{}", th.getMessage());
                        }
                        cleanMessageConsumeFlag(mQMessageVo.getMessageId());
                        MessageResponse messageResponse3 = MessageResponse.ERROR;
                        if (mQMessageVo.isPassSvcContext()) {
                            ServiceContext.removeContext();
                        }
                        return messageResponse3;
                    }
                } catch (BusinessRuntimeException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.error("业务无法正常处理，消息丢弃,异常消息：", e);
                    } else {
                        LOG.error("业务无法正常处理，消息丢弃,异常消息：{}", e.getMessage());
                    }
                    MessageResponse messageResponse4 = MessageResponse.SUCCESS;
                    if (mQMessageVo.isPassSvcContext()) {
                        ServiceContext.removeContext();
                    }
                    return messageResponse4;
                }
            } catch (Error e2) {
                if (LOG.isDebugEnabled()) {
                    LOG.error("业务无法正常处理，消息丢弃,错误消息：", e2);
                } else {
                    LOG.error("业务无法正常处理，消息丢弃,错误消息：{}", e2.getMessage());
                }
                MessageResponse messageResponse5 = MessageResponse.SUCCESS;
                if (mQMessageVo.isPassSvcContext()) {
                    ServiceContext.removeContext();
                }
                return messageResponse5;
            }
        } catch (Throwable th2) {
            if (mQMessageVo.isPassSvcContext()) {
                ServiceContext.removeContext();
            }
            throw th2;
        }
    }

    default T unpackMessage(MQMessageVo mQMessageVo) {
        Object deSerialize;
        if (mQMessageVo.getSerializeCode().isJdkSerializer()) {
            deSerialize = mQMessageVo.getSerializeCode().deSerialize(mQMessageVo.getMessageBody(), null);
        } else {
            String userProperty = mQMessageVo.getUserProperty(MQConstants.MESSAGE_BODY_CLASS_INFO);
            if (StringUtils.isEmpty(userProperty)) {
                deSerialize = mQMessageVo.getSerializeCode().deSerialize(mQMessageVo.getMessageBody(), getMsgType());
            } else {
                deSerialize = mQMessageVo.getSerializeCode().deSerialize(mQMessageVo.getMessageBody(), (Class) JacksonUtil.readValue(userProperty, Class.class));
            }
        }
        return (T) deSerialize;
    }

    default void addServiceContext(Map<String, String> map) {
        if (map == null || map.size() <= 0) {
            return;
        }
        ServiceContext context = ServiceContext.getContext();
        map.forEach((str, str2) -> {
            if (!str.startsWith(MQConstants.SERVICE_CONTEXT_PREFIX) || str2 == null) {
                return;
            }
            context.set(str.substring(MQConstants.SERVICE_CONTEXT_PREFIX.length()), str2);
        });
    }

    default Type getMsgType() {
        Type[] typeArr;
        Class<?> cls = getClass();
        String name = cls.getName();
        if (CLAZZ_GENERIC_TYPES.containsKey(name)) {
            return CLAZZ_GENERIC_TYPES.get(name);
        }
        Type[] genericInterfaces = cls.getGenericInterfaces();
        while (true) {
            typeArr = genericInterfaces;
            if (typeArr.length != 0 || cls.equals(Object.class)) {
                break;
            }
            cls = cls.getSuperclass();
            genericInterfaces = cls.getGenericInterfaces();
        }
        if (typeArr.length == 0) {
            return null;
        }
        for (Type type : typeArr) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;
                if (IMessageProcessor.class.isAssignableFrom((Class) parameterizedType.getRawType())) {
                    Type type2 = parameterizedType.getActualTypeArguments()[0];
                    CLAZZ_GENERIC_TYPES.put(name, type2);
                    return type2;
                }
            }
        }
        return null;
    }
}
