package com.dtyunxi.yundt.cube.center.scheduler.biz.service.impl;

import com.dtyunxi.cube.commons.dto.DtoHelper;
import com.dtyunxi.cube.plugin.mq.ICommonsMqService;
import com.dtyunxi.cube.utils.DateUtil;
import com.dtyunxi.cube.utils.bean.CubeBeanUtils;
import com.dtyunxi.eo.SqlFilter;
import com.dtyunxi.huieryun.cache.api.ICacheService;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.request.TaskBatchInstQueryReqDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.request.TaskInstQueryReqDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.request.TaskInstUpdateReqDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.AppBizQueryRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskBatchInstQueryRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskInstQueryRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskQueryRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.IAppBizService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.IRetryService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstShardService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.utils.MqUtils;
import com.dtyunxi.yundt.cube.center.scheduler.biz.vo.TaskExecutorVo;
import com.dtyunxi.yundt.cube.center.scheduler.biz.vo.TaskInstVo;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskInstShardStatusEnum;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskInstShardTypeEnum;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskMsgParallelEnum;
import com.dtyunxi.yundt.cube.center.scheduler.common.msg.TaskMsg;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskBatchInstDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskInstDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskInstEo;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskInstShardEo;
import com.github.pagehelper.PageInfo;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link ITaskInstService} implementation backed by {@link TaskInstDas}.
 *
 * <p>Besides plain query/insert/update of task instances, this class drives the execution of
 * one task instance: it splits the task into shards, persists them and sends one MQ message
 * per shard through {@link IRetryService}.
 */
@Service
/* loaded from: input_file:com/dtyunxi/yundt/cube/center/scheduler/biz/service/impl/TaskInstServiceImpl.class */
public class TaskInstServiceImpl implements ITaskInstService {
    private static final Logger logger = LoggerFactory.getLogger(TaskInstServiceImpl.class);

    /** Value of {@code TaskQueryRespDto#getShardType()} that yields exactly one shard carrying all params. */
    private static final String SHARD_TYPE_SINGLE = "SINGLE";

    /** Value of {@code TaskQueryRespDto#getShardType()} that yields one shard per comma-separated param. */
    private static final String SHARD_TYPE_SERVER = "SERVER";

    @Resource
    private TaskInstDas taskInstDas;

    @Resource
    private TaskBatchInstDas taskBatchInstDas;

    @Resource
    private ITaskService taskService;

    @Autowired
    IAppBizService appBizService;

    @Autowired
    ICacheService cacheService;

    @Autowired
    ITaskInstShardService taskInstShardService;

    @Autowired
    ICommonsMqService commonsMqService;

    @Autowired
    MqUtils mqUtils;

    @Autowired
    IRetryService retryService;

