package com.yunxi.dg.base.center.trade.service.entity.impl;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.ExtQueryChainWrapper;
import com.dtyunxi.cube.commons.exceptions.BizException;
import com.dtyunxi.cube.plugin.mq.ICommonsMqService;
import com.dtyunxi.cube.utils.bean.CubeBeanUtils;
import com.dtyunxi.huieryun.lock.api.ILockService;
import com.dtyunxi.huieryun.lock.api.Mutex;
import com.dtyunxi.icommerce.utils.RestResponseHelper;
import com.github.pagehelper.PageInfo;
import com.yunxi.dg.base.center.trade.convert.entity.MqFailRetryRecordConverter;
import com.yunxi.dg.base.center.trade.domain.entity.IMqFailRetryRecordDomain;
import com.yunxi.dg.base.center.trade.dto.entity.MqFailRetryRecordDto;
import com.yunxi.dg.base.center.trade.dto.entity.MqFailRetryTypeDto;
import com.yunxi.dg.base.center.trade.eo.MqFailRetryRecordEo;
import com.yunxi.dg.base.center.trade.service.entity.IMqFailRetryRecordService;
import com.yunxi.dg.base.center.trade.service.entity.IMqFailRetryTypeService;
import com.yunxi.dg.base.center.trade.utils.AssertUtils;
import com.yunxi.dg.base.framework.core.convert.IConverter;
import com.yunxi.dg.base.framework.core.service.impl.BaseServiceImpl;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/yunxi/dg/base/center/trade/service/entity/impl/MqFailRetryRecordServiceImpl.class */
public class MqFailRetryRecordServiceImpl extends BaseServiceImpl<MqFailRetryRecordDto, MqFailRetryRecordEo, IMqFailRetryRecordDomain> implements IMqFailRetryRecordService {
    private static final Logger logger = LoggerFactory.getLogger(MqFailRetryRecordServiceImpl.class);

    @Resource
    private IMqFailRetryTypeService mqFailRetryTypeService;

    @Resource
    private ILockService lockService;

    @Autowired
    private ICommonsMqService mqService;

    public MqFailRetryRecordServiceImpl(IMqFailRetryRecordDomain iMqFailRetryRecordDomain) {
        super(iMqFailRetryRecordDomain);
    }

    public IConverter<MqFailRetryRecordDto, MqFailRetryRecordEo> converter() {
        return MqFailRetryRecordConverter.INSTANCE;
    }

    @Override // com.yunxi.dg.base.center.trade.service.entity.IMqFailRetryRecordService
    public void retryByTypeConfig() {
        logger.info("根据类型配置重试");
        PageInfo pageInfo = (PageInfo) RestResponseHelper.extractData(this.mqFailRetryTypeService.page(new MqFailRetryTypeDto(), 1, 1000));
        if (ObjectUtil.isEmpty(pageInfo) || CollectionUtil.isEmpty(pageInfo.getList())) {
            logger.info("无类型配置信息");
            return;
        }
        logger.info("获取类型配置：{}", JSON.toJSONString(pageInfo.getList()));
        List<MqFailRetryTypeDto> list = (List) pageInfo.getList().stream().filter(mqFailRetryTypeDto -> {
            return mqFailRetryTypeDto.getRetryNumLimit().intValue() > 0;
        }).collect(Collectors.toList());
        if (CollectionUtil.isEmpty(list)) {
            logger.info("无配置需处理");
            return;
        }
        for (MqFailRetryTypeDto mqFailRetryTypeDto2 : list) {
            logger.info("处理类型配置：{}", JSON.toJSONString(mqFailRetryTypeDto2));
            List list2 = ((ExtQueryChainWrapper) ((ExtQueryChainWrapper) ((ExtQueryChainWrapper) ((ExtQueryChainWrapper) this.domain.filter().eq(ObjectUtil.isNotEmpty(mqFailRetryTypeDto2.getBizModel()), "biz_model", mqFailRetryTypeDto2.getBizModel()).eq(StringUtils.isNotBlank(mqFailRetryTypeDto2.getMqTopic()), "mq_topic", mqFailRetryTypeDto2.getMqTopic()).eq("mq_tag", mqFailRetryTypeDto2.getMqTag())).eq("mq_status", 0)).lt("retry_num", mqFailRetryTypeDto2.getRetryNumLimit())).orderByAsc("create_time")).list();
            if (CollectionUtil.isEmpty(list2)) {
                logger.info("无失败记录需处理");
            } else {
                list2.forEach(mqFailRetryRecordEo -> {
                    try {
                        retryByThroughId(mqFailRetryRecordEo.getThroughId());
                    } catch (Exception e) {
                        logger.error("重试异常：{}", e.getMessage());
                        logger.error(e.getMessage(), e);
                    }
                });
            }
        }
    }

