/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.recordprocessor;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.RecordListener;
import com.alibaba.dts.common.UserCommitCallBack;
import com.alibaba.dts.common.UserRecord;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.common.WorkThread;
import com.alibaba.dts.formats.avro.Record;
import com.alibaba.dts.recordgenerator.OffsetCommitCallBack;
import com.alibaba.dts.recordprocessor.AvroDeserializer;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtlRecordProcessor
implements Runnable,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(EtlRecordProcessor.class);
    private final OffsetCommitCallBack offsetCommitCallBack;
    private volatile Checkpoint commitCheckpoint;
    private WorkThread commitThread;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    private final AvroDeserializer fastDeserializer;
    private final Context context;
    private final Map<String, RecordListener> recordListeners = new HashMap<String, RecordListener>();
    private volatile boolean existed = false;

    public boolean offer(long timeOut, TimeUnit timeUnit, ConsumerRecord record) {
        try {
            return this.toProcessRecord.offer(record, timeOut, timeUnit);
        }
        catch (Exception e) {
            log.error("EtlRecordProcessor: offer record failed, record[" + record + "], cause " + e.getMessage(), (Throwable)e);
            return false;
        }
    }

    public EtlRecordProcessor(OffsetCommitCallBack offsetCommitCallBack, Context context) {
        this(offsetCommitCallBack, context, 512);
    }

    public EtlRecordProcessor(OffsetCommitCallBack offsetCommitCallBack, Context context, int queueCapacity) {
        this.offsetCommitCallBack = offsetCommitCallBack;
        this.toProcessRecord = new LinkedBlockingQueue(queueCapacity);
        this.fastDeserializer = new AvroDeserializer();
        this.context = context;
        this.commitCheckpoint = new Checkpoint(null, -1L, -1L, "-1");
        this.commitThread = this.getCommitThread();
        this.commitThread.start();
    }

    @Override
    public void run() {
        while (!this.existed) {
            ConsumerRecord toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                while (null == (toProcess = this.toProcessRecord.peek()) && !this.existed) {
                    Util.sleepMS(5L);
                    if (++fetchFailedCount % 1000 != 0) continue;
                    log.info("EtlRecordProcessor: haven't receive records from generator for  5s");
                }
                if (this.existed) {
                    return;
                }
                fetchFailedCount = 0;
                ConsumerRecord consumerRecord = toProcess;
                record = this.fastDeserializer.deserialize((byte[])consumerRecord.value());
                log.debug("EtlRecordProcessor: meet [{}] record type", (Object)record.getOperation());
                for (RecordListener recordListener : this.recordListeners.values()) {
                    recordListener.consume(new UserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), record, new UserCommitCallBack(){

                        @Override
                        public void commit(TopicPartition tp, Record commitRecord, long offset, String metadata) {
                            EtlRecordProcessor.this.commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                        }
                    }));
                }
                this.toProcessRecord.poll();
            }
            catch (Exception e) {
                log.error("EtlRecordProcessor: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), (Throwable)e);
            }
        }
    }

    private void commit() {
        if (null != this.offsetCommitCallBack && this.commitCheckpoint.getTopicPartition() != null && this.commitCheckpoint.getOffset() != -1L) {
            log.info("commit record with checkpoint {}", (Object)this.commitCheckpoint);
            this.offsetCommitCallBack.commit(this.commitCheckpoint.getTopicPartition(), this.commitCheckpoint.getTimeStamp(), this.commitCheckpoint.getOffset(), this.commitCheckpoint.getInfo());
        }
    }

    public void registerRecordListener(String name, RecordListener recordListener) {
        Util.require(null != name && null != recordListener, "null value not accepted");
        this.recordListeners.put(name, recordListener);
    }

    @Override
    public void close() {
        this.existed = true;
        this.commitThread.stop();
    }

    private WorkThread getCommitThread() {
        WorkThread<2> workThread = new WorkThread<2>(new Runnable(){

            @Override
            public void run() {
                while (!EtlRecordProcessor.this.existed) {
                    Util.sleepMS(5000L);
                    EtlRecordProcessor.this.commit();
                }
            }
        });
        return workThread;
    }
}

