package com.dtyunxi.huieryun.mq.provider.rabbit;

import com.dtyunxi.huieryun.mq.api.AbstractConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.RabbitMqManager;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.Subscription;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dtyunxi/huieryun/mq/provider/rabbit/RabbitConsumer.class */
/**
 * RabbitMQ-backed consumer exposed as a single shared instance.
 *
 * <p>{@link #createConsumer(MessageRegistryVo)} must be called before any subscribe or
 * unsubscribe method; until then the manager and channel fields are {@code null} and every
 * operation fails with a {@link BusinessRuntimeException} wrapping the resulting error.
 *
 * <p>No synchronization is applied in this class; the manager and channel are plain mutable
 * fields on the shared instance.
 */
public class RabbitConsumer extends AbstractConsumer {
    // Logger category is this class (it previously pointed at RabbitProducer, which
    // misattributed consumer errors to the producer in the logs).
    private static final Logger logger = LoggerFactory.getLogger(RabbitConsumer.class);
    private static final RabbitConsumer single = new RabbitConsumer();
    private RabbitMqManager rabbitMqManager;
    private Channel channel;

    private RabbitConsumer() {
    }

    /**
     * Returns the shared consumer instance.
     *
     * @return the single {@code RabbitConsumer} created at class initialization
     */
    public static RabbitConsumer getInstance() {
        return single;
    }

    /**
     * Stores the registry configuration, connects the manager and opens the channel that all
     * subsequent subscriptions are started on.
     *
     * <p>NOTE(review): calling this again replaces the channel reference without closing the
     * previous one — confirm whether callers ever re-initialize.
     *
     * @param messageRegistryVo registry/connection settings handed to
     *                          {@code RabbitMQHelper.connectRabbitMqManager}
     */
    public void createConsumer(MessageRegistryVo messageRegistryVo) {
        this.messageRegistryVo = messageRegistryVo;
        this.rabbitMqManager = RabbitMQHelper.connectRabbitMqManager(messageRegistryVo);
        this.channel = this.rabbitMqManager.createChannel();
    }

    /**
     * Starts a subscription using the direct exchange type.
     *
     * @param str               topic name (as named in the error log)
     * @param str2              queue name (as named in the error log); used both to create the
     *                          subscription and as the last argument when starting it
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse receiveMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, str2, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_DIRECT, str2);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            // The trailing exception has no placeholder, so SLF4J logs it with its stack trace.
            logger.error("接收消息出错，topicName：{} queueName:{}", str, str2, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Starts a subscription using the topic exchange type.
     *
     * @param str               topic name (as named in the error log)
     * @param str2              routing key (as named in the error log)
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse subscribe(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, str2, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_TOPIC, null);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} routingKey:{}", str, str2, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Starts a subscription using the topic exchange type, passing an extra value through to
     * {@code Subscription.start}.
     *
     * @param str               topic name (as named in the error log)
     * @param str2              forwarded as the last argument of {@code Subscription.start};
     *                          presumably a queue name, by analogy with
     *                          {@code receiveMessage} — TODO confirm
     * @param str3              routing key (as named in the error log)
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse subscribe(String str, String str2, String str3, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, str3, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_TOPIC, str2);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} routingKey:{}", str, str3, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Starts a subscription for several routing keys using the topic exchange type.
     *
     * @param str               topic name (as named in the error log)
     * @param strArr            routing keys (as named in the error log)
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse subscribe(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, strArr, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_TOPIC, null);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} routingKeys:{}", str, strArr, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Starts a subscription using the fanout exchange type.
     *
     * @param str               topic name (as named in the error log)
     * @param str2              routing key (as named in the error log)
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse subscribeBroadcastMessage(String str, String str2, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, str2, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_FANOUT, null);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} routingKey:{}", str, str2, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Starts a subscription for several routing keys using the fanout exchange type.
     *
     * @param str               topic name (as named in the error log)
     * @param strArr            routing keys (as named in the error log)
     * @param iMessageProcessor callback handed to the created subscription
     * @return {@code MessageResponse.SUCCESS} when the subscription started
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public <T> MessageResponse subscribeBroadcastMessage(String str, String[] strArr, IMessageProcessor<T> iMessageProcessor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(str, strArr, iMessageProcessor);
            subscribe(subscription, RabbitMQConstants.MQ_EXCHANGE_TYPE_FANOUT, null);
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("接收消息出错，topicName：{} routingKeys:{}", str, strArr, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Cancels a subscription by building a processor-less {@link Subscription} for the given
     * names and asking it to unsubscribe on a newly created channel.
     *
     * <p>NOTE(review): the channel created here is never closed by this class; verify that
     * {@code Subscription.unsubscribe} releases it.
     *
     * @param str  topic name (as named in the error log)
     * @param str2 second constructor argument of {@link Subscription}; presumably the routing
     *             key or queue name used when subscribing — TODO confirm
     * @return {@code MessageResponse.SUCCESS} when the call completed
     * @throws BusinessRuntimeException with code {@code "10001"} wrapping any failure
     */
    public MessageResponse unsubscribe(String str, String str2) {
        try {
            Subscription subscription = new Subscription(str, str2, (IMessageProcessor) null);
            subscription.setMessageRegistryVo(this.messageRegistryVo);
            subscription.unsubscribe(this.rabbitMqManager.createChannel());
            return MessageResponse.SUCCESS;
        } catch (Exception e) {
            logger.error("取消订阅失败，topicName：{}", str, e);
            throw new BusinessRuntimeException("10001", e);
        }
    }

    /**
     * Attaches the stored registry configuration to the subscription and starts it on the
     * channel opened by {@link #createConsumer(MessageRegistryVo)}.
     *
     * @param subscription subscription to start
     * @param str          exchange type constant from {@code RabbitMQConstants}
     * @param str2         extra value forwarded to {@code Subscription.start}; may be {@code null}
     * @throws IOException declared for failures propagated from {@code Subscription.start}
     */
    private void subscribe(Subscription subscription, String str, String str2) throws IOException {
        subscription.setMessageRegistryVo(this.messageRegistryVo);
        subscription.start(this.channel, str, str2);
    }
}
