package com.dtyunxi.huieryun.datadistribute.impl;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.RecordListener;
import com.alibaba.dts.common.UserRecord;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.common.WorkThread;
import com.alibaba.dts.formats.avro.Operation;
import com.alibaba.dts.formats.avro.Record;
import com.alibaba.dts.recordgenerator.ConsumerWrapFactory;
import com.alibaba.dts.recordgenerator.Names;
import com.alibaba.dts.recordgenerator.OffsetCommitCallBack;
import com.alibaba.dts.recordgenerator.RecordGenerator;
import com.alibaba.dts.recordprocessor.EtlRecordProcessor;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import com.dtyunxi.huieryun.datadistribute.DataDistributeClient;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.util.JacksonUtil;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:com/dtyunxi/huieryun/datadistribute/impl/DtsKafkaDataDistributeClient.class */
public class DtsKafkaDataDistributeClient implements DataDistributeClient {
    private final List<DtsKafkaConfig> configs;
    private final AliyunSdkConfig sdkConfig;
    private final DtsKafkaEventHandler dtsKafkaEventHandler;
    private final ThreadPoolTaskExecutor threadPool;

    @Value("${huieryun.datadistribute.kafka.queue.capacity:512}")
    private Integer queueCapacity;
    private static final Logger logger = LoggerFactory.getLogger(DtsKafkaDataDistributeClient.class);
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    private static final List<Operation> OPERATIONS = new ArrayList(3);
    private final List<WorkThread> workThreads = new ArrayList();

    @Value("${huieryun.datadistribute.threadpool.enable:true}")
    private Boolean enableThreadPool = true;

    /* loaded from: input_file:com/dtyunxi/huieryun/datadistribute/impl/DtsKafkaDataDistributeClient$MessageHandlerExecutor.class */
    class MessageHandlerExecutor implements Runnable {
        private UserRecord userRecord;
        private DtsKafkaConfig config;

        public MessageHandlerExecutor(UserRecord userRecord, DtsKafkaConfig dtsKafkaConfig) {
            this.userRecord = userRecord;
            this.config = dtsKafkaConfig;
        }

        @Override // java.lang.Runnable
        public void run() {
            DtsKafkaDataDistributeClient.this.messageProcess(this.userRecord, this.config);
        }
    }

    public DtsKafkaDataDistributeClient(AliyunSdkConfig aliyunSdkConfig, List<DtsKafkaConfig> list, ThreadPoolTaskExecutor threadPoolTaskExecutor, DtsKafkaEventHandler dtsKafkaEventHandler) {
        this.sdkConfig = aliyunSdkConfig;
        this.configs = list;
        this.threadPool = threadPoolTaskExecutor;
        this.dtsKafkaEventHandler = dtsKafkaEventHandler;
    }

    public void start() {
        for (DtsKafkaConfig dtsKafkaConfig : this.configs) {
            Properties configs = getConfigs(dtsKafkaConfig);
            Context context = new Context();
            checkConfig(configs);
            RecordGenerator recordGenerator = getRecordGenerator(context, configs);
            EtlRecordProcessor etlRecordProcessor = getEtlRecordProcessor(context, configs, recordGenerator);
            etlRecordProcessor.registerRecordListener(dtsKafkaConfig.getKafkaTopic(), getRecordListener(dtsKafkaConfig));
            this.workThreads.add(new WorkThread(etlRecordProcessor));
            this.workThreads.add(new WorkThread(recordGenerator));
        }
        if (this.workThreads.isEmpty()) {
            return;
        }
        this.workThreads.forEach(workThread -> {
            workThread.start();
        });
    }

