/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datasubscribe.provider.dts;

import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.dtyunxi.huieryun.datasubscribe.event.DataSubScribeEvent;
import com.dtyunxi.huieryun.datasubscribe.provider.DataSubcribeEventListener;
import com.dtyunxi.huieryun.datasubscribe.provider.dts.DtsEventHandler;
import com.google.common.eventbus.EventBus;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that builds a {@link DefaultClusterClient} for one DTS subscription instance,
 * attaches a listener that converts incoming records to {@link DataSubScribeEvent}s,
 * and starts the client.
 *
 * <p>Each received record is passed to the configured {@link DtsEventHandler}; a
 * non-null result is posted on a Guava {@link EventBus} to which the configured
 * {@link DataSubcribeEventListener} is registered.
 */
class DtsSubscribeJobThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(DtsSubscribeJobThread.class);

    private final String accessKey;
    private final String accessSecret;
    private final String subscribeInstanceId;
    private final DtsEventHandler dtsEventHandler;
    private final DataSubcribeEventListener dataSubcribeEventListener;

    /**
     * Stores the credentials and collaborators; nothing is contacted until {@link #run()}.
     *
     * @param accessKey                 access key set on the {@link RegionContext}
     * @param accessSecret              secret set on the {@link RegionContext}
     * @param subscribeInstanceId       id passed to {@code askForGUID} when the client starts
     * @param dtsEventHandler           converts a raw record into an event (may return {@code null})
     * @param dataSubcribeEventListener subscriber registered on the event bus
     */
    public DtsSubscribeJobThread(String accessKey, String accessSecret, String subscribeInstanceId, DtsEventHandler dtsEventHandler, DataSubcribeEventListener dataSubcribeEventListener) {
        this.accessKey = accessKey;
        this.accessSecret = accessSecret;
        this.subscribeInstanceId = subscribeInstanceId;
        this.dtsEventHandler = dtsEventHandler;
        this.dataSubcribeEventListener = dataSubcribeEventListener;
    }

    /**
     * Creates the cluster client, installs the message listener and starts the client.
     *
     * @throws RuntimeException wrapping any exception raised by {@code askForGUID} or {@code start}
     */
    @Override
    public void run() {
        RegionContext context = this.getRegionContext(this.accessKey, this.accessSecret, true);
        DefaultClusterClient client = new DefaultClusterClient(context);

        // One bus for the lifetime of this thread: the subscriber never changes, so there
        // is no reason to rebuild the bus and re-register the listener for every batch.
        final EventBus eventBus = new EventBus("DtsEventBus");
        eventBus.register(this.dataSubcribeEventListener);

        client.addConcurrentListener(new ClusterListener(){

            // NOTE(review): method name comes from the SDK's ClusterListener type; it appears
            // to be the error callback — confirm against the SDK documentation.
            public void noException(Exception e) {
                logger.warn("clusterListener throw Exception", e);
            }

            // Handles one batch of messages: convert, publish, then acknowledge each in order.
            public void notify(List<ClusterMessage> messages) throws Exception {
                for (ClusterMessage message : messages) {
                    DataMessage.Record record = (DataMessage.Record)message.getRecord();
                    logger.info("get messages success");
                    DataSubScribeEvent event = DtsSubscribeJobThread.this.dtsEventHandler.onEvent(record);
                    // The handler may return null for a record; such records are not published.
                    if (event != null) {
                        eventBus.post(event);
                    }
                    // Acknowledge only after the handler (and post, if any) returned normally;
                    // an exception above leaves this and the remaining messages unacknowledged.
                    message.ackAsConsumed();
                }
            }
        });
        try {
            client.askForGUID(this.subscribeInstanceId);
            client.start();
            logger.info("dts started! subscribeInstanceId={}", this.subscribeInstanceId);
        }
        catch (Exception e1) {
            // Message text (escaped Chinese) reads "DTS Client failed to start"; cause is preserved.
            throw new RuntimeException("DTS Client\u542f\u52a8\u5931\u8d25", e1);
        }
    }

    /**
     * Builds a {@link RegionContext} carrying the given credentials.
     *
     * @param accessKey    value passed to {@code setAccessKey}
     * @param accessSecret value passed to {@code setSecret}
     * @param publicIp     value passed to {@code setUsePublicIp}; when {@code null} the setter is
     *                     not called and the context keeps whatever default the SDK assigns
     * @return the populated context
     */
    public RegionContext getRegionContext(String accessKey, String accessSecret, Boolean publicIp) {
        RegionContext context = new RegionContext();
        context.setAccessKey(accessKey);
        context.setSecret(accessSecret);
        // Guard the unboxing: a null Boolean previously caused a NullPointerException here.
        if (publicIp != null) {
            context.setUsePublicIp(publicIp.booleanValue());
        }
        return context;
    }
}