    /**
     * Loads a single task instance by primary key.
     *
     * @param l primary key of the task instance
     * @return the instance converted to its response DTO
     * @throws BusinessRuntimeException (code 11002) when no row exists for the key
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public TaskInstQueryRespDto queryById(Long l) {
        TaskInstEo found = this.taskInstDas.selectByPrimaryKey(l);
        if (found == null) {
            throw new BusinessRuntimeException("11002", "任务实例不存在");
        }
        TaskInstQueryRespDto resp = new TaskInstQueryRespDto();
        DtoHelper.eo2Dto(found, resp);
        return resp;
    }

    /**
     * Pages through the instances of one task and fills in each row's running time.
     *
     * @param l                   id of the owning task; always overrides any task id in the filter
     * @param taskInstQueryReqDto additional filter fields, copied onto the query entity
     * @param num                 first paging argument, passed through to {@code selectPage}
     * @param num2                second paging argument, passed through to {@code selectPage}
     * @return a page of response DTOs carrying the paging metadata of the underlying query
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public PageInfo<TaskInstQueryRespDto> queryByPage(Long l, TaskInstQueryReqDto taskInstQueryReqDto, Integer num, Integer num2) {
        TaskInstEo criteria = new TaskInstEo();
        DtoHelper.dto2Eo(taskInstQueryReqDto, criteria);
        criteria.setTaskId(l);
        PageInfo eoPage = this.taskInstDas.selectPage(criteria, num, num2);

        List<TaskInstQueryRespDto> dtoList = new ArrayList<>();
        PageInfo<TaskInstQueryRespDto> dtoPage = new PageInfo<>(dtoList);
        DtoHelper.eoList2DtoList(eoPage.getList(), dtoList, TaskInstQueryRespDto.class);
        // Copy the paging metadata only; the list was converted above.
        CubeBeanUtils.copyProperties(dtoPage, eoPage, new String[]{"list", "navigatepageNums"});

        for (TaskInstQueryRespDto row : dtoList) {
            if (TaskInstShardStatusEnum.RUNNING.getCode().equals(row.getStatus())) {
                // Still running: measure up to "now".
                row.setRunningTime(DateUtil.getTimeDiffWithMilliseconds(row.getCreateTime(), new Date()));
            } else {
                // Otherwise measure up to the last update of the row.
                row.setRunningTime(DateUtil.getTimeDiffWithMilliseconds(row.getCreateTime(), row.getUpdateTime()));
            }
        }
        return dtoPage;
    }

    /**
     * Inserts a new task instance for the given task.
     *
     * @param l          id of the owning task
     * @param taskInstVo values to copy onto the new instance
     * @return the id carried by the entity after the insert
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public Long add(Long l, TaskInstVo taskInstVo) {
        TaskInstEo entity = new TaskInstEo();
        DtoHelper.dto2Eo(taskInstVo, entity);
        entity.setTaskId(l);
        this.taskInstDas.insert(entity);
        return entity.getId();
    }

    /**
     * Selectively updates an existing task instance.
     *
     * @param l                    primary key of the instance to update
     * @param taskInstUpdateReqDto fields to apply
     * @throws BusinessRuntimeException (code 11002) when no instance exists for the key
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public void modify(Long l, TaskInstUpdateReqDto taskInstUpdateReqDto) {
        TaskInstEo entity = new TaskInstEo();
        entity.setId(l);
        if (this.taskInstDas.count(entity) < 1) {
            throw new BusinessRuntimeException("11002", "任务实例ID不存在");
        }
        DtoHelper.dto2Eo(taskInstUpdateReqDto, entity);
        // Re-apply the key after the copy (same "copy, then set key" order as add/queryByPage) so
        // the row being updated is always the one that was just verified to exist.
        entity.setId(l);
        this.taskInstDas.updateSelective(entity);
    }

    /**
     * Pages through the task instances that belong to the batch instances of one task batch.
     *
     * <p>Each returned row is enriched with its task and with a running time.
     *
     * @param l                        id of the task batch
     * @param taskBatchInstQueryReqDto additional filter fields, copied onto the query entity
     * @param num                      first paging argument, passed through to {@code selectPage}
     * @param num2                     second paging argument, passed through to {@code selectPage}
     * @return a page of response DTOs; empty when the batch has no batch instances
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public PageInfo<TaskBatchInstQueryRespDto> queryByTaskBatchId(Long l, TaskBatchInstQueryReqDto taskBatchInstQueryReqDto, Integer num, Integer num2) {
        List<TaskBatchInstQueryRespDto> dtoList = new ArrayList<>();
        PageInfo<TaskBatchInstQueryRespDto> dtoPage = new PageInfo<>(dtoList);

        // NOTE(review): the element type returned by selectByTaskBatchId is not visible in this
        // file, so this list (and the id list derived from it) is left untyped - confirm and
        // parameterise.
        List batchInsts = this.taskBatchInstDas.selectByTaskBatchId(l);
        if (CollectionUtils.isNotEmpty(batchInsts)) {
            TaskInstEo criteria = new TaskInstEo();
            DtoHelper.dto2Eo(taskBatchInstQueryReqDto, criteria);
            List batchInstIds = (List) batchInsts.stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toList());
            // Restrict the instance query to the batch instances found above.
            List<SqlFilter> filters = new ArrayList<>();
            filters.add(SqlFilter.in("task_batch_inst_id", StringUtils.join(batchInstIds, ",")));
            criteria.setSqlFilters(filters);
            PageInfo eoPage = this.taskInstDas.selectPage(criteria, num, num2);
            DtoHelper.eoList2DtoList(eoPage.getList(), dtoList, TaskBatchInstQueryRespDto.class);
            // Copy the paging metadata only; the list was converted above.
            CubeBeanUtils.copyProperties(dtoPage, eoPage, new String[]{"list", "navigatepageNums"});
        }

        for (TaskBatchInstQueryRespDto row : dtoPage.getList()) {
            row.setTaskRespDto(this.taskService.queryById(row.getTaskId()));
            if (TaskInstShardStatusEnum.RUNNING.getCode().equals(row.getStatus())) {
                // Still running: measure up to "now". (The computed value used to be discarded,
                // leaving runningTime unset for running rows.)
                row.setRunningTime(DateUtil.getTimeDiff(row.getCreateTime(), new Date()));
            } else {
                row.setRunningTime(DateUtil.getTimeDiff(row.getCreateTime(), row.getUpdateTime()));
            }
        }
        return dtoPage;
    }

    /**
     * Executes one task instance: checks that the target application has an active node, splits
     * the task into shards, stores the shards and sends one MQ message per shard.
     *
     * <p>Whenever the method aborts, the task instance is first updated to the EXCP status with
     * the failure reason.
     *
     * @param taskExecutorVo execution context (task id, task instance id, batch ids, ...)
     * @throws BusinessRuntimeException (code 10001) when the application has no active node, when
     *                                  the task yields no shard, or when the message of a
     *                                  single-shard task could not be sent
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public void taskInstExecute(TaskExecutorVo taskExecutorVo) {
        Long taskInstId = taskExecutorVo.getTaskInstId();
        TaskQueryRespDto task = this.taskService.queryById(taskExecutorVo.getTaskId());
        AppBizQueryRespDto appBiz = this.appBizService.queryById(task.getAppBizId());

        if (!this.retryService.isExistActiveNode(appBiz, taskExecutorVo).booleanValue()) {
            String reason = "该应用无活跃节点，任务停止 | appCode=" + appBiz.getAppCode();
            modifyTaskInstanceFailed(taskInstId, reason);
            throw new BusinessRuntimeException("10001", reason);
        }

        List<TaskInstShardEo> shards = splitTask(taskInstId, task);
        if (shards == null || shards.isEmpty()) {
            // Reachable e.g. for SERVER sharding when params consist of separators only.
            // Mark the instance failed like the other abort paths of this method do.
            String reason = "无任务分片，任务停止";
            modifyTaskInstanceFailed(taskInstId, reason);
            throw new BusinessRuntimeException("10001", reason);
        }
        this.taskInstShardService.addTaskInstShardList(shards);

        boolean lastSent = true;
        for (TaskInstShardEo shard : shards) {
            // The app-biz loaded above is reused so it is not looked up again for every shard.
            lastSent = dispatchShard(taskExecutorVo, shard, appBiz);
        }

        // A shard whose message failed is flagged EXCP individually in dispatchShard. The
        // instance as a whole is only failed when its one and only shard could not be sent.
        // NOTE(review): failures in multi-shard tasks do not fail the instance - confirm intended.
        if (!lastSent && shards.size() == 1) {
            String reason = "下发任务消息失败";
            modifyTaskInstanceFailed(taskInstId, reason);
            throw new BusinessRuntimeException("10001", reason);
        }
    }

    /**
     * Sends the MQ message for one shard and stores the outcome on the shard.
     *
     * <p>Side effect: {@code taskExecutorVo} gets its task-instance-shard id set to the shard's id.
     *
     * @param taskExecutorVo  execution context of the owning task instance
     * @param taskInstShardEo shard to send; updated with message id, topic and, on failure, the
     *                        EXCP status
     * @return {@code true} unless the send result equals {@code MessageResponse.ERROR}
     */
    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskInstService
    public boolean sendMqMessage(TaskExecutorVo taskExecutorVo, TaskInstShardEo taskInstShardEo) {
        TaskQueryRespDto task = this.taskService.queryById(taskExecutorVo.getTaskId());
        AppBizQueryRespDto appBiz = this.appBizService.queryById(task.getAppBizId());
        return dispatchShard(taskExecutorVo, taskInstShardEo, appBiz);
    }

