/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datadistribute.impl;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.dtyunxi.huieryun.datadistribute.DataDistributeClient;
import com.dtyunxi.huieryun.datadistribute.impl.CanalClientType;
import com.dtyunxi.huieryun.datadistribute.impl.CanalConfig;
import com.dtyunxi.huieryun.datadistribute.impl.CanalEventHandler;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import java.net.ConnectException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;

public class CanalDataDistributeClient
implements DataDistributeClient {
    private static final Logger logger = LoggerFactory.getLogger(CanalDataDistributeClient.class);
    @Value(value="${huieryun.datadistribute.canal.sleep.time:500}")
    private Integer SLEEP_TIME = 500;
    private final CanalConfig canalConfig;
    private final CanalEventHandler canalEventHandler;
    private CanalConnector connector;
    private Thread thread;
    private volatile boolean running = false;
    private int retryCount = 0;
    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler(){

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            logger.error("parse events has an error", e);
        }
    };

    public CanalDataDistributeClient(CanalConfig canalConfig, CanalEventHandler canalEventHandler) {
        this.canalConfig = canalConfig;
        this.canalEventHandler = canalEventHandler;
    }

    public void start() {
        String connectUrl = this.canalConfig.getConnectUrl();
        String destination = this.canalConfig.getDestination();
        logger.info("dataDistribute client is begin, use canal, connecturl [{}] destiantion [{}], tableFilterRegex [{}]", new Object[]{connectUrl, destination, this.canalConfig.getTableFilterRegex()});
        this.connector = this.canalConfig.createConnectorByType();
        this.thread = new Thread(new Runnable(){

            @Override
            public void run() {
                CanalDataDistributeClient.this.process();
            }
        }, "CanalDataDistributeClient");
        this.thread.setUncaughtExceptionHandler(this.handler);
        this.thread.start();
        this.running = true;
        logger.info("dataDistribute client is running, use canal, connecturl [{}] destiantion [{}]", (Object)connectUrl, (Object)destination);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void process() {
        boolean result = false;
        CanalClientType canalClientType = this.canalConfig.getCanalClientType();
        while (this.running) {
            try {
                MDC.put((String)"destination", (String)this.canalConfig.getDestination());
                MDC.put((String)"tableFilterRegex", (String)this.canalConfig.getTableFilterRegex());
                this.subscribe(this.canalConfig);
                while (this.running) {
                    result = false;
                    switch (canalClientType) {
                        case SIMPLE: 
                        case CLUSTER: {
                            result = this.processTcpMessage();
                            break;
                        }
                        case ALIYUNMQ: 
                        case ROCKETMQ: 
                        case KAFKA: {
                            result = this.processMqMessage();
                        }
                    }
                    if (result) continue;
                    try {
                        Thread.sleep(this.SLEEP_TIME.intValue());
                    }
                    catch (IllegalArgumentException | InterruptedException exception) {}
                }
            }
            catch (CanalClientException ce) {
                this.connector.rollback();
                logger.error("process error!", (Throwable)ce);
            }
            catch (Exception e) {
                this.connector.rollback();
                logger.error("process error!", (Throwable)e);
            }
            finally {
                this.connector.disconnect();
                MDC.remove((String)"destination");
            }
        }
        logger.info("dataDistribute client is stop");
    }

    private boolean processTcpMessage() {
        Message message = this.connector.getWithoutAck(this.canalConfig.getBatchSize());
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1L && size == 0) {
            this.connector.ack(batchId);
            return false;
        }
        MessageResponse response = this.canalEventHandler.onEvent(message);
        if (response.equals(MessageResponse.SUCCESS)) {
            this.connector.ack(batchId);
        } else {
            this.connector.rollback();
        }
        return true;
    }

    private boolean processMqMessage() {
        CanalMQConnector mqConnector = (CanalMQConnector)this.connector;
        MessageResponse response = null;
        if (this.canalConfig.isFlatMessage()) {
            List flatMessages = mqConnector.getFlatListWithoutAck(Long.valueOf(0L), TimeUnit.NANOSECONDS);
            if (flatMessages.isEmpty()) {
                mqConnector.ack();
                return false;
            }
            try {
                response = this.canalEventHandler.onFlatMessageEvent(flatMessages);
            }
            catch (Exception e) {
                logger.error("execute onFlatMessageEvent cause ERROR", (Throwable)e);
            }
        } else {
            List messages = mqConnector.getListWithoutAck(Long.valueOf(0L), TimeUnit.NANOSECONDS);
            if (messages.isEmpty()) {
                mqConnector.ack();
                return false;
            }
            response = this.canalEventHandler.onMessagesEvent(messages);
        }
        if (response != null && response.getResultMsg().equals(MessageResponse.SUCCESS.getResultMsg())) {
            logger.info("ack");
            mqConnector.ack();
        } else {
            logger.warn("RollBack--------");
            mqConnector.rollback();
        }
        return true;
    }

    private void subscribe(CanalConfig canalConfig) {
        block7: {
            try {
                this.connector.connect();
                if (StringUtils.isNoneBlank((CharSequence[])new CharSequence[]{canalConfig.getTableFilterRegex()}) && !canalConfig.getCanalClientType().equals((Object)CanalClientType.KAFKA)) {
                    this.connector.subscribe(canalConfig.getTableFilterRegex());
                } else {
                    this.connector.subscribe();
                }
            }
            catch (CanalClientException ce) {
                logger.error("cannot connect to canal server", (Throwable)ce);
                if (!(ce.getCause() instanceof ConnectException)) break block7;
                ++this.retryCount;
                if (this.retryCount < 5) {
                    try {
                        Thread.sleep(1000L);
                        logger.error("reconnect to canal server with retry = {}", (Object)this.retryCount);
                        this.subscribe(canalConfig);
                    }
                    catch (InterruptedException ie) {
                        logger.error("cannot connect to canal server, stop canal client!", (Throwable)ie);
                        this.stop();
                    }
                }
                logger.error("cannot connect to canal server, stop canal client!");
                this.stop();
            }
        }
    }

    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            try {
                this.thread.join();
            }
            catch (InterruptedException e) {
                logger.error("stop unkown error occur: ", (Throwable)e);
            }
        }
        if (null != this.connector) {
            this.connector.unsubscribe();
        }
        MDC.remove((String)"destination");
        MDC.remove((String)"tableFilterRegex");
    }
}

