/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.drc.clusterclient;

import com.taobao.drc.clusterclient.AbstractClusterMessage;
import com.taobao.drc.clusterclient.MessageListener;
import com.taobao.drc.clusterclient.partition.BaseCheckpoint;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.util.Gaugeable;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands batches of messages to a {@link MessageListener} on a dedicated thread.
 *
 * <p>Producers enqueue batches with {@link #onMessages(List)}; the call blocks while the
 * bounded queue is full. A single notifier thread (started by {@link #start()}) drains the
 * queue in FIFO order and passes every batch to the listener. {@link #getMetrics()} exposes
 * queue and throughput gauges.
 *
 * <p>Threading notes: the counters are {@code volatile} and are written by one thread each
 * ({@code consumedCounter} and {@code lastNotifiedMessage} by the notifier thread, the sampling
 * fields by whoever calls {@link #getMetrics()}). {@link #getMetrics()} is not atomic, so
 * concurrent callers may observe overlapping sample periods.
 *
 * @param <M> the concrete message type delivered to the listener
 */
public class MessageNotifier<M extends AbstractClusterMessage<?, ? extends IPartition, ? extends BaseCheckpoint>>
implements Runnable,
Gaugeable {
    private static final String KEY_MESSAGE_PENDING_NUM = "queue.size";
    private static final String KEY_MESSAGE_CAPACITY = "queue.capacity";
    private static final String KEY_MESSAGE_SIZE = "notified.size";
    private static final String KEY_MESSAGE_LAST_PERIOD_MS = "notified.sample_period_ms";
    private static final String KEY_MESSAGE_RPS_LAST_PERIOD = "notified.rps";
    private static final String KEY_MESSAGE_LAST_NOTIFIED_CHECKPOINT = "notified.last_notified_checkpoint";
    private static final String KEY_MESSAGE_LATENCY = "notified.latency_sec";
    private static final Logger logger = LoggerFactory.getLogger(MessageNotifier.class);

    /** Capacity of {@link #msgQueue}, kept only so it can be reported as a gauge. */
    private final int queueSize;
    /** Bounded, fair queue of pending batches (producers block when it is full). */
    private final BlockingQueue<List<M>> msgQueue;
    private final MessageListener<M> messageListener;
    private final Thread thread;
    /** Total number of messages handed to the listener; written only by the notifier thread. */
    private volatile long consumedCounter = 0L;
    /** Value of {@link #consumedCounter} at the previous {@link #getMetrics()} call. */
    private volatile long lastSampleCounter = 0L;
    /** Wall-clock time (ms) of the previous {@link #getMetrics()} call. */
    private volatile long lastSampleMs = System.currentTimeMillis();
    /** Last message of the most recent non-empty batch that was notified without an exception. */
    private volatile M lastNotifiedMessage = null;
    private volatile boolean running;

    /**
     * Creates a notifier; the worker thread is created here but not started until
     * {@link #start()} is called.
     *
     * @param messageListener  receiver of every batch; must not be {@code null}
     * @param queueSize        capacity of the pending-batch queue
     * @param threadNamePrefix prefix of the worker thread name; {@code "notifier"} is appended
     * @throws NullPointerException     if {@code messageListener} is {@code null}
     * @throws IllegalArgumentException if {@code queueSize} is less than 1 (raised by
     *                                  {@link ArrayBlockingQueue})
     */
    public MessageNotifier(MessageListener<M> messageListener, int queueSize, String threadNamePrefix) {
        if (messageListener == null) {
            throw new NullPointerException("messageListener");
        }
        this.queueSize = queueSize;
        this.messageListener = messageListener;
        this.thread = new Thread(this);
        this.thread.setName(threadNamePrefix + "notifier");
        this.msgQueue = new ArrayBlockingQueue<List<M>>(queueSize, true);
        this.running = true;
    }

    /** Starts the notifier thread. */
    public void start() {
        this.thread.start();
    }

    /**
     * Asks the notifier thread to stop, interrupts it and waits until it has terminated.
     * Batches still in the queue are not delivered.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        this.running = false;
        this.thread.interrupt();
        this.thread.join();
    }

    /**
     * Worker loop: takes the next batch, passes it to the listener and updates the counters.
     * Runs until {@link #shutdown()} clears the running flag.
     */
    @Override
    public void run() {
        while (this.running) {
            try {
                List<M> messages = this.msgQueue.take();
                try {
                    this.messageListener.notify(messages);
                }
                finally {
                    // Counted even when the listener throws: the batch has left the queue.
                    this.consumedCounter += (long)messages.size();
                }
                if (!messages.isEmpty()) {
                    this.lastNotifiedMessage = messages.get(messages.size() - 1);
                }
            }
            catch (InterruptedException e) {
                // The interrupt flag is deliberately not re-asserted: the loop condition decides
                // whether to exit, and a set flag would make the next take() throw immediately.
                logger.info("Notified was interrupted");
            }
            catch (Exception e) {
                logger.error("Caught exception in notifier", e);
                this.messageListener.noException(e);
            }
        }
    }

    /**
     * Enqueues a batch for delivery, blocking while the queue is full.
     *
     * @param messages the batch to deliver; must not be {@code null}
     * @throws NullPointerException if {@code messages} is {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void onMessages(List<M> messages) throws InterruptedException {
        if (messages == null) {
            throw new NullPointerException("messages");
        }
        this.msgQueue.put(messages);
    }

    /**
     * Returns a snapshot of the notifier gauges and starts a new sampling period.
     *
     * <p>The throughput entries are only present when at least one millisecond has elapsed since
     * the previous call; the checkpoint and latency entries are only present once a message has
     * been notified (latency additionally requires a non-null checkpoint with a parsable
     * timestamp).
     *
     * @return a new, key-sorted map of metric name to value
     */
    @Override
    public Map<String, Object> getMetrics() {
        Map<String, Object> metrics = new TreeMap<String, Object>();
        long now = System.currentTimeMillis();
        long periodMs = now - this.lastSampleMs;
        // Read the volatile counter once so the total and the rate describe the same instant.
        long sampleCounter = this.consumedCounter;
        long msgNum = sampleCounter - this.lastSampleCounter;
        this.lastSampleMs = now;
        this.lastSampleCounter = sampleCounter;
        metrics.put(KEY_MESSAGE_SIZE, sampleCounter);
        if (periodMs > 0L) {
            metrics.put(KEY_MESSAGE_LAST_PERIOD_MS, periodMs);
            metrics.put(KEY_MESSAGE_RPS_LAST_PERIOD, msgNum * 1000L / periodMs);
        }
        metrics.put(KEY_MESSAGE_CAPACITY, this.queueSize);
        metrics.put(KEY_MESSAGE_PENDING_NUM, this.msgQueue.size());

        // Single volatile read: the notifier thread may replace the message at any time, and the
        // checkpoint and latency entries must describe the same message.
        M last = this.lastNotifiedMessage;
        if (last != null) {
            Object checkpoint = last.getCheckpoint();
            metrics.put(KEY_MESSAGE_LAST_NOTIFIED_CHECKPOINT, checkpoint);
            if (checkpoint != null) {
                try {
                    // NOTE(review): the subtraction below treats the checkpoint timestamp as
                    // epoch seconds - confirm against BaseCheckpoint.
                    long timestamp = Long.valueOf(((BaseCheckpoint)checkpoint).getTimestamp());
                    metrics.put(KEY_MESSAGE_LATENCY, now / 1000L - timestamp);
                }
                catch (NumberFormatException e) {
                    // An unparsable timestamp should cost only the latency gauge, not the report.
                    logger.warn("Cannot derive latency from checkpoint {}", checkpoint, e);
                }
            }
        }
        return metrics;
    }
}

