/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients.recordprocessor;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable worker that takes records from a shared queue and hands each one to
 * every registered {@link RecordListener}.
 *
 * <p>The head of the queue is only removed after all listeners have consumed it.
 * If a listener throws, the record stays at the head of the queue, the failure is
 * logged and exit is requested on the {@link ConsumerContext}.
 */
public class EtlRecordProcessor implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(EtlRecordProcessor.class);

    /** Pause between two looks at an empty queue, in milliseconds. */
    private static final long IDLE_SLEEP_MS = 5L;

    /**
     * Number of consecutive empty looks between two idle log lines.
     * 1000 looks * 5 ms sleep is the "5s" mentioned in the idle log message, so the
     * message text has to be revisited if either constant changes.
     */
    private static final int IDLE_LOG_INTERVAL = 1000;

    private final LinkedBlockingQueue<DefaultUserRecord> toProcessRecord;
    private final Map<String, RecordListener> recordListeners;
    private final ConsumerContext consumerContext;

    /**
     * Creates a processor over the given queue and listener map. The references are
     * stored as passed in; no copies are made.
     *
     * @param consumerContext context queried for the exit flag and asked to exit on failure
     * @param toProcessRecord queue the records to process are read from
     * @param recordListeners listeners keyed by name; each record is passed to every value
     */
    public EtlRecordProcessor(ConsumerContext consumerContext, LinkedBlockingQueue<DefaultUserRecord> toProcessRecord, Map<String, RecordListener> recordListeners) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = toProcessRecord;
        this.recordListeners = recordListeners;
    }

    /**
     * Processes records until the consumer context reports that it has exited.
     *
     * <p>Each pass waits for a record to appear at the head of the queue, passes it to
     * all listeners and then removes it. Any {@link Exception} raised along the way is
     * logged together with the record being processed, and exit is requested on the
     * context.
     */
    @Override
    public void run() {
        while (!this.consumerContext.isExited()) {
            DefaultUserRecord toProcess = null;
            int fetchFailedCount = 0;
            try {
                // peek() rather than poll(): the record must stay queued until every
                // listener has consumed it successfully.
                while (null == (toProcess = this.toProcessRecord.peek()) && !this.consumerContext.isExited()) {
                    Util.sleepMS(IDLE_SLEEP_MS);
                    // Report idleness once per interval, and only while the context
                    // says it has valid topic partitions.
                    if (++fetchFailedCount % IDLE_LOG_INTERVAL == 0 && this.consumerContext.hasValidTopicPartitions()) {
                        log.info("EtlRecordProcessor: haven't receive records from generator for  5s");
                    }
                }
                if (this.consumerContext.isExited()) {
                    return;
                }
                for (RecordListener recordListener : this.recordListeners.values()) {
                    recordListener.consume(toProcess);
                }
                // All listeners succeeded; drop the record that was peeked above.
                this.toProcessRecord.poll();
            }
            catch (Exception e) {
                // Trailing Throwable is logged by SLF4J as the exception (with stack trace),
                // not substituted into a placeholder.
                log.error("EtlRecordProcessor: process record failed, raw consumer record [{}],  cause {}", toProcess, e.getMessage(), e);
                this.consumerContext.exit();
            }
        }
    }

    /**
     * Adds a listener to the listener map under the given name, replacing whatever
     * the map previously held for that name.
     *
     * <p>Both arguments are checked through {@code Util.require}; presumably a
     * {@code null} name or listener is rejected there - confirm against {@code Util}.
     *
     * @param name           key the listener is stored under
     * @param recordListener listener that will be passed every processed record
     */
    public void registerRecordListener(String name, RecordListener recordListener) {
        Util.require(null != name && null != recordListener, "null value not accepted");
        this.recordListeners.put(name, recordListener);
    }

    /**
     * Requests exit on the consumer context. {@link #run()} checks the context's
     * exited flag on every pass and stops once it is reported as set.
     */
    @Override
    public void close() {
        this.consumerContext.exit();
    }
}

