package com.dtyunxi.yundt.cube.center.scheduler.biz.service.impl;

import com.dtyunxi.lang.BusinessRuntimeException;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.request.TaskRelationCreateReqDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskRelationQueryRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskRelationRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.api.dto.response.TaskRespDto;
import com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService;
import com.dtyunxi.yundt.cube.center.scheduler.biz.task.TaskExecutor;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskInstShardStatusEnum;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskRelationTypeEnum;
import com.dtyunxi.yundt.cube.center.scheduler.common.constants.TaskStatusEnum;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskBatchDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskBatchInstDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskInstDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.das.TaskRelationDas;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskBatchEo;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskBatchInstEo;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskEo;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskInstEo;
import com.dtyunxi.yundt.cube.center.scheduler.dao.eo.TaskRelationEo;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:com/dtyunxi/yundt/cube/center/scheduler/biz/service/impl/TaskRelationServiceImpl.class */
public class TaskRelationServiceImpl implements ITaskRelationService {
    private static final Logger logger = LoggerFactory.getLogger(TaskRelationServiceImpl.class);

    @Resource
    private TaskRelationDas taskRelationDas;

    @Resource
    private TaskBatchDas taskBatchDas;

    @Resource
    private TaskDas taskDas;

    @Resource
    private TaskInstDas taskInstDas;

    @Resource
    private TaskBatchInstDas taskBatchInstDas;

    @Resource(name = "cubeSchedulerTaskExecutor")
    private TaskExecutor taskExecutor;

    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService
    @Transactional(rollbackFor = {Exception.class})
    public void addTaskRelation(Long l, List<TaskRelationCreateReqDto> list) {
        if (CollectionUtils.isEmpty(list)) {
            throw new BusinessRuntimeException("10002", "任务关系列表不能为空");
        }
        TaskBatchEo selectByPrimaryKey = this.taskBatchDas.selectByPrimaryKey(l);
        if (selectByPrimaryKey == null) {
            throw new BusinessRuntimeException("11002", "批处理记录不存在");
        }
        if (!TaskStatusEnum.NEW.getCode().equals(selectByPrimaryKey.getStatus())) {
            throw new BusinessRuntimeException("10002", "只能修改状态为新建状态的批处理任务关系");
        }
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList arrayList = null;
        for (TaskRelationCreateReqDto taskRelationCreateReqDto : list) {
            ArrayList<Long> newArrayList2 = Lists.newArrayList();
            if (CollectionUtils.isNotEmpty(arrayList)) {
                newArrayList2.addAll(arrayList);
            }
            arrayList = Lists.newArrayList();
            String code = StringUtils.isBlank(taskRelationCreateReqDto.getTaskType()) ? TaskRelationTypeEnum.AND.getCode() : taskRelationCreateReqDto.getTaskType();
            List<Long> taskIdList = taskRelationCreateReqDto.getTaskIdList();
            if (CollectionUtils.isEmpty(taskIdList)) {
                throw new BusinessRuntimeException("10002", "任务id列表不能为空");
            }
            if (newArrayList.size() != 0) {
                for (Long l2 : taskIdList) {
                    for (Long l3 : newArrayList2) {
                        TaskRelationEo taskRelationEo = new TaskRelationEo();
                        taskRelationEo.setTaskBatchId(l);
                        taskRelationEo.setTaskId(l2);
                        taskRelationEo.setPreTaskId(l3);
                        taskRelationEo.setTaskType(code);
                        taskRelationEo.setExtension("");
                        newArrayList.add(taskRelationEo);
                    }
                    arrayList.add(l2);
                }
            } else {
                if (taskIdList.size() > 1) {
                    throw new BusinessRuntimeException("10002", "批处理只能有1个根节点任务");
                }
                TaskRelationEo taskRelationEo2 = new TaskRelationEo();
                taskRelationEo2.setTaskBatchId(l);
                taskRelationEo2.setTaskId((Long) taskIdList.get(0));
                taskRelationEo2.setTaskType(code);
                taskRelationEo2.setExtension("");
                newArrayList.add(taskRelationEo2);
                arrayList.add(taskIdList.get(0));
            }
        }
        if (CollectionUtils.isNotEmpty(newArrayList)) {
            deleteTaskRelation(l);
            this.taskRelationDas.insertBatch(newArrayList);
            setTaskBatchId(l, (List) newArrayList.stream().map((v0) -> {
                return v0.getTaskId();
            }).collect(Collectors.toList()));
        }
    }

