package com.dtyunxi.tcbj.app.open.biz.config;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.dtyunxi.app.ServiceContext;
import com.dtyunxi.cube.plugin.mq.impl.CommonsMqHelper;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.aliyun.assembler.MessageAssembler;
import com.dtyunxi.huieryun.mq.provider.aliyun.util.AliyunConsumerHelper;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.tcbj.app.open.biz.mq.dispatch.processor.ItemUpdatePriceProcessor;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Resource;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/dtyunxi/tcbj/app/open/biz/config/ConsumerExtConfiguration.class */
public class ConsumerExtConfiguration implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerExtConfiguration.class);

    @Resource
    private MessageRegistryVo messageRegistryVo;

    @Resource
    private ItemUpdatePriceProcessor itemUpdatePriceProcessor;

    @Resource
    private Environment environment;

    @Value("${cube.global.profile:dev}")
    private String globalProfile;

    public void run(String... strArr) {
        initPriceConsumer();
    }

    private void initPriceConsumer() {
        CompletableFuture.runAsync(() -> {
            MessageRegistryVo messageRegistryVo = (MessageRegistryVo) JSONObject.parseObject(JSONObject.toJSONString(this.messageRegistryVo), MessageRegistryVo.class);
            messageRegistryVo.setConsumerId("PRICE_CONSUMER");
            Properties properties = AliyunConsumerHelper.getProperties(this.messageRegistryVo);
            String parseStringWithProfile = parseStringWithProfile(CommonsMqHelper.parseConsumer(this.messageRegistryVo, messageRegistryVo.getConsumerId(), ""));
            properties.setProperty("GROUP_ID", parseStringWithProfile);
            properties.setProperty("ConsumerId", parseStringWithProfile);
            properties.put("MessageModel", MessageModel.CLUSTERING);
            properties.setProperty("ConsumeThreadNums", String.valueOf(this.messageRegistryVo.getConsumeThreadNums()));
            Consumer createConsumer = ONSFactory.createConsumer(properties);
            String parseStringWithProfile2 = parseStringWithProfile(this.environment.resolvePlaceholders(this.environment.resolvePlaceholders("ITEM_PRICE_TOPIC")));
            HashMap hashMap = new HashMap(1);
            hashMap.put("to-yxy-update-price-inform", this.itemUpdatePriceProcessor);
            createConsumer.subscribe(parseStringWithProfile2, "to-yxy-update-price-inform", consumerMessageListener(hashMap));
            if (!createConsumer.isStarted()) {
                createConsumer.start();
            }
            logger.info("项目扩展消费者启动成功:{}", messageRegistryVo.getConsumerId());
        });
    }

    protected String parseStringWithProfile(String str) {
        return (str + "_" + this.globalProfile).replace("-", "_").toUpperCase();
    }

    private MessageListener consumerMessageListener(Map<String, IMessageProcessor> map) {
        return (message, consumeContext) -> {
            if (logger.isInfoEnabled()) {
                logger.info("扩展消费者Receive message success! message:{}" + message.toString());
            }
            try {
                try {
                    try {
                        try {
                            IMessageProcessor iMessageProcessor = (IMessageProcessor) map.get(message.getTag());
                            if (iMessageProcessor == null) {
                                logger.warn("无消息订阅者，tag={}", message.getTag());
                                Action action = Action.CommitMessage;
                                if (this.messageRegistryVo.isPassSvcContext()) {
                                    ServiceContext.removeContext();
                                }
                                return action;
                            }
                            MessageResponse process = iMessageProcessor.process(MessageAssembler.messageToVo(message, this.messageRegistryVo.getSerializeCode()));
                            if (process == null) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("没有正确返回处理结果(response is null)，消息重新回到队列！");
                                }
                                Action action2 = Action.ReconsumeLater;
                                if (this.messageRegistryVo.isPassSvcContext()) {
                                    ServiceContext.removeContext();
                                }
                                return action2;
                            }
                            if ("error".equals(process.getResultMsg())) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("返回错误状态(response message:{})，消息重新回到队列！", process.getResultMsg());
                                }
                                Action action3 = Action.ReconsumeLater;
                                if (this.messageRegistryVo.isPassSvcContext()) {
                                    ServiceContext.removeContext();
                                }
                                return action3;
                            }
                            if (logger.isDebugEnabled() && logger.isDebugEnabled()) {
                                logger.debug("处理成功，正常消费！");
                            }
                            Action action4 = Action.CommitMessage;
                            if (this.messageRegistryVo.isPassSvcContext()) {
                                ServiceContext.removeContext();
                            }
                            return action4;
                        } catch (BusinessRuntimeException e) {
                            if (logger.isDebugEnabled()) {
                                logger.error("业务无法正常处理，消息丢弃,异常消息：", e);
                            } else {
                                logger.error("业务无法正常处理，消息丢弃,异常消息：{}", e.getMessage());
                            }
                            Action action5 = Action.CommitMessage;
                            if (this.messageRegistryVo.isPassSvcContext()) {
                                ServiceContext.removeContext();
                            }
                            return action5;
                        }
                    } catch (Throwable th) {
                        if (logger.isDebugEnabled()) {
                            logger.error("其他未知异常，消息重新回到队列,异常消息：", th);
                        } else {
                            logger.error("其他未知异常，消息重新回到队列,异常消息：{}", th.getMessage());
                        }
                        Action action6 = Action.ReconsumeLater;
                        if (this.messageRegistryVo.isPassSvcContext()) {
                            ServiceContext.removeContext();
                        }
                        return action6;
                    }
                } catch (Error e2) {
                    if (logger.isDebugEnabled()) {
                        logger.error("业务无法正常处理，消息丢弃,错误消息：", e2);
                    } else {
                        logger.error("业务无法正常处理，消息丢弃,错误消息：{}", e2.getMessage());
                    }
                    Action action7 = Action.CommitMessage;
                    if (this.messageRegistryVo.isPassSvcContext()) {
                        ServiceContext.removeContext();
                    }
                    return action7;
                }
            } catch (Throwable th2) {
                if (this.messageRegistryVo.isPassSvcContext()) {
                    ServiceContext.removeContext();
                }
                throw th2;
            }
        };
    }
}
