package com.dtyunxi.yundt.center.message.biz.message;

import com.dtyunxi.cube.commons.channel.message.IMessage;
import com.dtyunxi.cube.commons.channel.message.IMessageChannel;
import com.dtyunxi.cube.utils.threads.CommonThreadPool;
import com.dtyunxi.cube.utils.threads.pattens.ConsumerWorker;
import com.dtyunxi.cube.utils.threads.pattens.ProdConsuPatten;
import com.dtyunxi.cube.utils.threads.pattens.ProducerWorker;
import com.dtyunxi.cube.utils.threads.pattens.ProductQueueStore;
import com.dtyunxi.huieryun.cache.api.ICacheService;
import com.dtyunxi.util.SpringBeanUtil;
import com.dtyunxi.yundt.center.message.api.constants.MsgConstants;
import com.dtyunxi.yundt.center.message.biz.message.vo.InMailMessage;
import com.dtyunxi.yundt.center.message.biz.service.IChannelSelectService;
import com.dtyunxi.yundt.center.message.biz.utils.InMailUtils;
import com.dtyunxi.yundt.cube.center.message.dao.das.MessageDas;
import com.dtyunxi.yundt.cube.center.message.dao.eo.ChannelEo;
import com.dtyunxi.yundt.cube.center.message.dao.eo.MessageEo;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
 * Consumer side of the message sending pipeline.
 *
 * <p>Messages are handed to this worker in one of three ways: queued through the internal
 * {@link ProdConsuPatten} ({@link #send(IMessage)}), processed synchronously on the caller's
 * thread ({@link #sendDirect(IMessage)}), or queued after a delay
 * ({@link #sendDelay(int, IMessage)}). In every case {@link #doWork(IMessage)} performs the
 * actual delivery and records the outcome through {@link MessageDas}.
 */
@Component
/* loaded from: input_file:com/dtyunxi/yundt/center/message/biz/message/MessageConsumerWorker.class */
public class MessageConsumerWorker implements ConsumerWorker<IMessage> {

    @Resource
    private MessageDas messageDas;

    @Resource
    private IChannelSelectService channelSelectService;

    @Resource
    private ICacheService cacheService;

    /** When {@code false}, the channel is not invoked and the message is recorded as FAKE_SEND. */
    @Value("${channel.send.enabled:true}")
    private boolean sendEnabled;

    /** Channel implementations keyed by Spring bean name; populated in {@link #initChannelMap()}. */
    private Map<String, IMessageChannel> channelMap;
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerWorker.class);

    /** Scheduler used only to postpone queueing of delayed messages. */
    private final ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(2);
    private final ProdConsuPatten<IMessage> patten = new ProdConsuPatten<>(new CommonThreadPool(), new ProductQueueStore(10000), this, (ProducerWorker) null);

    /**
     * Deferred task that puts a single message onto the producer/consumer queue when it runs.
     */
    /* loaded from: input_file:com/dtyunxi/yundt/center/message/biz/message/MessageConsumerWorker$ScheduledTask.class */
    class ScheduledTask implements Runnable {
        private final IMessage message;

        public ScheduledTask(IMessage iMessage) {
            this.message = iMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                MessageConsumerWorker.this.patten.put(this.message);
            } catch (InterruptedException e) {
                MessageConsumerWorker.logger.error("延时发送任务执行抛出InterruptedException:", e);
                // Restore the interrupt flag so the executor thread can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Collects every {@link IMessageChannel} bean from the application context, keyed by bean
     * name. Falls back to an empty map when none are registered.
     */
    @PostConstruct
    protected void initChannelMap() {
        Map<String, IMessageChannel> beansOfType = SpringBeanUtil.getApplicationContext().getBeansOfType(IMessageChannel.class);
        if (CollectionUtils.isEmpty(beansOfType)) {
            this.channelMap = new HashMap<>(0);
        } else {
            this.channelMap = beansOfType;
        }
    }

    /**
     * Releases the delay scheduler's threads when the bean is destroyed.
     *
     * <p>{@code shutdown()} is used rather than {@code shutdownNow()}: new submissions are
     * rejected, and per the JDK default policy delayed tasks that were already scheduled are
     * still executed before the pool terminates.
     */
    @PreDestroy
    protected void shutdownScheduler() {
        this.scheduledExecService.shutdown();
    }

    /** Creates the worker and starts the producer/consumer pattern. */
    public MessageConsumerWorker() {
        this.patten.start();
    }

    /**
     * Queues a message for asynchronous processing. A {@code null} message is ignored.
     *
     * @param iMessage the message to queue
     * @throws InterruptedException propagated from the queue's {@code put}
     */
    @Deprecated
    public void send(IMessage iMessage) throws InterruptedException {
        if (iMessage == null) {
            return;
        }
        this.patten.put(iMessage);
    }

    /**
     * Processes a message synchronously on the calling thread. A {@code null} message is ignored,
     * consistent with {@link #send(IMessage)} and {@link #sendDelay(int, IMessage)}.
     *
     * @param iMessage the message to send
     */
    public void sendDirect(IMessage iMessage) {
        if (iMessage == null) {
            return;
        }
        doWork(iMessage);
    }

    /**
     * Queues a message after a delay. A {@code null} message is ignored.
     *
     * @param i        delay in seconds before the message is put on the queue
     * @param iMessage the message to queue
     */
    public void sendDelay(int i, IMessage iMessage) {
        if (iMessage == null) {
            return;
        }
        this.scheduledExecService.schedule(new ScheduledTask(iMessage), i, TimeUnit.SECONDS);
    }

    /**
     * Delivers one message and persists the result.
     *
     * <p>Steps: load the stored message by id, let {@link IChannelSelectService} pick a channel,
     * look up the channel bean by name and send through it (unless sending is disabled), then
     * update the message row with the final status, failure info, send time and sender. For
     * {@link InMailMessage} instances the in-mail status cache is updated afterwards.
     *
     * @param iMessage the message to deliver; {@code null} is logged and ignored
     */
    public void doWork(IMessage iMessage) {
        logger.info("-------doWork----start----");
        if (iMessage == null) {
            // Previously this failed with an NPE outside the try block below.
            logger.warn("doWork called with null message, ignored");
            return;
        }
        String failInfo = null;
        int sendStatus = MsgConstants.SendStatus.SEND.getCode().intValue();
        Long id = iMessage.id();
        try {
            ChannelEo adaptChannel = this.channelSelectService.adaptChannel(this.messageDas.selectByPrimaryKey(id));
            IMessageChannel iMessageChannel = this.channelMap.get(adaptChannel.getBeanName());
            if (iMessageChannel != null) {
                iMessage.setChannelConfig(adaptChannel.getConfig());
                if (this.sendEnabled) {
                    iMessageChannel.send(iMessage);
                } else {
                    sendStatus = MsgConstants.SendStatus.FAKE_SEND.getCode().intValue();
                }
            } else {
                // NOTE(review): the status stays SEND here although nothing was sent - confirm this is intended.
                logger.info("Channel is null when send {}! channelBeanName:{}", id, adaptChannel.getBeanName());
            }
        } catch (Exception e) {
            // Some exceptions (e.g. NullPointerException) carry no message; fall back to
            // toString() so the failure reason is not lost.
            failInfo = e.getMessage() != null ? e.getMessage() : e.toString();
            sendStatus = MsgConstants.SendStatus.FAIL.getCode().intValue();
            logger.error("doWork Exception:", e);
        }
        MessageEo messageEo = new MessageEo();
        messageEo.setId(id);
        messageEo.setContent(iMessage.content());
        messageEo.setSendStatus(Integer.valueOf(sendStatus));
        messageEo.setFailInfo(failInfo);
        messageEo.setSendTime(new Date());
        messageEo.setSender(iMessage.sender());
        this.messageDas.updateSelective(messageEo);
        if (iMessage instanceof InMailMessage) {
            // NOTE(review): the cache is updated regardless of the send outcome above - confirm intended.
            InMailMessage inMailMessage = (InMailMessage) iMessage;
            InMailUtils.updateInMailStatusCache(inMailMessage.tenantId(), inMailMessage.instanceId(), inMailMessage.msgType(), MsgConstants.MsgStatus.UN_READ.getCode(), inMailMessage.targets(), this.cacheService);
        }
    }
}