    private void setTaskBatchId(Long l, List<Long> list) {
        List<TaskEo> selectByIds = this.taskDas.selectByIds(list);
        if (CollectionUtils.isNotEmpty(selectByIds)) {
            for (TaskEo taskEo : selectByIds) {
                taskEo.setTaskBatchId(l);
                this.taskDas.updateSelective(taskEo);
            }
        }
    }

    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService
    public void deleteTaskRelation(Long l) {
        this.taskRelationDas.deleteByTaskBatchId(l);
        TaskEo taskEo = new TaskEo();
        taskEo.setTaskBatchId(l);
        List<TaskEo> select = this.taskDas.select(taskEo);
        if (CollectionUtils.isNotEmpty(select)) {
            for (TaskEo taskEo2 : select) {
                taskEo2.setTaskBatchId((Long) null);
                this.taskDas.update(taskEo2);
            }
        }
    }

    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService
    public TaskRelationQueryRespDto queryByTaskBatchId(Long l) {
        TaskBatchEo selectByPrimaryKey = this.taskBatchDas.selectByPrimaryKey(l);
        if (selectByPrimaryKey == null) {
            throw new BusinessRuntimeException("11002", "批处理记录不存在");
        }
        TaskRelationQueryRespDto taskRelationQueryRespDto = new TaskRelationQueryRespDto();
        taskRelationQueryRespDto.setTaskBatchId(l);
        taskRelationQueryRespDto.setTaskBatchCode(selectByPrimaryKey.getTaskBatchCode());
        taskRelationQueryRespDto.setTaskBatchName(selectByPrimaryKey.getTaskBatchName());
        List<TaskRelationEo> selectByTaskBatchId = this.taskRelationDas.selectByTaskBatchId(l);
        if (CollectionUtils.isEmpty(selectByTaskBatchId)) {
            return taskRelationQueryRespDto;
        }
        taskRelationQueryRespDto.setTaskRelationList(getTaskRelationList(null, selectByTaskBatchId, Lists.newArrayList(), 1, false));
        return taskRelationQueryRespDto;
    }

    private List<TaskRelationRespDto> getTaskRelationList(List<Long> list, List<TaskRelationEo> list2, List<TaskRelationRespDto> list3, int i, boolean z) {
        if (i > 10) {
            throw new BusinessRuntimeException("10002", "数据异常：批处理任务关系存在循环依赖");
        }
        ArrayList newArrayList = Lists.newArrayList();
        String str = null;
        ArrayList newArrayList2 = Lists.newArrayList();
        for (TaskRelationEo taskRelationEo : list2) {
            boolean z2 = false;
            if (list == null && taskRelationEo.getPreTaskId() == null) {
                z2 = true;
            }
            if (list != null && list.get(0).equals(taskRelationEo.getPreTaskId())) {
                z2 = true;
            }
            if (z2) {
                newArrayList.add(taskRelationEo.getTaskId());
                str = taskRelationEo.getTaskType();
                TaskRespDto taskRespDto = new TaskRespDto();
                taskRespDto.setTaskId(taskRelationEo.getTaskId());
                TaskEo selectByPrimaryKey = this.taskDas.selectByPrimaryKey(taskRelationEo.getTaskId());
                if (selectByPrimaryKey == null) {
                    throw new BusinessRuntimeException("11002", "任务不存在");
                }
                taskRespDto.setTaskName(selectByPrimaryKey.getTaskName());
                newArrayList2.add(taskRespDto);
            }
        }
        if (list == null && newArrayList.size() == 0) {
            throw new BusinessRuntimeException("10002", "数据异常：批处理根节点任务不存在");
        }
        if (list == null && newArrayList.size() > 1) {
            throw new BusinessRuntimeException("10002", "数据异常：批处理只能有1个根节点任务");
        }
        if (CollectionUtils.isNotEmpty(newArrayList)) {
            TaskRelationRespDto taskRelationRespDto = new TaskRelationRespDto();
            taskRelationRespDto.setTaskType(str);
            taskRelationRespDto.setTaskList(newArrayList2);
            list3.add(taskRelationRespDto);
            if (!z) {
                getTaskRelationList(newArrayList, list2, list3, i + 1, false);
            }
        }
        return list3;
    }

    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService
    public TaskRelationRespDto querySufTaskList(Long l, List<Long> list) {
        List<TaskRelationEo> selectByTaskBatchId = this.taskRelationDas.selectByTaskBatchId(l);
        if (CollectionUtils.isEmpty(selectByTaskBatchId)) {
            throw new BusinessRuntimeException("10002", "数据异常：批处理任务关系不存在");
        }
        List<TaskRelationRespDto> taskRelationList = getTaskRelationList(list, selectByTaskBatchId, Lists.newArrayList(), 1, true);
        if (CollectionUtils.isNotEmpty(taskRelationList)) {
            return taskRelationList.get(0);
        }
        return null;
    }

