/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datadistribute.impl;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.RecordListener;
import com.alibaba.dts.common.UserRecord;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.common.WorkThread;
import com.alibaba.dts.formats.avro.Operation;
import com.alibaba.dts.formats.avro.Record;
import com.alibaba.dts.recordgenerator.ConsumerWrapFactory;
import com.alibaba.dts.recordgenerator.OffsetCommitCallBack;
import com.alibaba.dts.recordgenerator.RecordGenerator;
import com.alibaba.dts.recordprocessor.EtlRecordProcessor;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.CommonResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.dtyunxi.huieryun.datadistribute.DataDistributeClient;
import com.dtyunxi.huieryun.datadistribute.impl.AliyunSdkConfig;
import com.dtyunxi.huieryun.datadistribute.impl.DescribeConsumerGroup;
import com.dtyunxi.huieryun.datadistribute.impl.DtsKafkaConfig;
import com.dtyunxi.huieryun.datadistribute.impl.DtsKafkaEventHandler;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.util.JacksonUtil;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class DtsKafkaDataDistributeClient
implements DataDistributeClient {
    private static final Logger logger = LoggerFactory.getLogger(DtsKafkaDataDistributeClient.class);
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    private final List<WorkThread> workThreads = new ArrayList<WorkThread>();
    private final List<DtsKafkaConfig> configs;
    private final AliyunSdkConfig sdkConfig;
    private final DtsKafkaEventHandler dtsKafkaEventHandler;
    private final ThreadPoolTaskExecutor threadPool;
    @Value(value="${huieryun.datadistribute.kafka.queue.capacity:512}")
    private Integer queueCapacity;
    @Value(value="${huieryun.datadistribute.threadpool.enable:true}")
    private Boolean enableThreadPool = true;
    private static final List<Operation> OPERATIONS = new ArrayList<Operation>(3);

    public DtsKafkaDataDistributeClient(AliyunSdkConfig sdkConfig, List<DtsKafkaConfig> configs, ThreadPoolTaskExecutor threadPool, DtsKafkaEventHandler dtsKafkaEventHandler) {
        this.sdkConfig = sdkConfig;
        this.configs = configs;
        this.threadPool = threadPool;
        this.dtsKafkaEventHandler = dtsKafkaEventHandler;
    }

    public void start() {
        for (DtsKafkaConfig config : this.configs) {
            Properties properties = this.getConfigs(config);
            Context context = new Context();
            this.checkConfig(properties);
            RecordGenerator recordGenerator = this.getRecordGenerator(context, properties);
            EtlRecordProcessor etlRecordProcessor = this.getEtlRecordProcessor(context, properties, recordGenerator);
            RecordListener recordListener = this.getRecordListener(config);
            etlRecordProcessor.registerRecordListener(config.getKafkaTopic(), recordListener);
            this.workThreads.add(new WorkThread<EtlRecordProcessor>(etlRecordProcessor));
            this.workThreads.add(new WorkThread<RecordGenerator>(recordGenerator));
        }
        if (!this.workThreads.isEmpty()) {
            this.workThreads.forEach(workThread -> workThread.start());
        }
    }

    private RecordListener getRecordListener(final DtsKafkaConfig config) {
        RecordListener mysqlRecordPrintListener = new RecordListener(){

            @Override
            public void consume(UserRecord record) {
                try {
                    if (DtsKafkaDataDistributeClient.this.enableThreadPool.booleanValue() && DtsKafkaDataDistributeClient.this.threadPool != null) {
                        DtsKafkaDataDistributeClient.this.threadPool.execute((Runnable)new MessageHandlerExecutor(record, config));
                    } else {
                        DtsKafkaDataDistributeClient.this.messageProcess(record, config);
                    }
                }
                catch (RejectedExecutionException e) {
                    logger.error("put message handler executor into thread pool failure!", (Throwable)e);
                }
                catch (Exception e) {
                    logger.error("execute message handler executor failure!", (Throwable)e);
                }
            }
        };
        return mysqlRecordPrintListener;
    }

    private void messageProcess(UserRecord userRecord, DtsKafkaConfig config) {
        if (userRecord == null || userRecord.getRecord() == null) {
            return;
        }
        Record record = userRecord.getRecord();
        if (!OPERATIONS.contains((Object)record.getOperation())) {
            userRecord.commit(String.valueOf(record.getSourceTimestamp()));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("receive dts msg,topic={},offset={},recordId={},sourceTimestamp={}", new Object[]{userRecord.getTopicPartition(), userRecord.getOffset(), record.getId(), record.getSourceTimestamp()});
        }
        long startTime = System.currentTimeMillis();
        MessageResponse response = this.dtsKafkaEventHandler.messageHandler(record, config);
        long endTime = System.currentTimeMillis();
        if (endTime - startTime > 1000L) {
            logger.info("startTime={},endTime={},handledTime ={}", new Object[]{SDF.format(startTime), SDF.format(endTime), endTime - startTime});
        }
        if (response != null && response.getresultCode() == MessageResponse.SUCCESS.getresultCode()) {
            if (logger.isDebugEnabled()) {
                logger.debug("commit dts msg,topic={},offset={},recordId={},sourceTimestamp={}", new Object[]{userRecord.getTopicPartition(), userRecord.getOffset(), record.getId(), record.getSourceTimestamp()});
            }
            userRecord.commit(String.valueOf(record.getSourceTimestamp()));
        } else {
            logger.error("message handed failure!topic={},offset={},recordId={},sourceTimestamp={},response={}", new Object[]{userRecord.getTopicPartition(), userRecord.getOffset(), record.getId(), record.getSourceTimestamp(), JacksonUtil.toJson((Object)response)});
        }
    }

    private Properties getConfigs(DtsKafkaConfig config) {
        Properties properties = new Properties();
        properties.setProperty("user", config.getUser());
        properties.setProperty("password", config.getPassword());
        properties.setProperty("sid", config.getSid());
        properties.setProperty("group", config.getGroup());
        properties.setProperty("kafkaTopic", config.getKafkaTopic());
        properties.setProperty("broker", config.getBroker());
        properties.setProperty("checkpoint", config.getCheckpoint());
        properties.setProperty("useConfigCheckpoint", config.getUseConfigCheckpoint());
        properties.setProperty("subscribeMode", config.getSubscribeMode());
        return properties;
    }

    private String getDescribeSubscriptionInstanceCheckpoint(String subscriptionInstanceId) {
        DefaultProfile profile = DefaultProfile.getProfile((String)this.sdkConfig.getRegionId(), (String)this.sdkConfig.getAccessKey(), (String)this.sdkConfig.getAccessSecret());
        DefaultAcsClient client = new DefaultAcsClient((IClientProfile)profile);
        CommonRequest request = new CommonRequest();
        request.setSysMethod(MethodType.POST);
        request.setSysDomain(this.sdkConfig.getDomain());
        request.setSysVersion(this.sdkConfig.getVersion());
        request.setSysAction("DescribeConsumerGroup");
        request.putQueryParameter("SubscriptionInstanceId", subscriptionInstanceId);
        try {
            CommonResponse response = client.getCommonResponse(request);
            DescribeConsumerGroup describeConsumerGroup = (DescribeConsumerGroup)JacksonUtil.readValue((String)response.getData(), DescribeConsumerGroup.class);
            return describeConsumerGroup.getCheckpoint();
        }
        catch (ServerException e) {
            logger.error("get describe subscription instance checkpoint failure!", (Throwable)e);
        }
        catch (ClientException e) {
            logger.error("get describe subscription instance checkpoint failure!", (Throwable)e);
        }
        return null;
    }

    private void checkConfig(Properties properties) {
        Util.require(null != properties.getProperty("user"), "use should supplied");
        Util.require(null != properties.getProperty("password"), "password should supplied");
        Util.require(null != properties.getProperty("sid"), "sid should supplied");
        Util.require(null != properties.getProperty("kafkaTopic"), "kafka topic should supplied");
        Util.require(null != properties.getProperty("broker"), "broker url should supplied");
    }

    private RecordGenerator getRecordGenerator(Context context, Properties properties) {
        RecordGenerator recordGenerator = new RecordGenerator(properties, context, this.parseCheckpoint(properties.getProperty("checkpoint")), new ConsumerWrapFactory.KafkaConsumerWrapFactory());
        context.setStreamSource(recordGenerator);
        return recordGenerator;
    }

    private Checkpoint parseCheckpoint(String checkpoint) {
        Util.require(null != checkpoint, "checkpoint should not be null");
        String[] offsetAndTS = checkpoint.split("@");
        Checkpoint streamCheckpoint = null;
        if (offsetAndTS.length == 1) {
            streamCheckpoint = new Checkpoint(null, Long.valueOf(offsetAndTS[0]), -1L, "");
        } else if (offsetAndTS.length >= 2) {
            streamCheckpoint = new Checkpoint(null, Long.valueOf(offsetAndTS[0]), Long.valueOf(offsetAndTS[1]), "");
        }
        return streamCheckpoint;
    }

    private EtlRecordProcessor getEtlRecordProcessor(Context context, Properties properties, final RecordGenerator recordGenerator) {
        EtlRecordProcessor etlRecordProcessor = new EtlRecordProcessor(new OffsetCommitCallBack(){

            @Override
            public void commit(TopicPartition tp, long timestamp, long offset, String metadata) {
                recordGenerator.setToCommitCheckpoint(new Checkpoint(tp, timestamp, offset, metadata));
            }
        }, context, this.queueCapacity);
        context.setRecordProcessor(etlRecordProcessor);
        return etlRecordProcessor;
    }

    public void stop() {
        logger.info("StreamBoot: shutting down...");
        if (null != this.workThreads && !this.workThreads.isEmpty()) {
            this.workThreads.forEach(workThread -> workThread.stop());
        }
    }

    static {
        OPERATIONS.add(Operation.INSERT);
        OPERATIONS.add(Operation.UPDATE);
        OPERATIONS.add(Operation.DELETE);
    }

    class MessageHandlerExecutor
    implements Runnable {
        private UserRecord userRecord;
        private DtsKafkaConfig config;

        public MessageHandlerExecutor(UserRecord record, DtsKafkaConfig config) {
            this.userRecord = record;
            this.config = config;
        }

        @Override
        public void run() {
            DtsKafkaDataDistributeClient.this.messageProcess(this.userRecord, this.config);
        }
    }
}

