/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.mq.provider.rabbit.impl;

import com.dtyunxi.huieryun.mq.api.SendCallback;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.MessageInfo;
import com.dtyunxi.huieryun.mq.provider.rabbit.impl.MessageManager;
import com.dtyunxi.huieryun.mq.util.SerializeCode;
import com.dtyunxi.lang.BusinessRuntimeException;
import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message manager that publishes to RabbitMQ exchanges of type {@code topic}.
 *
 * <p>Offers a blocking publish ({@link #sendTopicMessage}) and an asynchronous
 * publish ({@link #sendAsyncMessage}) whose outcome is reported through a
 * {@link SendCallback}.
 *
 * <p>NOTE(review): asynchronous sends run on a shared thread pool and use the
 * channel inherited from {@code MessageManager}; whether that channel may be
 * used from several threads at once is not established here - confirm against
 * the RabbitMQ client documentation before issuing overlapping async sends on
 * one instance.
 */
public class MessageManagerWithTopics
extends MessageManager
implements SendCallback,
Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MessageManagerWithTopics.class);

    /** Log template shared by every send-failure path (text kept identical to the original messages). */
    private static final String SEND_FAILED_LOG = "\u53d1\u9001\u6d88\u606f\u51fa\u9519 exchangeName:{},routingKey{}";

    /**
     * Single pool shared by all asynchronous sends. Previously a new cached
     * thread pool was created for every call and never shut down or reused.
     */
    private static final ExecutorService ASYNC_SENDER = Executors.newCachedThreadPool();

    /**
     * Most recent message handed to {@link #sendAsyncMessage}. Read only by
     * {@link #run()} and {@link #toString()}; the asynchronous task itself
     * works on its own per-call copy.
     */
    private MessageInfo messageVo;

    /**
     * Declares {@code exchangeName} as a durable, non-auto-delete exchange of
     * type {@code topic} with no extra arguments.
     *
     * @param exchangeName name of the exchange to declare
     * @throws IOException if the channel reports an error
     */
    @Override
    public void exchangeDeclare(String exchangeName) throws IOException {
        logger.debug("channel={}, exchangeName={}", this.channel, exchangeName);
        this.channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    }

    /**
     * Serializes {@code message} and publishes it as a persistent message.
     *
     * <p>When {@code delayTime} is positive the message is published to
     * {@code "delay." + exchangeName} with routing key
     * {@code "delay." + routingKey} and an expiration of
     * {@code delayTime * 1000}; otherwise it is published directly to
     * {@code exchangeName}/{@code routingKey} without expiration.
     *
     * @param routingKey   routing key to publish with
     * @param exchangeName target exchange
     * @param delayTime    delay; values {@code <= 0} mean "no delay".
     *                     RabbitMQ reads the expiration as milliseconds, so
     *                     this is effectively a number of seconds
     * @param message      payload, serialized with {@code SerializeCode}
     * @return the id attached to the published message
     * @throws BusinessRuntimeException (code 10001) wrapping any failure
     */
    public String sendTopicMessage(String routingKey, String exchangeName, long delayTime, Object message) {
        try {
            String messageId = this.getMessageId();
            this.doPublishWithId(messageId, routingKey, exchangeName, delayTime, message);
            return messageId;
        }
        catch (Exception e) {
            logger.error(SEND_FAILED_LOG, exchangeName, routingKey, e);
            // Cast retained from the original to pin constructor overload resolution.
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    /**
     * Declares the exchange and queue, then publishes the message on a
     * background thread, reporting the real outcome through
     * {@code sendCallback}.
     *
     * <p>The callback is invoked on the worker thread after the publish
     * attempt: {@code onSuccess(messageId)} once the message has been handed
     * to the channel, or {@code onException(e, messageId)} if any step fails.
     * The id passed to the callback is the one attached to the message;
     * it is {@code null} only if generating the id itself failed.
     *
     * @param routingKey   routing key to publish with
     * @param topicName    exchange to declare and publish to
     * @param delayTime    delay, see {@link #sendTopicMessage}
     * @param message      payload to publish
     * @param sendCallback receiver of the outcome; may be {@code null} for
     *                     fire-and-forget
     */
    public void sendAsyncMessage(String routingKey, String topicName, long delayTime, Object message, SendCallback sendCallback) {
        // Per-call snapshot: the task must not read the shared field, which a
        // later call could overwrite before this task gets to run.
        final MessageInfo info = new MessageInfo(topicName, routingKey, message, delayTime);
        final SendCallback callback = sendCallback;
        this.messageVo = info;

        ASYNC_SENDER.execute(new Runnable() {
            @Override
            public void run() {
                String messageId = null;
                try {
                    messageId = MessageManagerWithTopics.this.getMessageId();
                    MessageManagerWithTopics.this.exchangeDeclare(info.getTopicName());
                    MessageManagerWithTopics.this.declareQueue(info.getRoutingKey(), info.getTopicName(), info.getDelayTime());
                    MessageManagerWithTopics.this.doPublishWithId(messageId, info.getRoutingKey(), info.getTopicName(), info.getDelayTime(), info.getMessage());
                }
                catch (Exception e) {
                    logger.error(SEND_FAILED_LOG, info.getTopicName(), info.getRoutingKey(), e);
                    if (callback != null) {
                        callback.onException(e, messageId);
                    }
                    // Do not rethrow: nobody can catch it on a pool thread.
                    return;
                }
                // Outside the try so a failing onSuccess is not reported as a send failure.
                if (callback != null) {
                    callback.onSuccess(messageId);
                }
            }
        });
    }

    /**
     * Synchronously declares the exchange and queue for the message last
     * passed to {@link #sendAsyncMessage} and publishes it. Kept for callers
     * that use this object as a {@link Runnable}; {@link #sendAsyncMessage}
     * no longer schedules this method.
     *
     * @throws IllegalStateException    if no message has been set yet
     * @throws BusinessRuntimeException (code 10001) wrapping any failure
     */
    @Override
    public void run() {
        // Read the field once so every step below sees the same message.
        final MessageInfo info = this.messageVo;
        if (info == null) {
            throw new IllegalStateException("No message to send: sendAsyncMessage has not been called");
        }
        try {
            this.exchangeDeclare(info.getTopicName());
            this.declareQueue(info.getRoutingKey(), info.getTopicName(), info.getDelayTime());
            this.sendTopicMessage(info.getRoutingKey(), info.getTopicName(), info.getDelayTime(), info.getMessage());
        }
        catch (Exception e) {
            logger.error(SEND_FAILED_LOG, info.getTopicName(), info.getRoutingKey(), e);
            throw new BusinessRuntimeException(10001, (Throwable)e);
        }
    }

    /** No-op: this class does not react to its own send results. */
    @Override
    public void onSuccess(String messageId) {
    }

    /** No-op: this class does not react to its own send failures. */
    @Override
    public void onException(Exception e, String messageId) {
    }

    /**
     * Renders the current message, channel, executor and registry (each only
     * when non-null) followed by the durable/prefetchCount/autoAck settings.
     */
    @Override
    public String toString() {
        return "MessageManagerWithTopics ["
            + (this.messageVo != null ? "messageVo=" + this.messageVo + ", " : "")
            + (this.channel != null ? "channel=" + this.channel + ", " : "")
            + (this.executor != null ? "executor=" + this.executor + ", " : "")
            + (this.messageRegistryVo != null ? "messageRegistryVo=" + this.messageRegistryVo + ", " : "")
            + "durable=" + this.durable
            + ", prefetchCount=" + this.prefetchCount
            + ", autoAck=" + this.autoAck
            + "]";
    }

    /**
     * Serializes {@code message} and publishes it with the given
     * {@code messageId}, choosing the delayed or direct route exactly as
     * described on {@link #sendTopicMessage}. Failures propagate unwrapped so
     * each caller can log and report them in its own way.
     */
    private void doPublishWithId(String messageId, String routingKey, String exchangeName, long delayTime, Object message) throws Exception {
        byte[] bytes = SerializeCode.serialize(message);
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder()
            .contentEncoding("UTF-8")
            .messageId(messageId)
            .deliveryMode(Integer.valueOf(2));
        if (delayTime > 0L) {
            AMQP.BasicProperties props = builder.expiration(String.valueOf(delayTime * 1000L)).build();
            this.channel.basicPublish("delay." + exchangeName, "delay." + routingKey, props, bytes);
        } else {
            this.channel.basicPublish(exchangeName, routingKey, builder.build(), bytes);
        }
    }
}

