/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.client.ConsumerBatchMessage;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CanalMQConnector} implementation that reads canal change events from
 * Aliyun ONS through a {@link BatchConsumer}.
 *
 * <p>The ONS listener thread deserializes every delivered batch, hands it over
 * through a bounded queue and then blocks until the client acknowledges (or
 * rolls back) that batch, or until {@code batchProcessTimeout} elapses. The
 * client side fetches one batch at a time with the {@code getList*} /
 * {@code getFlatList*} methods and must call {@link #ack()} or
 * {@link #rollback()} before fetching the next one.
 */
public class AliyunMQCanalConnector
implements CanalMQConnector {
    private static final Logger logger = LoggerFactory.getLogger(AliyunMQCanalConnector.class);
    private String nameServer;
    private String topic;
    private String groupName;
    private volatile boolean connected = false;
    private BatchConsumer consumer;
    // Hand-over queue between the ONS listener thread and the fetching client.
    private BlockingQueue<ConsumerBatchMessage<?>> messageBlockingQueue;
    private int batchSize = -1;
    private int consumeThreadNums = 1;
    // Milliseconds the listener waits for the client to ack/rollback a batch.
    private long batchProcessTimeout = 60000L;
    private boolean flatMessage;
    // Batch handed to the client that has not been acked or rolled back yet.
    private volatile ConsumerBatchMessage<?> lastGetBatchMessage = null;
    private String accessKey;
    private String secretKey;

    /**
     * Creates a connector with credentials. {@code enableMessageTrace},
     * {@code customizedTraceTopic}, {@code accessChannel} and {@code namespace}
     * are accepted for signature compatibility but are not used by this class.
     */
    public AliyunMQCanalConnector(String nameServer, String topic, String groupName, String accessKey, String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace, String customizedTraceTopic, String accessChannel, String namespace) {
        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage, enableMessageTrace, customizedTraceTopic, accessChannel);
    }

    /**
     * Creates a connector with credentials. {@code enableMessageTrace},
     * {@code customizedTraceTopic} and {@code accessChannel} are accepted for
     * signature compatibility but are not used by this class.
     */
    public AliyunMQCanalConnector(String nameServer, String topic, String groupName, String accessKey, String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace, String customizedTraceTopic, String accessChannel) {
        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage);
    }

    /**
     * Creates a connector without credentials.
     *
     * @param nameServer  name server address passed to the consumer as {@code NAMESRV_ADDR}
     * @param topic       topic to subscribe to
     * @param groupName   consumer group passed as {@code GROUP_ID}
     * @param batchSize   maximum consume batch size; {@code null} keeps the field default
     * @param flatMessage {@code true} to decode bodies as JSON {@link FlatMessage},
     *                    {@code false} to decode them as canal {@link Message}
     */
    public AliyunMQCanalConnector(String nameServer, String topic, String groupName, Integer batchSize, boolean flatMessage) {
        this.nameServer = nameServer;
        this.topic = topic;
        this.groupName = groupName;
        this.flatMessage = flatMessage;
        this.messageBlockingQueue = new LinkedBlockingQueue<ConsumerBatchMessage<?>>(1024);
        // Unboxing a null Integer would throw; keep the field default instead.
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
    }

    /**
     * Creates a connector with credentials and the default consume thread count.
     */
    public AliyunMQCanalConnector(String nameServer, String topic, String groupName, String accessKey, String secretKey, Integer batchSize, boolean flatMessage) {
        this(nameServer, topic, groupName, batchSize, flatMessage);
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    /**
     * Creates a connector with credentials and an explicit consume thread count.
     */
    public AliyunMQCanalConnector(String nameServer, String topic, String groupName, String accessKey, String secretKey, Integer batchSize, int consumeThreadNums, boolean flatMessage) {
        this(nameServer, topic, groupName, batchSize, flatMessage);
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.consumeThreadNums = consumeThreadNums;
    }

    /**
     * Builds the consumer configuration and creates the batch consumer. The
     * consumer is not started here; {@link #subscribe(String)} does that.
     */
    public void connect() throws CanalClientException {
        Properties properties = new Properties();
        // Properties rejects null values with a NullPointerException, so unset
        // settings are simply left out and validation is left to the factory.
        AliyunMQCanalConnector.setIfPresent(properties, "GROUP_ID", this.groupName);
        AliyunMQCanalConnector.setIfPresent(properties, "AccessKey", this.accessKey);
        AliyunMQCanalConnector.setIfPresent(properties, "SecretKey", this.secretKey);
        AliyunMQCanalConnector.setIfPresent(properties, "NAMESRV_ADDR", this.nameServer);
        properties.setProperty("MessageModel", "CLUSTERING");
        properties.setProperty("ConsumeMessageBatchMaxSize", String.valueOf(this.batchSize));
        properties.setProperty("ConsumeThreadNums", String.valueOf(this.consumeThreadNums));
        this.consumer = ONSFactory.createBatchConsumer(properties);
    }

    /** Stores {@code value} under {@code key} unless the value is {@code null}. */
    private static void setIfPresent(Properties properties, String key, String value) {
        if (value != null) {
            properties.setProperty(key, value);
        }
    }

    /**
     * Shuts the consumer down (if one was created) and marks the connector as
     * disconnected.
     */
    public void disconnect() throws CanalClientException {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        this.connected = false;
    }

    /**
     * @return {@code true} once a subscription has been started successfully and
     *         {@link #disconnect()} has not been called since
     */
    public boolean checkValid() throws CanalClientException {
        return this.connected;
    }

    /**
     * Subscribes to the configured topic with the given filter expression and
     * starts the consumer. Does nothing when already connected.
     *
     * <p>A failure is logged and not rethrown; in that case the connector stays
     * disconnected so {@link #checkValid()} keeps returning {@code false} and a
     * later call can retry.
     *
     * @param filter filter expression handed to the consumer's {@code subscribe}
     */
    public void subscribe(String filter) throws CanalClientException {
        if (this.connected) {
            return;
        }
        try {
            if (this.consumer == null) {
                this.connect();
            }
            BatchMessageListener listener = new BatchMessageListener(){

                public Action consume(List<com.aliyun.openservices.ons.api.Message> messageExts, ConsumeContext context) {
                    logger.info("consume,messageExts.size={}", messageExts.size());
                    return AliyunMQCanalConnector.this.process(messageExts) ? Action.CommitMessage : Action.ReconsumeLater;
                }
            };
            this.consumer.subscribe(this.topic, filter, listener);
            if (!this.consumer.isStarted()) {
                this.consumer.start();
            }
            // Only report the connector as connected when start-up really succeeded.
            this.connected = true;
        }
        catch (Exception e) {
            logger.error("Failed to connect to {}", this.nameServer, e);
        }
    }

    /**
     * Decodes one delivered batch, queues it for the client and waits for the
     * client's verdict.
     *
     * @param messageExts messages delivered by the consumer
     * @return {@code true} when the batch was acknowledged within the timeout
     *         (or contained nothing to hand over), {@code false} when it was
     *         rolled back or the wait timed out
     * @throws CanalClientException when a body cannot be decoded
     * @throws RuntimeException     when the listener thread is interrupted
     */
    private boolean process(List<com.aliyun.openservices.ons.api.Message> messageExts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", messageExts);
        }
        List<Object> messageList = new ArrayList<Object>(messageExts.size());
        List<String> messageIds = new ArrayList<String>(messageExts.size());
        for (com.aliyun.openservices.ons.api.Message messageExt : messageExts) {
            byte[] data = messageExt.getBody();
            messageIds.add(messageExt.getMsgID());
            if (data == null) {
                logger.warn("Received message data is null");
                continue;
            }
            try {
                if (this.flatMessage) {
                    FlatMessage flat = (FlatMessage)JSON.parseObject(data, FlatMessage.class, new Feature[0]);
                    messageList.add(flat);
                } else {
                    messageList.add(CanalMessageDeserializer.deserializer(data));
                }
            }
            catch (Exception ex) {
                logger.error("Add message error", ex);
                throw new CanalClientException(ex);
            }
        }
        if (messageList.isEmpty()) {
            // An empty batch would never be acked: the getList/getFlatList callers
            // skip ack() for empty results, which would leave the pending-batch
            // marker set and block every later fetch. Nothing to process, so commit.
            logger.warn("No message body to process, messageIds={}", JSON.toJSONString(messageIds));
            return true;
        }
        ConsumerBatchMessage<Object> batchMessage = new ConsumerBatchMessage<Object>(messageList);
        try {
            this.messageBlockingQueue.put(batchMessage);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        boolean isCompleted;
        try {
            isCompleted = batchMessage.waitFinish(this.batchProcessTimeout);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        logger.info("messageExts.size={},status={},messageIds={}", messageExts.size(), isSuccess, JSON.toJSONString(messageIds));
        return isCompleted && isSuccess;
    }

    /** Subscribes with the match-all filter {@code "*"}. */
    public void subscribe() throws CanalClientException {
        this.subscribe("*");
    }

    /** Removes the subscription for the configured topic, if a consumer exists. */
    public void unsubscribe() throws CanalClientException {
        if (this.consumer != null) {
            this.consumer.unsubscribe(this.topic);
        }
    }

    /**
     * Fetches the next batch and acknowledges it immediately when it is not empty.
     *
     * @param timeout how long to wait for a batch
     * @param unit    unit of {@code timeout}
     * @return the fetched messages, or an empty list when none arrived in time
     */
    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<Message> messages = this.getListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    /**
     * Fetches the next batch without acknowledging it; the caller must follow up
     * with {@link #ack()} or {@link #rollback()}.
     *
     * @throws CanalClientException when the previous batch is still pending or
     *                              the wait is interrupted
     */
    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return this.pollBatch(timeout, unit);
    }

    /**
     * Flat-message variant of {@link #getList(Long, TimeUnit)}.
     */
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = this.getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    /**
     * Flat-message variant of {@link #getListWithoutAck(Long, TimeUnit)}.
     */
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return this.pollBatch(timeout, unit);
    }

    /**
     * Shared fetch logic: waits up to the timeout for the next queued batch and
     * records it as the pending batch.
     *
     * @return the batch payload, or a fresh empty list when nothing was queued
     */
    @SuppressWarnings("unchecked")
    private <T> List<T> pollBatch(Long timeout, TimeUnit unit) {
        if (this.lastGetBatchMessage != null) {
            throw new CanalClientException("mq get/ack not support concurrent & async ack");
        }
        try {
            ConsumerBatchMessage<?> batchMessage = this.messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                // The element type is decided by the flatMessage flag at decode
                // time; the caller picks the matching accessor.
                return (List<T>)(List<?>)batchMessage.getData();
            }
        }
        catch (InterruptedException ex) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            logger.warn("Get message timeout", ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        return Lists.newArrayList();
    }

    /**
     * Acknowledges the pending batch, if any. When acknowledging throws, the
     * batch is marked as failed instead. The pending marker is always cleared.
     */
    public void ack() throws CanalClientException {
        ConsumerBatchMessage<?> pending = this.lastGetBatchMessage;
        try {
            if (pending != null) {
                pending.ack();
            }
        }
        catch (Throwable e) {
            logger.warn("Ack failed, marking batch as failed", e);
            // Only pending.ack() can throw inside the try, so pending is non-null here.
            pending.fail();
        }
        finally {
            this.lastGetBatchMessage = null;
        }
    }

    /**
     * Marks the pending batch, if any, as failed and clears the pending marker.
     */
    public void rollback() throws CanalClientException {
        ConsumerBatchMessage<?> pending = this.lastGetBatchMessage;
        try {
            if (pending != null) {
                pending.fail();
            }
        }
        finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Not supported by this connector; always throws. */
    public Message get(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by this connector; always throws. */
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by this connector; always throws. */
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by this connector; always throws. */
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by this connector; always throws. */
    public void ack(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by this connector; always throws. */
    public void rollback(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Sets the consume thread count written into the consumer configuration by
     * the next {@link #connect()} call.
     */
    public void setConsumeThreadNums(int consumeThreadNums) {
        this.consumeThreadNums = consumeThreadNums;
    }
}

