/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.recordgenerator;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Util;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumerWrap
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerWrap.class);

    public abstract void setFetchOffsetByOffset(TopicPartition var1, Checkpoint var2);

    public abstract void setFetchOffsetByTimestamp(TopicPartition var1, Checkpoint var2);

    public abstract void assignTopic(TopicPartition var1, Checkpoint var2);

    public abstract void subscribeTopic(TopicPartition var1, Supplier<Checkpoint> var2);

    public abstract ConsumerRecords<byte[], byte[]> poll();

    public abstract KafkaConsumer getRawConsumer();

    public static class DefaultConsumerWrap
    extends ConsumerWrap {
        private AtomicBoolean firstStart = new AtomicBoolean(true);
        private KafkaConsumer<byte[], byte[]> consumer;
        private final long poolTimeOut;

        public DefaultConsumerWrap(Properties properties) {
            Properties consumerConfig = new Properties();
            Util.mergeSourceKafkaProperties(properties, consumerConfig);
            this.checkConfig(consumerConfig);
            this.consumer = new KafkaConsumer(consumerConfig);
            this.poolTimeOut = Long.valueOf(properties.getProperty("stream.pool.timeout", "500"));
        }

        @Override
        public void setFetchOffsetByOffset(TopicPartition topicPartition, Checkpoint checkpoint) {
            this.consumer.seek(topicPartition, checkpoint.getOffset());
        }

        @Override
        public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint) {
            long timeStamp = checkpoint.getTimeStamp();
            Map remoteOffset = this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timeStamp));
            OffsetAndTimestamp toSet = (OffsetAndTimestamp)remoteOffset.get(topicPartition);
            if (null == toSet) {
                throw new RuntimeException("RecordGenerator:seek timestamp for topic [" + topicPartition + "] with timestamp [" + timeStamp + "] failed");
            }
            this.consumer.seek(topicPartition, toSet.offset());
        }

        @Override
        public void assignTopic(TopicPartition topicPartition, Checkpoint checkpoint) {
            this.consumer.assign(Arrays.asList(topicPartition));
            log.info("RecordGenerator:  assigned for {} with checkpoint {}", (Object)topicPartition, (Object)checkpoint);
            this.setFetchOffsetByTimestamp(topicPartition, checkpoint);
        }

        @Override
        public void subscribeTopic(final TopicPartition topicPartition, final Supplier<Checkpoint> streamCheckpoint) {
            this.consumer.subscribe(Arrays.asList(topicPartition.topic()), new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    log.info("RecordGenerator: partition revoked for [{}]", (Object)StringUtils.join(partitions, (String)","));
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    log.info("RecordGenerator: partition assigned for [{}]", (Object)StringUtils.join(partitions, (String)","));
                    if (partitions.contains(topicPartition)) {
                        if (firstStart.compareAndSet(true, false)) {
                            Checkpoint toSet = (Checkpoint)streamCheckpoint.get();
                            this.setFetchOffsetByTimestamp(topicPartition, toSet);
                            log.info("RecordGenerator:  subscribe for [{}] with checkpoint [{}] first start", (Object)topicPartition, (Object)toSet);
                        } else {
                            log.info("RecordGenerator:  subscribe for [{}]  reassign, do nothing", (Object)topicPartition);
                        }
                    }
                }
            });
        }

        @Override
        public ConsumerRecords<byte[], byte[]> poll() {
            return this.consumer.poll(Duration.ofMillis(this.poolTimeOut));
        }

        @Override
        public KafkaConsumer getRawConsumer() {
            return this.consumer;
        }

        @Override
        public synchronized void close() {
            if (null != this.consumer) {
                this.consumer.close();
            }
        }

        private void checkConfig(Properties properties) {
        }
    }
}

