package com.dtyunxi.huieryun.mq.provider.rabbit.impl;

import com.dtyunxi.app.ServiceContext;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.util.SerializeCode;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.util.JacksonUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/provider/rabbit/impl/SubscriptionConsumer.class */
/**
 * RabbitMQ consumer that deserializes each delivery, hands it to an
 * {@link IMessageProcessor} and settles the delivery (ack / nack / reject)
 * according to the outcome.
 *
 * <p>Settlement rules implemented here:
 * <ul>
 *   <li>undeserializable or duplicate message: acknowledged;</li>
 *   <li>processor returned a non-null, non-"error" response: acknowledged;</li>
 *   <li>processor threw an {@link Error} or {@link BusinessRuntimeException}:
 *       acknowledged, i.e. the message is discarded;</li>
 *   <li>processor returned {@code null} / "error", or anything else was thrown:
 *       requeued until the failure counter reaches {@value #MAX_RETRY_COUNT},
 *       then rejected without requeue.</li>
 * </ul>
 *
 * <p>The failure counter is plain instance state counting consecutive failures
 * seen by this consumer, not failures per message id.
 * NOTE(review): this assumes deliveries for one consumer are dispatched one at a
 * time; confirm against the channel/consumer setup.
 */
public class SubscriptionConsumer extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionConsumer.class);

    /** Failure count at which a message is rejected instead of requeued. */
    private static final int MAX_RETRY_COUNT = 10;
    /** Result message that marks a processor response as failed. */
    private static final String ERROR_RESULT = "error";
    /** Header carrying the JSON-encoded class of the message body. */
    private static final String BODY_CLASS_HEADER = "mqMessageBodyClassInfo";
    /** Prefix of headers that are copied into the {@link ServiceContext}. */
    private static final String CONTEXT_HEADER_PREFIX = "_context-";

    private final IMessageProcessor processor;
    private int retryCount;
    private final Channel channel;
    private final MessageRegistryVo messageRegistryVo;
    protected final SerializeCode serializeCode;

    /**
     * @param channel           channel the consumer is bound to; also used to settle deliveries
     * @param iMessageProcessor business callback invoked for every non-duplicate message
     * @param messageRegistryVo registration settings (serializer, service-context propagation)
     */
    public SubscriptionConsumer(Channel channel, IMessageProcessor iMessageProcessor, MessageRegistryVo messageRegistryVo) {
        super(channel);
        this.processor = iMessageProcessor;
        this.channel = channel;
        this.messageRegistryVo = messageRegistryVo;
        this.serializeCode = messageRegistryVo.getSerializeCode();
    }

    /**
     * Handles one delivery: deserialize, drop duplicates, process, settle.
     *
     * <p>A {@link BusinessRuntimeException} escaping the body of this method
     * (deserialization being the expected source) is logged together with the
     * raw payload and the delivery is acknowledged so it is not redelivered.
     *
     * @param str             consumer tag (unused)
     * @param envelope        delivery metadata; supplies the delivery tag
     * @param basicProperties message properties; supplies message id and headers
     * @param bArr            raw message body
     * @throws IOException if settling the delivery on the channel fails
     */
    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            Object message = unpackMessage(basicProperties, bArr);
            String messageId = basicProperties.getMessageId();
            try {
                if (this.processor.isDuplicationMessage(messageId, message)) {
                    logger.info("重复消息：msgId={}", messageId);
                    this.channel.basicAck(deliveryTag, false);
                    return;
                }
                processAndSettle(envelope, basicProperties, message);
            } catch (Throwable failure) {
                // Deliberately not a finally block: the duplicate early-return
                // above leaves the service context untouched.
                clearServiceContext();
                throw failure;
            }
            clearServiceContext();
        } catch (BusinessRuntimeException e) {
            // Decode explicitly as UTF-8 so the logged payload does not depend on the platform charset.
            logger.error("deSerialize to {} failure! msg={}", this.processor.getMsgType(), new String(bArr, StandardCharsets.UTF_8));
            this.channel.basicAck(deliveryTag, false);
        }
    }

    /**
     * Runs the processor on an already deserialized message and settles the
     * delivery according to the rules in the class description.
     */
    private void processAndSettle(Envelope envelope, AMQP.BasicProperties basicProperties, Object message) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            try {
                addServiceContext(basicProperties);
                MessageResponse response = this.processor.process(message);
                if (response == null || ERROR_RESULT.equals(response.getResultMsg())) {
                    logger.info("消息 {} 处理失败", deliveryTag);
                    dealMessageAck(envelope);
                } else {
                    logger.info("消息 {} 处理成功", deliveryTag);
                    acknowledgeAndResetRetries(deliveryTag);
                }
            } catch (Error e) {
                logFailure("业务无法正常处理，消息丢弃,错误消息：", e);
                // Acknowledge exactly once: a second ack for the same delivery
                // tag is a protocol error that makes the broker close the channel.
                acknowledgeAndResetRetries(deliveryTag);
            } catch (BusinessRuntimeException e) {
                logFailure("业务无法正常处理，消息丢弃,异常消息：", e);
                acknowledgeAndResetRetries(deliveryTag);
            }
        } catch (Throwable t) {
            // Reached for any other failure of the processing step, and also for
            // failures raised by the two discard handlers above.
            logFailure("其他未知异常，消息重新回到队列,异常消息：", t);
            dealMessageAck(envelope);
        }
    }

    /** Acknowledges a single delivery and clears the consecutive-failure counter. */
    private void acknowledgeAndResetRetries(long deliveryTag) throws IOException {
        this.channel.basicAck(deliveryTag, false);
        this.retryCount = 0;
    }

    /** Logs at error level; the stack trace is included only when debug logging is enabled. */
    private static void logFailure(String description, Throwable cause) {
        if (logger.isDebugEnabled()) {
            logger.error(description, cause);
        } else {
            logger.error(description + "{}", cause.getMessage());
        }
    }

    /** Removes the thread's {@link ServiceContext} when context propagation is enabled. */
    private void clearServiceContext() {
        if (this.messageRegistryVo.isPassSvcContext()) {
            ServiceContext.removeContext();
        }
    }

    /**
     * Deserializes the message body.
     *
     * <p>With a JDK serializer no target type is passed. Otherwise the target
     * type is taken from the {@value #BODY_CLASS_HEADER} header when it is
     * present and non-empty, falling back to the processor's declared type.
     *
     * @param basicProperties message properties whose headers may name the body class
     * @param bArr            raw message body
     * @return the deserialized message
     */
    private Object unpackMessage(AMQP.BasicProperties basicProperties, byte[] bArr) {
        if (this.serializeCode.isJdkSerializer()) {
            return this.serializeCode.deSerialize(bArr, (Type) null);
        }
        String bodyClassInfo = null;
        Map<String, Object> headers = basicProperties.getHeaders();
        if (headers != null) {
            // A header that is present but null is treated like a missing one
            // instead of failing with a NullPointerException.
            Object headerValue = headers.get(BODY_CLASS_HEADER);
            if (headerValue != null) {
                bodyClassInfo = headerValue.toString();
            }
        }
        if (StringUtils.isEmpty(bodyClassInfo)) {
            return this.serializeCode.deSerialize(bArr, this.processor.getMsgType());
        }
        Class<?> bodyClass = (Class<?>) JacksonUtil.readValue(bodyClassInfo, Class.class);
        return this.serializeCode.deSerialize(bArr, bodyClass);
    }

    /**
     * Copies every non-null header whose name starts with
     * {@value #CONTEXT_HEADER_PREFIX} into the current {@link ServiceContext},
     * using the header name without the prefix as the key. Does nothing when
     * context propagation is disabled for this registration.
     */
    private void addServiceContext(AMQP.BasicProperties basicProperties) {
        if (!this.messageRegistryVo.isPassSvcContext()) {
            return;
        }
        ServiceContext context = ServiceContext.getContext();
        Map<String, Object> headers = basicProperties.getHeaders();
        if (headers == null) {
            return;
        }
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            String name = header.getKey();
            Object value = header.getValue();
            if (name.startsWith(CONTEXT_HEADER_PREFIX) && value != null) {
                context.set(name.substring(CONTEXT_HEADER_PREFIX.length()), value.toString());
            }
        }
    }

    /**
     * Settles a failed delivery: requeues it while the consecutive-failure
     * counter is below {@value #MAX_RETRY_COUNT}, otherwise rejects it without
     * requeue.
     *
     * @param envelope delivery metadata; supplies the delivery tag
     * @throws IOException if the nack/reject cannot be sent
     */
    private void dealMessageAck(Envelope envelope) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        this.retryCount++;
        if (this.retryCount < MAX_RETRY_COUNT) {
            logger.info("消费消息 {} 失败.第{}次 重新放入队列", deliveryTag, this.retryCount);
            this.channel.basicNack(deliveryTag, false, true);
        } else {
            logger.info("消费消息 {} 处理失败已经超过{}次. 拒绝该消息", deliveryTag, MAX_RETRY_COUNT);
            this.channel.basicReject(deliveryTag, false);
            // The rejected message is gone for good; start counting afresh so the
            // next failing message is not rejected on its very first failure.
            this.retryCount = 0;
        }
    }
}
