/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.mq.provider.rabbit.impl;

import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.provider.base.BaseService;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.SubscriptionConsumer;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Binds a queue to an exchange under one or more routing keys and attaches a
 * {@link SubscriptionConsumer} that hands deliveries to an {@link IMessageProcessor}.
 *
 * <p>The consumer settings used here ({@code prefetchCount}, {@code autoAck},
 * {@code durable}) are inherited fields that this class only reads.
 */
public class Subscription extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(Subscription.class);
    /** Single routing key; set only by the single-key constructor. */
    private String routingKey;
    /** Multiple routing keys; set only by the array constructor. */
    private String[] routingKeys;
    private String exchange;
    private IMessageProcessor processor;
    /** Name of the queue most recently declared and bound by {@link #start}; null until then. */
    private String boundQueue;

    /**
     * Creates a subscription bound under a single routing key.
     *
     * @param exchange   exchange to bind the queue to
     * @param routingKey binding key; {@code null} is replaced by {@code "*"}
     * @param processor  processor handed to the consumer created in {@link #start}
     */
    public <T> Subscription(String exchange, String routingKey, IMessageProcessor<T> processor) {
        this.exchange = exchange;
        this.routingKey = routingKey == null ? "*" : routingKey;
        this.processor = processor;
    }

    /**
     * Creates a subscription bound under several routing keys.
     *
     * @param exchange    exchange to bind the queue to
     * @param routingKeys binding keys; the queue is bound once per element
     * @param processor   processor handed to the consumer created in {@link #start}
     */
    public <T> Subscription(String exchange, String[] routingKeys, IMessageProcessor<T> processor) {
        this.exchange = exchange;
        this.routingKeys = routingKeys;
        this.processor = processor;
    }

    /**
     * Declares the exchange and queue, binds the queue under every routing key of this
     * subscription and starts consuming from it.
     *
     * @param channel   channel to operate on; when {@code null} nothing is done
     * @param topicType exchange type passed to {@code exchangeDeclare}
     * @param queueName queue to declare; when empty a server-named queue is declared instead
     * @return the consumer tag returned by {@code basicConsume}, or {@code null} when
     *         {@code channel} is {@code null}
     * @throws IOException              if setting QoS, declaring or binding fails
     * @throws BusinessRuntimeException (code 10001) if creating or registering the consumer fails
     */
    public String start(Channel channel, String topicType, String queueName) throws IOException {
        if (channel == null) {
            return null;
        }
        channel.basicQos(this.prefetchCount);
        String queue = this.buildQueue(channel, topicType, queueName);
        // Remember the queue actually bound (it may be server-generated) so that
        // unsubscribe() can remove the bindings from the right queue.
        this.boundQueue = queue;
        try {
            SubscriptionConsumer consumer = new SubscriptionConsumer(channel, this.processor);
            String consumerTag = channel.basicConsume(queue, this.autoAck, (Consumer)consumer);
            logger.info("Consuming  with tag:{} on channel:{}", consumerTag, channel);
            return consumerTag;
        }
        catch (Exception e) {
            logger.error("Failed to start consuming queue", e);
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    /**
     * Declares the exchange and the queue, then binds the queue to the exchange once
     * for each routing key returned by {@link #bindingKeys()}.
     *
     * @return the name of the declared queue (the server-generated one when
     *         {@code queueName} is empty)
     */
    private String buildQueue(Channel channel, String topicType, String queueName) throws IOException {
        channel.exchangeDeclare(this.exchange, topicType, this.durable, false, null);
        String queue = queueName;
        if (StringUtils.isEmpty(queue)) {
            // No name supplied: let the broker create and name the queue.
            queue = channel.queueDeclare().getQueue();
        } else {
            channel.queueDeclare(queue, this.durable, false, false, null);
        }
        for (String key : this.bindingKeys()) {
            channel.queueBind(queue, this.exchange, key);
        }
        return queue;
    }

    /**
     * Returns the routing keys this subscription binds under: the array given to the
     * array constructor when it has at least one element, otherwise the single
     * {@code routingKey} field (which is {@code null} if the array constructor was
     * used with a null or empty array).
     */
    private String[] bindingKeys() {
        if (this.routingKeys != null && this.routingKeys.length > 0) {
            return this.routingKeys;
        }
        return new String[]{this.routingKey};
    }

    /**
     * Removes the bindings between this subscription's queue and exchange.
     *
     * <p>The queue unbound is the one recorded by the last {@link #start} call. If
     * {@code start} has not bound a queue on this instance, the single routing key is
     * used as the queue name, which is what this method always did previously.
     *
     * @param channel channel to operate on; when {@code null} nothing is done
     * @throws BusinessRuntimeException (code 10001) if any unbind call fails
     */
    public void unsubscribe(Channel channel) {
        if (channel == null) {
            return;
        }
        String queue = this.boundQueue != null ? this.boundQueue : this.routingKey;
        try {
            for (String key : this.bindingKeys()) {
                channel.queueUnbind(queue, this.exchange, key);
            }
        }
        catch (Exception e) {
            // Trailing throwable in the argument array is logged as the exception by SLF4J.
            logger.error("\u53d6\u6d88\u8ba2\u9605\u5931\u8d25\uff0c exchange:{} queue:{}", new Object[]{this.exchange, queue, e});
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }
}