    private RecordListener getRecordListener(final DtsKafkaConfig dtsKafkaConfig) {
        return new RecordListener() { // from class: com.dtyunxi.huieryun.datadistribute.impl.DtsKafkaDataDistributeClient.1
            @Override // com.alibaba.dts.common.RecordListener
            public void consume(UserRecord userRecord) {
                try {
                    if (!DtsKafkaDataDistributeClient.this.enableThreadPool.booleanValue() || DtsKafkaDataDistributeClient.this.threadPool == null) {
                        DtsKafkaDataDistributeClient.this.messageProcess(userRecord, dtsKafkaConfig);
                    } else {
                        DtsKafkaDataDistributeClient.this.threadPool.execute(new MessageHandlerExecutor(userRecord, dtsKafkaConfig));
                    }
                } catch (RejectedExecutionException e) {
                    DtsKafkaDataDistributeClient.logger.error("put message handler executor into thread pool failure!", e);
                } catch (Exception e2) {
                    DtsKafkaDataDistributeClient.logger.error("execute message handler executor failure!", e2);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void messageProcess(UserRecord userRecord, DtsKafkaConfig dtsKafkaConfig) {
        if (userRecord == null || userRecord.getRecord() == null) {
            return;
        }
        Record record = userRecord.getRecord();
        if (!OPERATIONS.contains(record.getOperation())) {
            userRecord.commit(String.valueOf(record.getSourceTimestamp()));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("receive dts msg,topic={},offset={},recordId={},sourceTimestamp={}", new Object[]{userRecord.getTopicPartition(), Long.valueOf(userRecord.getOffset()), record.getId(), record.getSourceTimestamp()});
        }
        long currentTimeMillis = System.currentTimeMillis();
        MessageResponse messageHandler = this.dtsKafkaEventHandler.messageHandler(record, dtsKafkaConfig);
        long currentTimeMillis2 = System.currentTimeMillis();
        if (currentTimeMillis2 - currentTimeMillis > 1000) {
            logger.info("startTime={},endTime={},handledTime ={}", new Object[]{SDF.format(Long.valueOf(currentTimeMillis)), SDF.format(Long.valueOf(currentTimeMillis2)), Long.valueOf(currentTimeMillis2 - currentTimeMillis)});
        }
        if (messageHandler == null || messageHandler.getresultCode() != MessageResponse.SUCCESS.getresultCode()) {
            logger.error("message handed failure!topic={},offset={},recordId={},sourceTimestamp={},response={}", new Object[]{userRecord.getTopicPartition(), Long.valueOf(userRecord.getOffset()), record.getId(), record.getSourceTimestamp(), JacksonUtil.toJson(messageHandler)});
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("commit dts msg,topic={},offset={},recordId={},sourceTimestamp={}", new Object[]{userRecord.getTopicPartition(), Long.valueOf(userRecord.getOffset()), record.getId(), record.getSourceTimestamp()});
        }
        userRecord.commit(String.valueOf(record.getSourceTimestamp()));
    }

    private Properties getConfigs(DtsKafkaConfig dtsKafkaConfig) {
        Properties properties = new Properties();
        properties.setProperty(Names.USER_NAME, dtsKafkaConfig.getUser());
        properties.setProperty(Names.PASSWORD_NAME, dtsKafkaConfig.getPassword());
        properties.setProperty(Names.SID_NAME, dtsKafkaConfig.getSid());
        properties.setProperty(Names.GROUP_NAME, dtsKafkaConfig.getGroup());
        properties.setProperty(Names.KAFKA_TOPIC, dtsKafkaConfig.getKafkaTopic());
        properties.setProperty(Names.KAFKA_BROKER_URL_NAME, dtsKafkaConfig.getBroker());
        properties.setProperty(Names.INITIAL_CHECKPOINT_NAME, dtsKafkaConfig.getCheckpoint());
        properties.setProperty(Names.USE_CONFIG_CHECKPOINT_NAME, dtsKafkaConfig.getUseConfigCheckpoint());
        properties.setProperty(Names.SUBSCRIBE_MODE_NAME, dtsKafkaConfig.getSubscribeMode());
        return properties;
    }

    private String getDescribeSubscriptionInstanceCheckpoint(String str) {
        DefaultAcsClient defaultAcsClient = new DefaultAcsClient(DefaultProfile.getProfile(this.sdkConfig.getRegionId(), this.sdkConfig.getAccessKey(), this.sdkConfig.getAccessSecret()));
        CommonRequest commonRequest = new CommonRequest();
        commonRequest.setSysMethod(MethodType.POST);
        commonRequest.setSysDomain(this.sdkConfig.getDomain());
        commonRequest.setSysVersion(this.sdkConfig.getVersion());
        commonRequest.setSysAction("DescribeConsumerGroup");
        commonRequest.putQueryParameter("SubscriptionInstanceId", str);
        try {
            return ((DescribeConsumerGroup) JacksonUtil.readValue(defaultAcsClient.getCommonResponse(commonRequest).getData(), DescribeConsumerGroup.class)).getCheckpoint();
        } catch (ClientException e) {
            logger.error("get describe subscription instance checkpoint failure!", e);
            return null;
        } catch (ServerException e2) {
            logger.error("get describe subscription instance checkpoint failure!", e2);
            return null;
        }
    }

    private void checkConfig(Properties properties) {
        Util.require(null != properties.getProperty(Names.USER_NAME), "use should supplied");
        Util.require(null != properties.getProperty(Names.PASSWORD_NAME), "password should supplied");
        Util.require(null != properties.getProperty(Names.SID_NAME), "sid should supplied");
        Util.require(null != properties.getProperty(Names.KAFKA_TOPIC), "kafka topic should supplied");
        Util.require(null != properties.getProperty(Names.KAFKA_BROKER_URL_NAME), "broker url should supplied");
    }

    private RecordGenerator getRecordGenerator(Context context, Properties properties) {
        RecordGenerator recordGenerator = new RecordGenerator(properties, context, parseCheckpoint(properties.getProperty(Names.INITIAL_CHECKPOINT_NAME)), new ConsumerWrapFactory.KafkaConsumerWrapFactory());
        context.setStreamSource(recordGenerator);
        return recordGenerator;
    }

    private Checkpoint parseCheckpoint(String str) {
        Util.require(null != str, "checkpoint should not be null");
        String[] split = str.split("@");
        Checkpoint checkpoint = null;
        if (split.length == 1) {
            checkpoint = new Checkpoint(null, Long.valueOf(split[0]).longValue(), -1L, "");
        } else if (split.length >= 2) {
            checkpoint = new Checkpoint(null, Long.valueOf(split[0]).longValue(), Long.valueOf(split[1]).longValue(), "");
        }
        return checkpoint;
    }

    private EtlRecordProcessor getEtlRecordProcessor(Context context, Properties properties, final RecordGenerator recordGenerator) {
        EtlRecordProcessor etlRecordProcessor = new EtlRecordProcessor(new OffsetCommitCallBack() { // from class: com.dtyunxi.huieryun.datadistribute.impl.DtsKafkaDataDistributeClient.2
            @Override // com.alibaba.dts.recordgenerator.OffsetCommitCallBack
            public void commit(TopicPartition topicPartition, long j, long j2, String str) {
                recordGenerator.setToCommitCheckpoint(new Checkpoint(topicPartition, j, j2, str));
            }
        }, context, this.queueCapacity.intValue());
        context.setRecordProcessor(etlRecordProcessor);
        return etlRecordProcessor;
    }

    public void stop() {
        logger.info("StreamBoot: shutting down...");
        if (null == this.workThreads || this.workThreads.isEmpty()) {
            return;
        }
        this.workThreads.forEach(workThread -> {
            workThread.stop();
        });
    }

    static {
        OPERATIONS.add(Operation.INSERT);
        OPERATIONS.add(Operation.UPDATE);
        OPERATIONS.add(Operation.DELETE);
    }
}