    /** Builds and sends the message of one shard for an already resolved app-biz, then persists the shard. */
    private boolean dispatchShard(TaskExecutorVo taskExecutorVo, TaskInstShardEo shard, AppBizQueryRespDto appBiz) {
        TaskMsg taskMsg = new TaskMsg();
        taskMsg.setScheduleInstName(taskExecutorVo.getScheduleInstName());
        taskMsg.setTaskInstShardId(shard.getId());
        taskMsg.setTaskInstId(taskExecutorVo.getTaskInstId());
        taskMsg.setTaskId(taskExecutorVo.getTaskId());
        taskMsg.setTaskBatchInstId(taskExecutorVo.getTaskBatchInstId());
        taskMsg.setTaskBatchId(taskExecutorVo.getTaskBatchId());
        taskMsg.setAppCode(appBiz.getAppCode());
        taskMsg.setBizCode(appBiz.getBizCode());
        taskMsg.setContent(new String(shard.getMsgContent(), StandardCharsets.UTF_8));

        // Only CLIENT-type shards are flagged as parallel.
        boolean clientShard = TaskInstShardTypeEnum.CLIENT.getCode().equals(shard.getShardType());
        TaskMsgParallelEnum parallel = clientShard ? TaskMsgParallelEnum.Y : TaskMsgParallelEnum.N;
        taskMsg.setIsParallel(parallel.getCode());
        taskMsg.setRetryType(taskExecutorVo.getRetryType());

        taskExecutorVo.setTaskInstShardId(shard.getId());
        MessageResponse response = this.retryService.getMessageResponse(appBiz, taskMsg, taskExecutorVo);

        boolean sent = !MessageResponse.ERROR.getResultMsg().equals(response.getResultMsg());
        if (!sent) {
            shard.setStatus(TaskInstShardStatusEnum.EXCP.getCode());
        }
        // Message id and topic are recorded whether or not the send succeeded.
        shard.setMsgId((String) response.getData());
        shard.setTopic(this.mqUtils.getDefaultTopic());
        this.taskInstShardService.modifyTaskInstShard(shard);
        return sent;
    }

