package com.dtyunxi.cube.starter.bundle.materiel.consumer.mq;

import com.dtyunxi.cube.notifier.api.NotifierFeignClientCaller;
import com.dtyunxi.cube.notifier.config.NotifierFeignClientConfig;
import com.dtyunxi.cube.starter.bundle.api.PullBundleDescriptionApi;
import com.dtyunxi.cube.starter.bundle.config.BundleReportModeConfig;
import com.dtyunxi.cube.starter.bundle.dto.BundleBaseDto;
import com.dtyunxi.cube.starter.bundle.dto.BundleDescDto;
import com.dtyunxi.cube.starter.bundle.dto.request.BundleDescReqDto;
import com.dtyunxi.cube.starter.bundle.enums.BundleDataTypeEnum;
import com.dtyunxi.cube.starter.bundle.enums.BundleReportModeEnum;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.config.BundleBootMqConsumerConfig;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.dto.AppStartedRecordReqDto;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.enums.AppStartedPullStatus;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.eo.AppStartedRecordEo;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.service.IBundleSyncService;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.service.IStarterAppStartedRecordService;
import com.dtyunxi.cube.starter.bundle.materiel.consumer.service.impl.StarterAppStartedRecordServiceImpl;
import com.dtyunxi.huieryun.lock.api.ILockService;
import com.dtyunxi.huieryun.lock.api.Mutex;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.rest.RestResponse;
import com.dtyunxi.util.IdGenrator;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:com/dtyunxi/cube/starter/bundle/materiel/consumer/mq/AbstractBundleBootMqProcess.class */
/**
 * Base MQ message processor for "application started" bundle notifications.
 *
 * <p>For each incoming {@link BundleDescDto} the processor:
 * <ol>
 *   <li>skips the message if {@link #ignoreConsumer()} is true or the message's data type does not
 *       match {@link #getBundleDataType()};</li>
 *   <li>acquires a distributed lock via {@link #getMutex(BundleDescDto)};</li>
 *   <li>delegates to {@link #dealMessage(BundleDescDto)} to record the start and sync the data;</li>
 *   <li>on failure marks the existing start record as {@code FAILED};</li>
 *   <li>always releases the lock if one was acquired.</li>
 * </ol>
 *
 * @param <T> concrete bundle DTO type carried by the message / returned by the REST pull
 */