    @Override // com.yunxi.dg.base.center.trade.service.entity.IMqFailRetryRecordService
    public void retryByThroughId(String str) {
        logger.info("根据实体id重试：{}", str);
        Mutex mutex = null;
        try {
            try {
                mutex = this.lockService.lock("MQ_FAIL_RETRY_DIS", str, 10, 10, TimeUnit.SECONDS);
                MqFailRetryRecordEo mqFailRetryRecordEo = new MqFailRetryRecordEo();
                mqFailRetryRecordEo.setThroughId(str);
                MqFailRetryRecordEo selectOne = this.domain.selectOne(mqFailRetryRecordEo);
                AssertUtils.isFalse(ObjectUtil.isEmpty(selectOne), "找不到该补偿明细：" + str);
                logger.info("根据实体id重试结果：{}，{}", str, JSON.toJSONString(this.mqService.sendSingleMessage(selectOne.getMqTag(), selectOne.getMessageText())));
                selectOne.setRetryNum(Integer.valueOf(selectOne.getRetryNum().intValue() + 1));
                this.domain.updateSelective(selectOne);
                if (mutex != null) {
                    this.lockService.unlock(mutex);
                }
            } catch (Exception e) {
                logger.error("根据实体id重试，处理异常：{}", e.getMessage());
                logger.error(e.getMessage(), e);
                throw new BizException(e.getMessage());
            }
        } catch (Throwable th) {
            if (mutex != null) {
                this.lockService.unlock(mutex);
            }
            throw th;
        }
    }

    @Override // com.yunxi.dg.base.center.trade.service.entity.IMqFailRetryRecordService
    public void save(MqFailRetryRecordDto mqFailRetryRecordDto) {
        AssertUtils.isFalse(ObjectUtil.isEmpty(mqFailRetryRecordDto.getThroughId()), "实体id不能为空");
        AssertUtils.isFalse(ObjectUtil.isEmpty(mqFailRetryRecordDto.getMqStatus()), "mq状态不能为空");
        logger.info("MQ失败重试补偿保存：{}", JSON.toJSONString(mqFailRetryRecordDto));
        try {
            try {
                Mutex lock = this.lockService.lock("MQ_FAIL_RETRY_DIS", mqFailRetryRecordDto.getThroughId(), 10, 10, TimeUnit.SECONDS);
                MqFailRetryRecordEo mqFailRetryRecordEo = (MqFailRetryRecordEo) ((ExtQueryChainWrapper) this.domain.filter().eq("through_id", mqFailRetryRecordDto.getThroughId())).one();
                if (ObjectUtil.isEmpty(mqFailRetryRecordEo) && mqFailRetryRecordDto.getMqStatus().intValue() == 0) {
                    MqFailRetryRecordEo mqFailRetryRecordEo2 = new MqFailRetryRecordEo();
                    mqFailRetryRecordEo2.setRetryNum(0);
                    CubeBeanUtils.copyProperties(mqFailRetryRecordEo2, mqFailRetryRecordDto, new String[]{"id"});
                    this.domain.insert(mqFailRetryRecordEo2);
                } else if (ObjectUtil.isNotEmpty(mqFailRetryRecordEo) && mqFailRetryRecordDto.getMqStatus().intValue() == 1) {
                    mqFailRetryRecordEo.setMqStatus(1);
                    this.domain.updateSelective(mqFailRetryRecordEo);
                }
                if (lock != null) {
                    this.lockService.unlock(lock);
                }
            } catch (Exception e) {
                logger.error("根据实体id重试，处理异常：{}", e.getMessage());
                logger.error(e.getMessage(), e);
                if (0 != 0) {
                    this.lockService.unlock((Mutex) null);
                }
            }
        } catch (Throwable th) {
            if (0 != 0) {
                this.lockService.unlock((Mutex) null);
            }
            throw th;
        }
    }
}
