/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.recordgenerator;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.metastore.KafkaMetaStore;
import com.alibaba.dts.metastore.LocalFileMetaStore;
import com.alibaba.dts.metastore.MetaStoreCenter;
import com.alibaba.dts.recordgenerator.ConsumerWrap;
import com.alibaba.dts.recordgenerator.ConsumerWrapFactory;
import com.alibaba.dts.recordprocessor.EtlRecordProcessor;
import java.io.Closeable;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls records from a single Kafka topic partition and hands them to the
 * {@link EtlRecordProcessor} obtained from the {@link Context}.
 *
 * <p>The generator runs as a {@link Runnable}. It builds a consumer, positions it
 * at a start checkpoint, and then loops: commit any pending checkpoint, poll,
 * offer each record to the processor. On any error the consumer is closed and,
 * while the retry budget lasts, a new consumer is built after a back-off.
 *
 * <p>{@link #close()} and {@link #setToCommitCheckpoint(Checkpoint)} are intended
 * to be called from threads other than the one executing {@link #run()}; they
 * communicate with the loop through a volatile flag and an atomic reference.
 */
public class RecordGenerator implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(RecordGenerator.class);
    private static final String LOCAL_FILE_STORE_NAME = "localCheckpointStore";
    private static final String KAFKA_STORE_NAME = "kafkaCheckpointStore";

    private final Properties properties;
    /** Maximum number of recoveries attempted over the lifetime of {@link #run()}. */
    private final int tryTime;
    private final Context context;
    private final TopicPartition topicPartition;
    private final String groupID;
    private final ConsumerWrapFactory consumerWrapFactory;
    private final Checkpoint initialCheckpoint;
    /**
     * Checkpoint waiting to be persisted by the polling thread, or {@code null}
     * when nothing is pending. Written by {@link #setToCommitCheckpoint} and
     * consumed by {@link #mayCommitCheckpoint()}.
     */
    private final AtomicReference<Checkpoint> toCommitCheckpoint = new AtomicReference<>();
    private final MetaStoreCenter metaStoreCenter = new MetaStoreCenter();
    /** True until the first start has consumed the "force initial checkpoint" request. */
    private final AtomicBoolean useCheckpointConfig;
    private final ConsumerSubscribeMode subscribeMode;
    /** Back-off between recovery attempts, in milliseconds. */
    private final long tryBackTimeMS;
    /** Stop flag: once true, the loops in {@link #run()} terminate. */
    private volatile boolean existed;

    /**
     * Creates a generator configured from {@code properties}.
     *
     * <p>Properties read: {@code stream.tryTime} (default {@code 150}),
     * {@code stream.tryBackTimeMS} (default {@code 10000}), {@code kafkaTopic},
     * {@code group}, {@code subscribeMode} (default {@code assign}),
     * {@code useConfigCheckpoint} and {@code sid}. Partition 0 of the topic is
     * always used.
     *
     * @param properties          configuration, also passed on to the consumer factory
     * @param context             supplies the record processor
     * @param initialCheckpoint   user-configured checkpoint used as fallback / lower bound
     * @param consumerWrapFactory creates the consumer for every (re)connect
     * @throws NumberFormatException if a numeric property cannot be parsed
     * @throws RuntimeException      if {@code subscribeMode} is neither
     *                               {@code assign} nor {@code subscribe}
     */
    public RecordGenerator(Properties properties, Context context, Checkpoint initialCheckpoint, ConsumerWrapFactory consumerWrapFactory) {
        this.properties = properties;
        this.tryTime = Integer.parseInt(properties.getProperty("stream.tryTime", "150"));
        this.tryBackTimeMS = Long.parseLong(properties.getProperty("stream.tryBackTimeMS", "10000"));
        this.context = context;
        this.consumerWrapFactory = consumerWrapFactory;
        this.initialCheckpoint = initialCheckpoint;
        this.topicPartition = new TopicPartition(properties.getProperty("kafkaTopic"), 0);
        this.groupID = properties.getProperty("group");
        this.subscribeMode = this.parseConsumerSubscribeMode(properties.getProperty("subscribeMode", "assign"));
        this.useCheckpointConfig = new AtomicBoolean(StringUtils.equalsIgnoreCase(properties.getProperty("useConfigCheckpoint"), "true"));
        this.existed = false;
        String sid = properties.getProperty("sid");
        this.metaStoreCenter.registerStore(LOCAL_FILE_STORE_NAME, new LocalFileMetaStore(LOCAL_FILE_STORE_NAME + "_" + sid));
        log.info("RecordGenerator: try time [{}], try backTimeMS [{}]", this.tryTime, this.tryBackTimeMS);
    }

    /** Builds a fresh, not yet positioned consumer from the configured factory. */
    private ConsumerWrap getConsumerWrap() {
        return this.consumerWrapFactory.getConsumerWrap(this.properties);
    }

    /**
     * Main loop: (re)connects, commits pending checkpoints, polls and forwards
     * records until {@link #close()} is called or an error is not retried.
     *
     * <p>Records whose value is {@code null} or at most two bytes long are
     * skipped. Each remaining record is offered to the processor in one-second
     * attempts until it is accepted or the generator is stopped.
     *
     * <p>The retry counter is never reset, so {@code stream.tryTime} bounds the
     * total number of recoveries, not the number of consecutive ones.
     */
    @Override
    public void run() {
        int haveTryTime = 0;
        String message = "first start";
        while (!this.existed) {
            EtlRecordProcessor recordProcessor = this.context.getRecordProcessor();
            // Scoped per attempt so a failed reconnect never re-closes the previous consumer.
            ConsumerWrap kafkaConsumerWrap = null;
            try {
                kafkaConsumerWrap = this.getConsumerWrap(message);
                while (!this.existed) {
                    this.mayCommitCheckpoint();
                    ConsumerRecords<byte[], byte[]> records = kafkaConsumerWrap.poll();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        byte[] value = record.value();
                        if (value == null || value.length <= 2) {
                            continue;
                        }
                        int offerTryCount = 0;
                        while (!recordProcessor.offer(1000L, TimeUnit.MILLISECONDS, record) && !this.existed) {
                            // Each failed offer waited up to 1s, so report every tenth failure.
                            if (++offerTryCount % 10 == 0) {
                                log.info("RecordGenerator: offer record has failed for a period (10s) [ {}]", record);
                            }
                        }
                    }
                }
            }
            catch (Throwable e) {
                if (this.isErrorRecoverable(e) && haveTryTime++ < this.tryTime) {
                    log.warn("RecordGenerator: error meet cause " + e.getMessage() + ", recover time [" + haveTryTime + "]", e);
                    Util.sleepMS(this.tryBackTimeMS);
                    message = "reconnect";
                    continue;
                }
                log.error("RecordGenerator: unrecoverable error  " + e.getMessage() + ", have try time [" + haveTryTime + "]", e);
                this.existed = true;
            }
            finally {
                Util.swallowErrorClose(kafkaConsumerWrap);
            }
        }
    }

    /**
     * Persists the pending checkpoint, if any.
     *
     * <p>The pending slot is cleared only after a successful commit and only if
     * it still holds the checkpoint that was committed. A failed commit therefore
     * leaves the checkpoint pending for the next attempt, and a newer checkpoint
     * published concurrently by {@link #setToCommitCheckpoint} is not discarded.
     */
    private void mayCommitCheckpoint() {
        Checkpoint pending = this.toCommitCheckpoint.get();
        if (pending == null) {
            return;
        }
        this.commitCheckpoint(pending.getTopicPartition(), pending);
        this.toCommitCheckpoint.compareAndSet(pending, null);
    }

    /**
     * Requests that {@code committedCheckpoint} be persisted by the polling
     * thread on its next loop iteration. A later call replaces an earlier
     * checkpoint that has not been persisted yet.
     *
     * @param committedCheckpoint checkpoint to persist; {@code null} clears the request
     */
    public void setToCommitCheckpoint(Checkpoint committedCheckpoint) {
        this.toCommitCheckpoint.set(committedCheckpoint);
    }

    /**
     * Creates a consumer, registers it as the Kafka checkpoint store and
     * positions it according to the configured subscribe mode.
     *
     * <p>If anything fails after the consumer has been created, the consumer is
     * closed before the error propagates, because the caller never receives a
     * reference it could close.
     *
     * @param message short reason for this (re)connect, used for logging only
     * @return the subscribed or assigned consumer
     */
    private ConsumerWrap getConsumerWrap(String message) {
        ConsumerWrap kafkaConsumerWrap = this.getConsumerWrap();
        try {
            // Must be registered before resolving the checkpoint: the lookup may fall back to it.
            this.metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));
            Checkpoint checkpoint = this.resolveStartCheckpoint();
            switch (this.subscribeMode) {
                case SUBSCRIBE: {
                    // The start position is computed lazily by this callback, not taken from 'checkpoint'.
                    kafkaConsumerWrap.subscribeTopic(this.topicPartition, () -> {
                        Checkpoint stored = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
                        if (null == stored) {
                            return this.initialCheckpoint;
                        }
                        if (this.initialCheckpoint.getTimeStamp() > stored.getTimeStamp()) {
                            log.info("RecordGenerator: use user defined checkpoint, current checkpoint [{}]", this.initialCheckpoint);
                            return this.initialCheckpoint;
                        }
                        return stored;
                    });
                    break;
                }
                case ASSIGN: {
                    kafkaConsumerWrap.assignTopic(this.topicPartition, checkpoint);
                    break;
                }
                default: {
                    throw new RuntimeException("RecordGenerator: unknown mode not support");
                }
            }
            log.info("RecordGenerator:{}, checkpoint {}", message, checkpoint);
            return kafkaConsumerWrap;
        }
        catch (RuntimeException | Error e) {
            Util.swallowErrorClose(kafkaConsumerWrap);
            throw e;
        }
    }

    /**
     * Chooses the checkpoint to start from.
     *
     * <p>On the first call with {@code useConfigCheckpoint=true} the initial
     * checkpoint is forced. Otherwise the stored checkpoint is used, unless it is
     * missing, is {@link Checkpoint#INVALID_STREAM_CHECKPOINT}, or has a timestamp
     * older than the initial checkpoint; in those cases the initial checkpoint wins.
     */
    private Checkpoint resolveStartCheckpoint() {
        if (this.useCheckpointConfig.compareAndSet(true, false)) {
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", this.initialCheckpoint);
            return this.initialCheckpoint;
        }
        Checkpoint stored = this.getCheckpoint();
        if (null == stored || Checkpoint.INVALID_STREAM_CHECKPOINT == stored) {
            log.info("RecordGenerator: use initial checkpoint [{}] to start", this.initialCheckpoint);
            return this.initialCheckpoint;
        }
        log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", stored);
        if (stored.getTimeStamp() < this.initialCheckpoint.getTimeStamp()) {
            log.info("RecordGenerator: after set user defined checkpoint, current checkpoint [{}]", this.initialCheckpoint);
            return this.initialCheckpoint;
        }
        return stored;
    }

    /**
     * Looks up the stored checkpoint, preferring the local file store and
     * falling back to the Kafka store when the local store returns {@code null}.
     */
    private Checkpoint getCheckpoint() {
        Checkpoint checkpoint = this.metaStoreCenter.seek(LOCAL_FILE_STORE_NAME, this.topicPartition, this.groupID);
        if (null == checkpoint) {
            checkpoint = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
        }
        return checkpoint;
    }

    /**
     * Stores {@code checkpoint} for {@code topicPartition} under this generator's
     * group through the meta store center. Does nothing if either argument is
     * {@code null}.
     */
    public void commitCheckpoint(TopicPartition topicPartition, Checkpoint checkpoint) {
        if (null != topicPartition && null != checkpoint) {
            this.metaStoreCenter.store(topicPartition, this.groupID, checkpoint);
        }
    }

    /** Every error is currently treated as recoverable; only the retry budget stops recovery. */
    private boolean isErrorRecoverable(Throwable e) {
        return true;
    }

    /** Returns the checkpoint supplied at construction time. */
    public Checkpoint getInitialCheckpoint() {
        return this.initialCheckpoint;
    }

    /**
     * Asks {@link #run()} to stop. Only sets the stop flag; the consumer is
     * closed by the polling thread when it leaves its loop.
     */
    @Override
    public void close() {
        this.existed = true;
    }

    /**
     * Maps the {@code subscribeMode} property (case-insensitive) to its enum value.
     *
     * @throws RuntimeException if {@code value} is neither {@code assign} nor {@code subscribe}
     */
    private ConsumerSubscribeMode parseConsumerSubscribeMode(String value) {
        if (StringUtils.equalsIgnoreCase("assign", value)) {
            return ConsumerSubscribeMode.ASSIGN;
        }
        if (StringUtils.equalsIgnoreCase("subscribe", value)) {
            return ConsumerSubscribeMode.SUBSCRIBE;
        }
        throw new RuntimeException("RecordGenerator: unknown subscribe mode [" + value + "]");
    }

    /** How the consumer is attached to the topic partition. */
    private enum ConsumerSubscribeMode {
        ASSIGN,
        SUBSCRIBE,
        UNKNOWN;
    }
}

