package com.alibaba.dts.recordprocessor;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.RecordListener;
import com.alibaba.dts.common.UserCommitCallBack;
import com.alibaba.dts.common.UserRecord;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.common.WorkThread;
import com.alibaba.dts.formats.avro.Record;
import com.alibaba.dts.recordgenerator.OffsetCommitCallBack;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/dts/recordprocessor/EtlRecordProcessor.class */
public class EtlRecordProcessor implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(EtlRecordProcessor.class);
    private final OffsetCommitCallBack offsetCommitCallBack;
    private volatile Checkpoint commitCheckpoint;
    private WorkThread commitThread;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    private final AvroDeserializer fastDeserializer;
    private final Context context;
    private final Map<String, RecordListener> recordListeners;
    private volatile boolean existed;

    public boolean offer(long j, TimeUnit timeUnit, ConsumerRecord consumerRecord) {
        try {
            return this.toProcessRecord.offer(consumerRecord, j, timeUnit);
        } catch (Exception e) {
            log.error("EtlRecordProcessor: offer record failed, record[" + consumerRecord + "], cause " + e.getMessage(), e);
            return false;
        }
    }

    public EtlRecordProcessor(OffsetCommitCallBack offsetCommitCallBack, Context context) {
        this(offsetCommitCallBack, context, 512);
    }

    public EtlRecordProcessor(OffsetCommitCallBack offsetCommitCallBack, Context context, int i) {
        this.recordListeners = new HashMap();
        this.existed = false;
        this.offsetCommitCallBack = offsetCommitCallBack;
        this.toProcessRecord = new LinkedBlockingQueue<>(i);
        this.fastDeserializer = new AvroDeserializer();
        this.context = context;
        this.commitCheckpoint = new Checkpoint(null, -1L, -1L, "-1");
        this.commitThread = getCommitThread();
        this.commitThread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.existed) {
            ConsumerRecord consumerRecord = null;
            int i = 0;
            while (true) {
                try {
                    ConsumerRecord peek = this.toProcessRecord.peek();
                    consumerRecord = peek;
                    if (null != peek || this.existed) {
                        break;
                    }
                    Util.sleepMS(5L);
                    i++;
                    if (i % 1000 == 0) {
                        log.info("EtlRecordProcessor: haven't receive records from generator for  5s");
                    }
                } catch (Exception e) {
                    log.error("EtlRecordProcessor: process record failed, raw consumer record [" + consumerRecord + "], parsed record [" + ((Object) null) + "], cause " + e.getMessage(), e);
                }
            }
            if (this.existed) {
                return;
            }
            Record deserialize = this.fastDeserializer.deserialize((byte[]) consumerRecord.value());
            log.debug("EtlRecordProcessor: meet [{}] record type", deserialize.getOperation());
            Iterator<RecordListener> it = this.recordListeners.values().iterator();
            while (it.hasNext()) {
                it.next().consume(new UserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), deserialize, new UserCommitCallBack() { // from class: com.alibaba.dts.recordprocessor.EtlRecordProcessor.1
                    @Override // com.alibaba.dts.common.UserCommitCallBack
                    public void commit(TopicPartition topicPartition, Record record, long j, String str) {
                        EtlRecordProcessor.this.commitCheckpoint = new Checkpoint(topicPartition, record.getSourceTimestamp().longValue(), j, str);
                    }
                }));
            }
            this.toProcessRecord.poll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commit() {
        if (null == this.offsetCommitCallBack || this.commitCheckpoint.getTopicPartition() == null || this.commitCheckpoint.getOffset() == -1) {
            return;
        }
        log.info("commit record with checkpoint {}", this.commitCheckpoint);
        this.offsetCommitCallBack.commit(this.commitCheckpoint.getTopicPartition(), this.commitCheckpoint.getTimeStamp(), this.commitCheckpoint.getOffset(), this.commitCheckpoint.getInfo());
    }

    public void registerRecordListener(String str, RecordListener recordListener) {
        Util.require((null == str || null == recordListener) ? false : true, "null value not accepted");
        this.recordListeners.put(str, recordListener);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.existed = true;
        this.commitThread.stop();
    }

    private WorkThread getCommitThread() {
        return new WorkThread(new Runnable() { // from class: com.alibaba.dts.recordprocessor.EtlRecordProcessor.2
            @Override // java.lang.Runnable
            public void run() {
                while (!EtlRecordProcessor.this.existed) {
                    Util.sleepMS(5000L);
                    EtlRecordProcessor.this.commit();
                }
            }
        });
    }
}
