/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.mq.provider.rabbit;

import com.dtyunxi.huieryun.mq.api.AbstractConsumer;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.rabbit.RabbitProducer;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.RabbitMqManager;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.Subscription;
import com.dtyunxi.huieryun.mq.vo.MessageRegistryVo;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitConsumer
extends AbstractConsumer {
    private static Logger logger = LoggerFactory.getLogger(RabbitProducer.class);
    private static final RabbitConsumer single = new RabbitConsumer();
    private RabbitMqManager rabbitMqManager;
    private Channel channel;

    private RabbitConsumer() {
    }

    public static RabbitConsumer getInstance() {
        return single;
    }

    public void createConsumer(MessageRegistryVo messageRegistryVo) {
        this.messageRegistryVo = messageRegistryVo;
        this.rabbitMqManager = super.connectRabbitMqManager(messageRegistryVo);
        this.channel = this.rabbitMqManager.createChannel();
    }

    @Override
    public <T> MessageResponse receiveMessage(String topicName, String queueName, IMessageProcessor<T> processor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(topicName, queueName, processor);
            this.subscribe(subscription, "direct", queueName);
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("\u63a5\u6536\u6d88\u606f\u51fa\u9519\uff0ctopicName\uff1a{} queueName:{}", new Object[]{topicName, queueName, e});
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public <T> MessageResponse subscribe(String topicName, String routingKey, IMessageProcessor<T> processor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(topicName, routingKey, processor);
            this.subscribe(subscription, "topic", null);
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("\u63a5\u6536\u6d88\u606f\u51fa\u9519\uff0ctopicName\uff1a{} routingKey:{}", new Object[]{topicName, routingKey, e});
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public <T> MessageResponse subscribe(String topicName, String queueName, String routingKey, IMessageProcessor<T> processor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(topicName, routingKey, processor);
            this.subscribe(subscription, "topic", queueName);
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("\u63a5\u6536\u6d88\u606f\u51fa\u9519\uff0ctopicName\uff1a{} routingKey:{}", new Object[]{topicName, routingKey, e});
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public <T> MessageResponse subscribe(String topicName, String[] routingKeys, IMessageProcessor<T> processor) {
        try {
            Subscription subscription = this.rabbitMqManager.createSubscription(topicName, routingKeys, processor);
            this.subscribe(subscription, "topic", null);
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("\u63a5\u6536\u6d88\u606f\u51fa\u9519\uff0ctopicName\uff1a{} routingKeys:{}", new Object[]{topicName, routingKeys, e});
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    @Override
    public MessageResponse unsubscribe(String topicName, String routingKey) {
        try {
            Subscription subscription = new Subscription(topicName, routingKey, null);
            subscription.setMessageRegistryVo(this.messageRegistryVo);
            subscription.unsubscribe(this.rabbitMqManager.createChannel());
            return MessageResponse.SUCCESS;
        }
        catch (Exception e) {
            logger.error("\u53d6\u6d88\u8ba2\u9605\u5931\u8d25\uff0ctopicName\uff1a{}", (Object)topicName, (Object)e);
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    private void subscribe(Subscription subscription, String topicType, String queueName) throws IOException {
        subscription.setMessageRegistryVo(this.messageRegistryVo);
        subscription.start(this.channel, topicType, queueName);
    }
}