    /**
     * Derives the shards of a task instance from the task's shard type and params.
     *
     * <ul>
     *   <li>{@code SINGLE}: one SERVER-type shard carrying the whole params string;</li>
     *   <li>{@code SERVER}: one SERVER-type shard per element of {@code params.split(",")};</li>
     *   <li>anything else (including {@code null}): one CLIENT-type shard carrying the whole
     *       params string.</li>
     * </ul>
     *
     * @param l                id of the task instance the shards belong to
     * @param taskQueryRespDto task definition; {@code null} params are treated as an empty string
     * @return the new, not yet persisted shards; may be empty for {@code SERVER} when the split
     *         yields no element
     */
    private List<TaskInstShardEo> splitTask(Long l, TaskQueryRespDto taskQueryRespDto) {
        List<TaskInstShardEo> shards = new ArrayList<>();
        String params = taskQueryRespDto.getParams() == null ? "" : taskQueryRespDto.getParams();
        String shardType = taskQueryRespDto.getShardType();
        if (SHARD_TYPE_SINGLE.equals(shardType)) {
            shards.add(newShard(l, taskQueryRespDto, TaskInstShardTypeEnum.SERVER, params));
        } else if (SHARD_TYPE_SERVER.equals(shardType)) {
            for (String part : params.split(",")) {
                shards.add(newShard(l, taskQueryRespDto, TaskInstShardTypeEnum.SERVER, part));
            }
        } else {
            shards.add(newShard(l, taskQueryRespDto, TaskInstShardTypeEnum.CLIENT, params));
        }
        return shards;
    }

    /** Creates one RUNNING shard with blank topic/message id and the given UTF-8 encoded content. */
    private TaskInstShardEo newShard(Long taskInstId, TaskQueryRespDto task, TaskInstShardTypeEnum type, String content) {
        TaskInstShardEo shard = new TaskInstShardEo();
        shard.setTaskInstId(taskInstId);
        shard.setTaskId(task.getId());
        shard.setAppBizId(task.getAppBizId());
        shard.setShardType(type.getCode());
        shard.setTopic("");
        shard.setMsgId("");
        shard.setMsgContent(content.getBytes(StandardCharsets.UTF_8));
        shard.setStatus(TaskInstShardStatusEnum.RUNNING.getCode());
        return shard;
    }

    /**
     * Logs the failure reason and updates the task instance to the EXCP status with that reason.
     *
     * @param l   id of the task instance
     * @param str failure reason, logged and stored as the instance's fail reason
     */
    private void modifyTaskInstanceFailed(Long l, String str) {
        logger.error(str);
        TaskInstUpdateReqDto update = new TaskInstUpdateReqDto();
        update.setStatus(TaskInstShardStatusEnum.EXCP.getCode());
        update.setFailReason(str);
        modify(l, update);
    }
}
