/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datasubscribe.provider.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.dtyunxi.huieryun.datasubscribe.provider.DataSubcribeEventListener;
import com.dtyunxi.huieryun.datasubscribe.provider.canal.CanalClient;
import com.dtyunxi.huieryun.datasubscribe.provider.canal.CanalEventHandler;
import com.google.common.eventbus.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Worker that repeatedly pulls batches from the {@link CanalConnector} supplied by a
 * {@link CanalClient} and passes each non-empty batch to a {@link CanalEventHandler}
 * together with an {@link EventBus} on which the configured
 * {@link DataSubcribeEventListener} has been registered.
 *
 * <p>The polling loop runs until the executing thread is interrupted. An instance is
 * intended to be driven by a single thread, either via {@link #run()} or by calling
 * {@link #process()} directly.
 */
public class CanalDataSubscribeService
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(CanalDataSubscribeService.class);
    /** Milliseconds to wait when a fetch returns nothing, and after a failed batch. */
    private static final int SLEEP_TIME = 500;
    /** Batch size used when the canal configuration does not provide one. */
    private static final int DEFAULT_BATCHSIZE = 1024;
    public static final int QUEUE_MAX_SIZE = 10000;
    private static final String MDC_DESTINATION = "destination";
    private static final String MDC_TABLE_FILTER_REGEX = "tableFilterRegex";
    private final CanalClient canalClient;
    private final DataSubcribeEventListener eventListener;
    private final CanalEventHandler eventHandler;

    /**
     * Creates a service bound to the given client and listener.
     *
     * @param canalClient   source of the canal configuration and connector
     * @param eventListener listener registered on the event bus handed to the event handler
     */
    public CanalDataSubscribeService(CanalClient canalClient, DataSubcribeEventListener eventListener) {
        logger.info("Init CanalSubscribeService. canalClient is {}.", canalClient);
        this.canalClient = canalClient;
        this.eventListener = eventListener;
        this.eventHandler = new CanalEventHandler();
    }

    /** Runs {@link #process()} on the calling thread. */
    @Override
    public void run() {
        this.process();
    }

    /**
     * Polls the connector in a loop: fetch a batch without acknowledging it, dispatch it to
     * the event handler (or sleep when the fetch came back empty), then acknowledge it.
     *
     * <p>If anything in that sequence fails, the batch id is rolled back, the error is logged
     * and the loop continues after a short pause. The method returns only when the current
     * thread is interrupted; the interrupt flag is left set for the caller.
     */
    public void process() {
        Integer configuredBatchSize = this.canalClient.getCanalConfig().getBatchSize();
        int batchSize = configuredBatchSize == null ? DEFAULT_BATCHSIZE : configuredBatchSize;
        MDC.put(MDC_DESTINATION, this.canalClient.getCanalConfig().getDestination());
        MDC.put(MDC_TABLE_FILTER_REGEX, this.canalClient.getCanalConfig().getTableFilterRegex());
        try {
            CanalConnector connector = this.canalClient.getConnector();
            EventBus eventBus = new EventBus("CanalEventBus");
            eventBus.register(this.eventListener);
            while (!Thread.currentThread().isInterrupted()) {
                long batchId = -1L;
                try {
                    Message message = connector.getWithoutAck(batchSize);
                    batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1L && size == 0) {
                        // Nothing was fetched: wait before polling again.
                        Thread.sleep(SLEEP_TIME);
                    } else {
                        this.eventHandler.onEvent(message, eventBus);
                    }
                    connector.ack(batchId);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition (and the caller) can see it.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    logger.error("Failed to process canal batch " + batchId + ", rolling back.", e);
                    this.rollbackQuietly(connector, batchId);
                    this.pauseAfterFailure();
                }
            }
            logger.info("Canal subscribe loop stopped because the thread was interrupted.");
        } finally {
            // Do not leave this service's diagnostic context on a thread that may be reused.
            MDC.remove(MDC_DESTINATION);
            MDC.remove(MDC_TABLE_FILTER_REGEX);
        }
    }

    /** Rolls back the given batch, logging instead of propagating any failure of the rollback itself. */
    private void rollbackQuietly(CanalConnector connector, long batchId) {
        try {
            connector.rollback(batchId);
        } catch (Exception rollbackError) {
            logger.error("Failed to roll back canal batch " + batchId + ".", rollbackError);
        }
    }

    /** Waits briefly after a failed batch so that a persistent error does not produce a busy loop. */
    private void pauseAfterFailure() {
        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