    @Override // com.dtyunxi.yundt.cube.center.scheduler.biz.service.ITaskRelationService
    public void executeTaskBatch(Long l, Long l2, Long l3, String str, String str2) {
        List select = this.taskRelationDas.select(l, l3, (Long) null);
        if (CollectionUtils.isEmpty(select)) {
            throw new BusinessRuntimeException("11002", "数据异常：批处理任务关系不存在");
        }
        Long l4 = null;
        Iterator it = select.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            TaskRelationEo taskRelationEo = (TaskRelationEo) it.next();
            if (taskRelationEo.getPreTaskId() != null) {
                l4 = taskRelationEo.getPreTaskId();
                break;
            }
        }
        if (l4 == null) {
            if (TaskInstShardStatusEnum.SUCCESS.getCode().equals(str)) {
                executeSufTask(l, l2, l3);
                return;
            } else {
                updateTaskBatchInst(l2, TaskInstShardStatusEnum.FAILED.getCode(), str2);
                return;
            }
        }
        List select2 = this.taskRelationDas.select(l, (Long) null, l4);
        if (CollectionUtils.isEmpty(select2)) {
            throw new BusinessRuntimeException("11002", "数据异常：批处理任务关系不存在");
        }
        if (select2.size() == 1) {
            if (TaskInstShardStatusEnum.SUCCESS.getCode().equals(str)) {
                executeSufTask(l, l2, l3);
                return;
            } else {
                updateTaskBatchInst(l2, TaskInstShardStatusEnum.FAILED.getCode(), str2);
                return;
            }
        }
        List<TaskInstEo> selectByTaskIds = this.taskInstDas.selectByTaskIds(l2, (List) select2.stream().map((v0) -> {
            return v0.getTaskId();
        }).collect(Collectors.toList()));
        if (TaskRelationTypeEnum.OR.equals(((TaskRelationEo) select2.get(0)).getTaskType())) {
            if (TaskInstShardStatusEnum.SUCCESS.getCode().equals(str)) {
                executeSufTask(l, l2, l3);
                return;
            } else {
                if (isAllDone(selectByTaskIds) && isAllFail(selectByTaskIds)) {
                    updateTaskBatchInst(l2, TaskInstShardStatusEnum.FAILED.getCode(), str2);
                    return;
                }
                return;
            }
        }
        if (!TaskInstShardStatusEnum.SUCCESS.getCode().equals(str)) {
            updateTaskBatchInst(l2, TaskInstShardStatusEnum.FAILED.getCode(), str2);
        } else if (isAllDone(selectByTaskIds) && isAllSuccess(selectByTaskIds)) {
            executeSufTask(l, l2, l3);
        }
    }

    private void executeSufTask(Long l, Long l2, Long l3) {
        List select = this.taskRelationDas.select(l, (Long) null, l3);
        if (CollectionUtils.isEmpty(select)) {
            updateTaskBatchInst(l2, TaskInstShardStatusEnum.SUCCESS.getCode(), null);
            return;
        }
        if (CollectionUtils.isEmpty(this.taskInstDas.selectByTaskIds(l2, (List) select.stream().map((v0) -> {
            return v0.getTaskId();
        }).collect(Collectors.toList())))) {
            Iterator it = select.iterator();
            while (it.hasNext()) {
                try {
                    this.taskExecutor.execute(((TaskRelationEo) it.next()).getTaskId(), null, l, l2);
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
        }
    }

    private void updateTaskBatchInst(Long l, String str, String str2) {
        TaskBatchInstEo selectByPrimaryKey = this.taskBatchInstDas.selectByPrimaryKey(l);
        if (selectByPrimaryKey == null) {
            throw new BusinessRuntimeException("11002", "数据异常：批处理任务实例不存在");
        }
        selectByPrimaryKey.setStatus(str);
        selectByPrimaryKey.setFailReason(str2);
        this.taskBatchInstDas.updateSelective(selectByPrimaryKey);
    }

    private boolean isAllDone(List<TaskInstEo> list) {
        Iterator<TaskInstEo> it = list.iterator();
        while (it.hasNext()) {
            if (TaskInstShardStatusEnum.RUNNING.getCode().equals(it.next().getStatus())) {
                return false;
            }
        }
        return true;
    }

    private boolean isAllFail(List<TaskInstEo> list) {
        Iterator<TaskInstEo> it = list.iterator();
        while (it.hasNext()) {
            if (TaskInstShardStatusEnum.SUCCESS.getCode().equals(it.next().getStatus())) {
                return false;
            }
        }
        return true;
    }

    private boolean isAllSuccess(List<TaskInstEo> list) {
        for (TaskInstEo taskInstEo : list) {
            if (TaskInstShardStatusEnum.FAILED.getCode().equals(taskInstEo.getStatus()) || TaskInstShardStatusEnum.EXCP.getCode().equals(taskInstEo.getStatus())) {
                return false;
            }
        }
        return true;
    }
}