public abstract class AbstractBundleBootMqProcess<T extends BundleBaseDto> implements IMessageProcessor<BundleDescDto<Map<String, T>>> {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractBundleBootMqProcess.class);

    /** Pattern used to render the boot time passed as the second argument of {@code lockService.lock}. */
    private static final String BOOT_TIME_LOCK_PATTERN = "yyyy-MM-ddHH:mm:ss";

    /** Lock wait timeout, passed to {@code lockService.lock} with {@link TimeUnit#SECONDS}. */
    @Value("${app.started.lock.wait.timeout:600}")
    protected int waitTimeout;

    /** Lock lease time, passed to {@code lockService.lock} with {@link TimeUnit#SECONDS}. */
    @Value("${app.started.lock.lease.time:300}")
    protected int leaseTime;
    protected IBundleSyncService bundleSyncService;

    /** All injected record-service beans keyed by name; see {@link #getStarterAppStartedRecordService()}. */
    @Resource
    protected Map<String, IStarterAppStartedRecordService> appStartedRecordServiceMap;

    /** Lazily resolved record service (cached after the first lookup, without synchronization). */
    protected IStarterAppStartedRecordService starterAppStartedRecordService;

    @Resource
    protected ILockService lockService;

    @Resource
    protected BundleBootMqConsumerConfig bundleBootMqConsumerConfig;

    @Resource
    protected BundleReportModeConfig bundleReportModeConfig;
    protected BundleDataTypeEnum bundleDataType;

    /** Optional feign configuration; a default instance is used when no such bean is injected. */
    @Autowired(required = false)
    @Qualifier("notifierFeignClientConfig")
    protected NotifierFeignClientConfig feignClientConfig = new NotifierFeignClientConfig();

    /** When true (default) and the report mode is MQ_REST, data is pulled over REST instead of read from the message. */
    protected boolean newMode = true;

    /**
     * @param iBundleSyncService service that persists/syncs the bundle data read by this processor
     * @param bundleDataTypeEnum the only data type this processor accepts
     */
    public AbstractBundleBootMqProcess(IBundleSyncService iBundleSyncService, BundleDataTypeEnum bundleDataTypeEnum) {
        this.bundleSyncService = iBundleSyncService;
        this.bundleDataType = bundleDataTypeEnum;
    }

    public void setNewMode(boolean z) {
        this.newMode = z;
    }

    /**
     * Returns an instance of {@code T}. Not called within this class.
     * NOTE(review): presumably a factory for the concrete DTO type - confirm against subclasses.
     */
    public abstract T newBeanInstance();

    /**
     * Performs the REST pull against the reporting application; invoked as the callback in
     * {@link #readData(BundleDescDto)}.
     *
     * @param pullBundleDescriptionApi client bound to the reporting application's ip/port
     * @param bundleDescReqDto         request DTO (created empty by {@code readData})
     * @return response whose data is the list of bundle items
     */
    public abstract RestResponse<List<T>> restCall(PullBundleDescriptionApi pullBundleDescriptionApi, BundleDescReqDto bundleDescReqDto);

    /**
     * Tries to acquire the distributed lock for the given notification.
     *
     * <p>The lock is requested with {@code serviceId + applicationVersion} and the formatted boot time.
     *
     * @param bundleDescDto the notification being processed
     * @return the acquired mutex, or {@code null} when {@code lockService.lock} throws a
     *         {@link BusinessRuntimeException} (logged at info level as "already being processed")
     */
    public Mutex getMutex(BundleDescDto<Map<String, T>> bundleDescDto) {
        try {
            // A new SimpleDateFormat per call: the class is not thread-safe, so it must not be shared.
            String formattedBootTime = new SimpleDateFormat(BOOT_TIME_LOCK_PATTERN).format(bundleDescDto.getBootTime());
            String lockName = bundleDescDto.getServiceId() + bundleDescDto.getApplicationVersion();
            return this.lockService.lock(lockName, formattedBootTime, this.waitTimeout, this.leaseTime, TimeUnit.SECONDS);
        } catch (BusinessRuntimeException e) {
            logger.info("当前版本的应用上报信息已经在处理中，serviceId:{}, version:{}, bootTime:{}", new Object[]{bundleDescDto.getServiceId(), bundleDescDto.getApplicationVersion(), bundleDescDto.getBootTime()});
            return null;
        }
    }

    /** @return true when this processor must not consume messages; {@link #process} then returns SUCCESS immediately */
    public abstract boolean ignoreConsumer();

    /** @return the report mode key; compared with {@code BundleReportModeEnum.MQ_REST.getKey()} in {@link #readData} */
    public abstract int reportMode();

    /**
     * Hook invoked by {@link #dealMessage} after the data has been read and before it is handed to the
     * sync service. The default implementation does nothing.
     */
    public void preProcess(BundleDescDto<Map<String, T>> bundleDescDto, List<T> list) {
    }

    public BundleDataTypeEnum getBundleDataType() {
        return this.bundleDataType;
    }

    /** @return true when {@code i} differs from the key of this processor's data type */
    private boolean illegalDataType(int i) {
        return this.bundleDataType.getKey() != i;
    }

    /**
     * Resolves (once) and returns the record service to use.
     *
     * <p>Any bean whose name is not {@code StarterAppStartedRecordServiceImpl.BEAN_NAME} takes
     * precedence; if several exist, the last one in the map's iteration order wins. Otherwise the
     * bean registered under {@code BEAN_NAME} is used.
     */
    public IStarterAppStartedRecordService getStarterAppStartedRecordService() {
        if (this.starterAppStartedRecordService != null) {
            return this.starterAppStartedRecordService;
        }
        for (Map.Entry<String, IStarterAppStartedRecordService> entry : this.appStartedRecordServiceMap.entrySet()) {
            if (!entry.getKey().equals(StarterAppStartedRecordServiceImpl.BEAN_NAME)) {
                this.starterAppStartedRecordService = entry.getValue();
            }
        }
        if (this.starterAppStartedRecordService == null) {
            this.starterAppStartedRecordService = this.appStartedRecordServiceMap.get(StarterAppStartedRecordServiceImpl.BEAN_NAME);
        }
        return this.starterAppStartedRecordService;
    }

    /**
     * Consumes one "application started" notification.
     *
     * @param bundleDescDto the notification
     * @return {@code SUCCESS} when the message was ignored, processed, or failed while
     *         {@code bundleBootMqConsumerConfig.isErrorIgnore()} is true; {@code ERROR} when the lock
     *         could not be acquired or processing failed and errors are not ignored
     */
    public MessageResponse process(BundleDescDto<Map<String, T>> bundleDescDto) {
        if (ignoreConsumer()) {
            logger.info("不消费功能包上报的{}数据", this.bundleDataType);
            return MessageResponse.SUCCESS;
        }
        if (illegalDataType(bundleDescDto.getType())) {
            logger.warn("非法的数据类型[期待类型：{}-{}，当前类型：{}]，不消费", new Object[]{this.bundleDataType.getDesc(), Integer.valueOf(this.bundleDataType.getKey()), Integer.valueOf(bundleDescDto.getType())});
            return MessageResponse.SUCCESS;
        }
        logger.info("开始处理应用启动MQ通知消息，serviceId={}，ip={}，port={}，versoin={},dataType={}", new Object[]{bundleDescDto.getServiceId(), bundleDescDto.getIp(), Integer.valueOf(bundleDescDto.getPort()), bundleDescDto.getApplicationVersion(), this.bundleDataType});

        // Single variable tracked across try/catch/finally so the lock is released on every path.
        Mutex mutex = null;
        try {
            mutex = getMutex(bundleDescDto);
            if (mutex == null) {
                logger.error("应用[{}]启动MQ通知消息，serviceId={}获取分布式锁失败", this.bundleDataType, bundleDescDto.getServiceId());
                return MessageResponse.ERROR;
            }
            logger.info("mutex key:{}, acq:{}, threadId:{}", new Object[]{mutex.getLockKey(), mutex.getLockAcquirer(), Long.valueOf(Thread.currentThread().getId())});
            dealMessage(bundleDescDto);
            logger.info("应用[{}]启动MQ通知消息处理完毕", this.bundleDataType);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            logger.error("应用{}:{}:{} 启动MQ通知消息处理失败", new Object[]{bundleDescDto.getServiceId(), this.bundleDataType, bundleDescDto.getBootTime(), e});
            AppStartedRecordEo failedRecord = getStarterAppStartedRecordService().findAppStartRecord(toAppStartedRecordReqDto(bundleDescDto));
            if (failedRecord != null) {
                getStarterAppStartedRecordService().modifyStatusById(failedRecord.getId(), bundleDescDto.getType(), AppStartedPullStatus.FAILED.name(), bundleDescDto.getBundleDataUploadDto());
            }
            if (!this.bundleBootMqConsumerConfig.isErrorIgnore()) {
                return MessageResponse.ERROR;
            }
            logger.warn("[{}]bundle.mq.consume.error.ignore值为：{}, 强制返回消费成功", this.bundleDataType, Boolean.valueOf(this.bundleBootMqConsumerConfig.isErrorIgnore()));
            return MessageResponse.SUCCESS;
        } finally {
            releaseMutex(mutex);
        }
    }

    /** Logs and releases the given mutex; does nothing when no lock was acquired. */
    private void releaseMutex(Mutex mutex) {
        if (mutex == null) {
            return;
        }
        logger.info("unlock mutex key:{}, acq:{}, threadId:{}", new Object[]{mutex.getLockKey(), mutex.getLockAcquirer(), Long.valueOf(Thread.currentThread().getId())});
        this.lockService.unlock(mutex);
    }

    /**
     * Records the application start and, when needed, reads and syncs the bundle data.
     *
     * <p>The "need to consume" decision is taken before the new start record is inserted. The record
     * is set to {@code DONE} at the end in both cases.
     *
     * <p>NOTE(review): {@link #process} invokes this method through {@code this}; with Spring's
     * proxy-based AOP such a self-invocation normally bypasses {@code @Transactional} - confirm
     * whether a transaction is actually active on that path.
     *
     * @param bundleDescDto the notification
     * @return id returned by {@code addAppStartedRecord}
     */
    @Transactional(rollbackFor = {Exception.class})
    public Long dealMessage(BundleDescDto<Map<String, T>> bundleDescDto) {
        boolean needConsume = isNeedConsume(bundleDescDto);
        Long recordId = getStarterAppStartedRecordService().addAppStartedRecord(toAppStartedRecordReqDto(bundleDescDto));
        if (needConsume) {
            List<T> bundleData = readData(bundleDescDto);
            if (bundleData == null) {
                // readData is overridable, so a subclass may still return null.
                bundleData = new ArrayList<>();
            }
            preProcess(bundleDescDto, bundleData);
            this.bundleSyncService.handle(bundleData, this.bundleDataType);
            postProcess(bundleDescDto);
        } else {
            logger.info("应用[{}]版本[{}]和打包时间[{}]未变，不需要拉取应用的功能包[{}]物料", new Object[]{bundleDescDto.getServiceId(), bundleDescDto.getApplicationVersion(), bundleDescDto.getApplicationCreateTime(), this.bundleDataType});
        }
        getStarterAppStartedRecordService().modifyStatusById(recordId, bundleDescDto.getType(), AppStartedPullStatus.DONE.name(), bundleDescDto.getBundleDataUploadDto());
        return recordId;
    }

    /**
     * Hook invoked by {@link #dealMessage} after the data has been handled; by default it calls the
     * sync service's post-process service for the notification's service id.
     */
    public void postProcess(BundleDescDto<Map<String, T>> bundleDescDto) {
        this.bundleSyncService.bundlePostProcessService().do4Service(bundleDescDto.getServiceId());
    }

    /**
     * Reads the bundle data for a notification.
     *
     * <p>When the report mode is MQ_REST and {@code newMode} is set, the data is pulled from the
     * reporting application through {@link #restCall}; if the ip contains a {@code '/'}, the DTO's ip
     * is first replaced (in place) by the segment after the first {@code '/'}. Otherwise the values of
     * the message content map are used.
     *
     * @param bundleDescDto the notification (its ip may be rewritten as described above)
     * @return the data read; never {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected List<T> readData(BundleDescDto<Map<String, T>> bundleDescDto) {
        List<T> bundleData = null;
        if (BundleReportModeEnum.MQ_REST.getKey() == reportMode() && this.newMode) {
            BundleDescReqDto bundleDescReqDto = new BundleDescReqDto();
            String reportedIp = bundleDescDto.getIp();
            if (reportedIp.indexOf("/") != -1) {
                String[] ipSegments = reportedIp.split("/");
                if (ipSegments.length > 1) {
                    bundleDescDto.setIp(ipSegments[1]);
                } else {
                    // e.g. "host/" or "/": there is no segment after the slash, so keep the value as-is.
                    logger.warn("Unexpected ip format [{}], no segment after '/', keeping it unchanged", reportedIp);
                }
            }
            // Block-bodied lambda kept on purpose so the callback stays value-returning.
            RestResponse response = (RestResponse) new NotifierFeignClientCaller().process(pullBundleDescriptionApi -> {
                return restCall(pullBundleDescriptionApi, bundleDescReqDto);
            }, bundleDescDto.getIp(), bundleDescDto.getPort(), PullBundleDescriptionApi.class, this.feignClientConfig);
            bundleData = (List<T>) response.getData();
            logger.info("应用功能包[{}]物料已拉取", this.bundleDataType);
        } else {
            Map<String, T> content = (Map<String, T>) bundleDescDto.getContent();
            if (content != null) {
                bundleData = new ArrayList<>(content.values());
            }
            logger.info("从消息体获取应用功能包[{}]物料", this.bundleDataType);
        }
        return bundleData != null ? bundleData : new ArrayList<>();
    }

    /** Builds the record request for a notification, with status {@code DOING}. */
    private AppStartedRecordReqDto toAppStartedRecordReqDto(BundleDescDto<?> bundleDescDto) {
        AppStartedRecordReqDto recordReq = new AppStartedRecordReqDto();
        recordReq.setCode(bundleDescDto.getServiceId());
        recordReq.setVersion(bundleDescDto.getApplicationVersion());
        recordReq.setPackageTime(bundleDescDto.getApplicationCreateTime());
        recordReq.setIp(bundleDescDto.getIp());
        recordReq.setPort(Integer.valueOf(bundleDescDto.getPort()));
        recordReq.setStatus(AppStartedPullStatus.DOING.name());
        recordReq.setType(Integer.valueOf(bundleDescDto.getType()));
        recordReq.setBootTime(bundleDescDto.getBootTime());
        return recordReq;
    }

    /**
     * Decides whether the bundle data must be read and synced.
     *
     * @return true when the application version is blank, the package time is missing, or no finished
     *         record exists for this package time / version / service id / data type
     */
    private boolean isNeedConsume(BundleDescDto<?> bundleDescDto) {
        if (StringUtils.isBlank(bundleDescDto.getApplicationVersion())) {
            logger.info("请求的应用版本为空,需要消费");
            return true;
        }
        if (null == bundleDescDto.getApplicationCreateTime()) {
            logger.info("请求的应用打包时间为空,需要消费");
            return true;
        }
        return !getStarterAppStartedRecordService().existsFinishedDataType(bundleDescDto.getApplicationCreateTime(), bundleDescDto.getApplicationVersion(), bundleDescDto.getServiceId(), bundleDescDto.getType());
    }

    /** @return a new id obtained from {@code IdGenrator.getDistributedId()} */
    public Long getId() {
        return Long.valueOf(IdGenrator.getDistributedId());
    }
}
