/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.AbstractConsumeMessageOrderlyService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeThreadExecutor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MergeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MergeThreadExecutor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueuePair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.Pair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.utils.MessageUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Orderly consume service that works on queue groups instead of single queues.
 *
 * <p>Pulled messages are routed either directly to the {@link ConsumeThreadExecutor}
 * (for message queues whose queue group id is {@code -1}) or to the
 * {@link MergeThreadExecutor} (for queues that belong to a {@link QueueGroup}
 * registered through {@link #updateQueueGroupMap(String, Set)}).
 *
 * <p>{@code queueGroupMap} is written under the write lock of {@code lockRequestMap}
 * and read under its read lock inside this class. Note that
 * {@link #getQueueGroupMap()} hands out the live map without taking the lock.
 */
public class ConsumeMessageOrderlyByGroupService
extends AbstractConsumeMessageOrderlyService {
    /** Queue group id reported by message queues that are not part of any queue group. */
    private static final int NO_QUEUE_GROUP_ID = -1;
    /** Delay before a merge request is retried while the group is not yet "normal message clean". */
    private static final long MERGE_RETRY_DELAY_MILLIS = 1000L;

    private final ReadWriteLock lockRequestMap = new ReentrantReadWriteLock();
    private final MergeThreadExecutor mergeExecutor;
    private final ConsumeThreadExecutor consumeExecutor;
    /** topic -> (queue group id -> queue group). */
    private final Map<String, Map<Integer, QueueGroup>> queueGroupMap = new HashMap<String, Map<Integer, QueueGroup>>();

    /**
     * Creates the service together with its merge and consume executors.
     *
     * @param defaultMQPushConsumerImpl owning push consumer implementation
     * @param messageListener           orderly listener that receives the messages
     */
    public ConsumeMessageOrderlyByGroupService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
        super(defaultMQPushConsumerImpl, messageListener);
        this.mergeExecutor = new MergeThreadExecutor(this);
        this.consumeExecutor = new ConsumeThreadExecutor(this, this.mergeExecutor);
    }

    /**
     * Shuts down the parent service and both executors; in clustering mode the
     * message queue locks are released afterwards.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        this.consumeExecutor.shutdown();
        this.mergeExecutor.shutdown();
        if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            this.unlockAllMessageQueues();
        }
    }

    /**
     * Applies a new core pool size to the consume executor.
     *
     * <p>The value is ignored unless it is positive, at most {@link Short#MAX_VALUE}
     * and strictly below the configured maximum number of consume threads.
     *
     * @param corePoolSize requested core pool size
     */
    @Override
    public void updateCorePoolSize(int corePoolSize) {
        boolean inRange = corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE;
        if (inRange && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
            this.consumeExecutor.setCorePoolSize(corePoolSize);
        }
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    public void incCorePoolSize() {
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    public void decCorePoolSize() {
    }

    /**
     * @return the core pool size currently reported by the consume executor
     */
    @Override
    public int getCorePoolSize() {
        return this.consumeExecutor.getCorePoolSize();
    }

    /**
     * Forwards the core-thread time-out setting to the consume executor.
     *
     * @param allowCoreThreadTimeOut value passed through unchanged
     */
    @Override
    public void allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.consumeExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }

    /** @return the push consumer implementation this service was created with */
    public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
        return this.defaultMQPushConsumerImpl;
    }

    /** @return the {@link DefaultMQPushConsumer} held by the parent service */
    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    /** @return the scheduled executor held by the parent service */
    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /** @return the orderly message listener held by the parent service */
    public MessageListenerOrderly getMessageListener() {
        return this.messageListener;
    }

    /**
     * Rebuilds the queue groups of {@code topic} from the given message queues and
     * publishes them under the write lock.
     *
     * <p>Queues whose group id is {@code -1} are skipped. A group is discarded
     * entirely when at least one of its queues has no {@link ProcessQueue} in the
     * rebalance table. When no complete group remains, the previously stored
     * groups of the topic are left untouched.
     *
     * <p>If the calling thread is interrupted while waiting for the lock, the
     * update is abandoned, the error is logged and the interrupt status is restored.
     *
     * @param topic topic the queues belong to
     * @param mqSet message queues currently known for the topic
     */
    public void updateQueueGroupMap(String topic, Set<MessageQueue> mqSet) {
        try {
            this.lockRequestMap.writeLock().lockInterruptibly();
            try {
                Map<Integer, QueueGroup> newQueueGroupMap = new HashMap<Integer, QueueGroup>();
                Set<Integer> incompleteQueueGroupIds = new HashSet<Integer>();
                for (MessageQueue mq : mqSet) {
                    int queueGroupId = mq.getQueueGroupId();
                    if (queueGroupId == NO_QUEUE_GROUP_ID) {
                        continue;
                    }
                    ProcessQueue pq = (ProcessQueue)this.defaultMQPushConsumerImpl.getRebalanceImpl().processQueueTable.get(mq);
                    if (pq == null) {
                        // The whole group is dropped below, so there is no point in
                        // recording a pair that wraps a null process queue.
                        incompleteQueueGroupIds.add(queueGroupId);
                        continue;
                    }
                    QueueGroup queueGroup = newQueueGroupMap.get(queueGroupId);
                    if (queueGroup == null) {
                        queueGroup = new QueueGroup(topic, queueGroupId);
                        newQueueGroupMap.put(queueGroupId, queueGroup);
                    }
                    QueuePair queuePair = new QueuePair(mq, pq);
                    if (queueGroup.getQueuePairs().contains(queuePair)) {
                        continue;
                    }
                    queueGroup.getQueuePairs().add(queuePair);
                    queueGroup.getMessageQueueGroup().getMessageQueueList().add(mq);
                    queueGroup.getProcessQueueGroup().getProcessQueueList().add(pq);
                }
                for (Integer id : incompleteQueueGroupIds) {
                    newQueueGroupMap.remove(id);
                }
                if (newQueueGroupMap.isEmpty()) {
                    return;
                }
                this.queueGroupMap.put(topic, newQueueGroupMap);
            }
            finally {
                this.lockRequestMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            LOG.error("updateProcessQueueMap exception", e);
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Splits the pulled messages into consecutive runs that share the same
     * "HA orderly" attribute and submits one merge request per run.
     *
     * <p>A {@code null} or empty list still produces a single merge request with
     * an empty message list.
     *
     * @param msgs         pulled messages, may be {@code null} or empty
     * @param processQueue process queue the messages were stored in
     * @param messageQueue message queue the messages were pulled from
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        if (msgs == null || msgs.isEmpty()) {
            this.submitMergeRequest(false, new ArrayList<MessageExt>(), processQueue, messageQueue);
            return;
        }
        List<MessageExt> remaining = msgs;
        while (!remaining.isEmpty()) {
            Pair<Boolean, Integer> run = this.getHAOrderlyMsgSize(remaining);
            int runLength = run.getObject2();
            this.submitMergeRequest(run.getObject1(), remaining.subList(0, runLength), processQueue, messageQueue);
            remaining = remaining.subList(runLength, remaining.size());
        }
    }

    /**
     * Measures the leading run of messages that share the HA-orderly attribute of
     * the first message.
     *
     * @param msgs messages to inspect
     * @return pair of (attribute of the run, length of the run); for an empty list
     *         the attribute is {@code null} and the length is {@code 0}
     */
    private Pair<Boolean, Integer> getHAOrderlyMsgSize(List<MessageExt> msgs) {
        Boolean lastMsgAttr = null;
        int size = 0;
        for (MessageExt msg : msgs) {
            boolean currentMsgAttr = this.isHAOrderlyMsg(msg);
            if (lastMsgAttr == null) {
                lastMsgAttr = currentMsgAttr;
            }
            if (currentMsgAttr != lastMsgAttr) {
                break;
            }
            ++size;
        }
        return new Pair<Boolean, Integer>(lastMsgAttr, size);
    }

    /**
     * @param messageExt message to inspect
     * @return {@code true} when the message carries a non-empty queue group snapshot
     */
    private boolean isHAOrderlyMsg(MessageExt messageExt) {
        String queueGroupInfo = MessageAccessor.getQueueGroupSnapshot(messageExt);
        return queueGroupInfo != null && !queueGroupInfo.isEmpty();
    }

    /**
     * Routes one run of messages.
     *
     * <p>For a queue with group id {@code -1} the messages are handed straight to
     * the consume executor, unless an HA-orderly message has been seen on that
     * process queue; in that case the run is not submitted here and the first
     * occurrence is logged. For grouped queues a {@link MergeRequest} for the
     * registered {@link QueueGroup} is submitted, delayed by one second while the
     * group is not yet "normal message clean". Nothing happens when the topic or
     * group is not registered.
     *
     * <p>If the calling thread is interrupted while waiting for the read lock, the
     * request is dropped, the error is logged and the interrupt status is restored.
     *
     * @param isHAOrderlyMsg whether the run consists of HA-orderly messages
     * @param msgs           messages of the run (only used for ungrouped queues)
     * @param pq             process queue of {@code mq}
     * @param mq             message queue the run was pulled from
     */
    public void submitMergeRequest(boolean isHAOrderlyMsg, List<MessageExt> msgs, ProcessQueue pq, MessageQueue mq) {
        if (mq.getQueueGroupId() == NO_QUEUE_GROUP_ID) {
            this.submitUngroupedRequest(isHAOrderlyMsg, msgs, pq, mq);
            return;
        }
        try {
            this.lockRequestMap.readLock().lockInterruptibly();
            try {
                Map<Integer, QueueGroup> groupsOfTopic = this.queueGroupMap.get(mq.getTopic());
                if (groupsOfTopic == null) {
                    return;
                }
                QueueGroup queueGroup = groupsOfTopic.get(mq.getQueueGroupId());
                if (queueGroup == null) {
                    return;
                }
                MergeRequest mergeRequest = new MergeRequest(queueGroup, this);
                if (!queueGroup.isNormalMsgClean()) {
                    this.mergeExecutor.submitLater(mergeRequest, MERGE_RETRY_DELAY_MILLIS);
                    return;
                }
                this.mergeExecutor.submit(mergeRequest, false);
            }
            finally {
                this.lockRequestMap.readLock().unlock();
            }
        }
        catch (InterruptedException e) {
            LOG.error("handleMergeRequest exception", e);
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Handles a run pulled from a queue that has no queue group (group id {@code -1}).
     */
    private void submitUngroupedRequest(boolean isHAOrderlyMsg, List<MessageExt> msgs, ProcessQueue pq, MessageQueue mq) {
        if (msgs.isEmpty()) {
            return;
        }
        if (pq.isReceivedHAMsg()) {
            // An HA-orderly message was already seen on this queue: nothing is submitted.
            return;
        }
        if (isHAOrderlyMsg) {
            pq.setReceivedHAMsg(true);
            LOG.warn("Topic {} upgrade incomplete, wait for route update and rebalance", mq.getTopic());
            return;
        }
        pq.setNormalMsgClean(false);
        pq.getMergeProgress().addAndGet(msgs.size());
        List<Pair<QueuePair, List<MessageExt>>> msgPairList = new ArrayList<Pair<QueuePair, List<MessageExt>>>();
        msgPairList.add(new Pair<QueuePair, List<MessageExt>>(new QueuePair(mq, pq), msgs));
        QueueGroup queueGroup = new QueueGroup(mq.getTopic(), NO_QUEUE_GROUP_ID);
        queueGroup.getMessageQueueGroup().getMessageQueueList().add(mq);
        queueGroup.getProcessQueueGroup().getProcessQueueList().add(pq);
        this.submitConsumeRequest(msgPairList, queueGroup);
    }

    /**
     * Asks the rebalance implementation to lock every message queue of the group.
     *
     * @param messageQueueGroup group whose queues should be locked
     * @return the result of the batch lock, or {@code false} when the service is stopped
     */
    public synchronized boolean lockMQGroup(MessageQueueGroup messageQueueGroup) {
        if (this.stopped) {
            return false;
        }
        Set<MessageQueue> queues = new HashSet<MessageQueue>(messageQueueGroup.getMessageQueueList());
        return this.defaultMQPushConsumerImpl.getRebalanceImpl().lockBatch(queues, true);
    }

    /**
     * Submits consume requests for the given per-queue message batches.
     *
     * <p>When accelerated consumption is enabled for the topic, one request is
     * created per sharding key index and each request receives, per queue pair,
     * the number of messages that map to that index. Otherwise a single request
     * is created that carries the full batch size of every queue pair.
     *
     * @param msgPairList queue pairs together with the messages pulled for them
     * @param queueGroup  queue group the request is created for
     */
    void submitConsumeRequest(List<Pair<QueuePair, List<MessageExt>>> msgPairList, QueueGroup queueGroup) {
        if (this.isConsumeAccelerated(queueGroup.getTopic())) {
            int totalSize = this.defaultMQPushConsumer.getMaxConcurrencyForOrderQueue();
            Map<Integer, List<Pair<QueuePair, Integer>>> pairsByShard = new HashMap<Integer, List<Pair<QueuePair, Integer>>>();
            for (Pair<QueuePair, List<MessageExt>> messagePair : msgPairList) {
                List<MessageExt> messages = messagePair.getObject2();
                Set<Integer> shardingKeyIndexSet = MessageUtils.getShardingKeyIndexes(messages, totalSize);
                Map<Integer, Integer> consumeBatchSizeMap = this.getConsumeBatchSize(shardingKeyIndexSet, new ArrayList<MessageExt>(messages), totalSize);
                for (Integer shardingKeyIndex : shardingKeyIndexSet) {
                    List<Pair<QueuePair, Integer>> shardPairs = pairsByShard.get(shardingKeyIndex);
                    if (shardPairs == null) {
                        shardPairs = new ArrayList<Pair<QueuePair, Integer>>();
                        pairsByShard.put(shardingKeyIndex, shardPairs);
                    }
                    shardPairs.add(new Pair<QueuePair, Integer>(messagePair.getObject1(), consumeBatchSizeMap.get(shardingKeyIndex)));
                }
            }
            for (Map.Entry<Integer, List<Pair<QueuePair, Integer>>> shard : pairsByShard.entrySet()) {
                ConsumeRequest req = new ConsumeRequest(queueGroup, this.consumeExecutor, this, shard.getKey());
                this.consumeExecutor.submit(req, shard.getValue(), false);
            }
        } else {
            ConsumeRequest req = new ConsumeRequest(queueGroup, this.consumeExecutor, this);
            List<Pair<QueuePair, Integer>> queuePairList = new ArrayList<Pair<QueuePair, Integer>>();
            for (Pair<QueuePair, List<MessageExt>> msgPair : msgPairList) {
                queuePairList.add(new Pair<QueuePair, Integer>(msgPair.getObject1(), msgPair.getObject2().size()));
            }
            this.consumeExecutor.submit(req, queuePairList, false);
        }
    }

    /**
     * Counts, for every sharding key index in the set, how many of the messages
     * map to that index.
     *
     * @param shardingKeyIndexSet indexes to count for; each one is present in the result
     * @param messageList         messages to classify
     * @param totalSize           number of shards passed to the index computation
     * @return sharding key index -> number of matching messages (possibly {@code 0})
     */
    private Map<Integer, Integer> getConsumeBatchSize(Set<Integer> shardingKeyIndexSet, List<MessageExt> messageList, int totalSize) {
        Map<Integer, Integer> consumeBatchSizeMap = new HashMap<Integer, Integer>();
        for (Integer shardingKeyIndex : shardingKeyIndexSet) {
            consumeBatchSizeMap.put(shardingKeyIndex, 0);
        }
        // Single pass: compute each message's index once instead of once per shard.
        for (MessageExt messageExt : messageList) {
            Integer messageIndex = MessageUtils.getShardingKeyIndexByMsg(messageExt, totalSize);
            Integer count = consumeBatchSizeMap.get(messageIndex);
            if (count != null) {
                consumeBatchSizeMap.put(messageIndex, count + 1);
            }
        }
        return consumeBatchSizeMap;
    }

    /**
     * @return the live topic -> (group id -> group) map; it is returned without
     *         taking {@code lockRequestMap} and without copying
     */
    public Map<String, Map<Integer, QueueGroup>> getQueueGroupMap() {
        return this.queueGroupMap;
    }

    /**
     * Applies the listener's result for one consumed batch.
     *
     * <p>Depending on the status and on {@code context.isAutoCommit()} the batch
     * is committed, rolled back or scheduled for another attempt. When the batch
     * is re-queued, the pair's remaining count is reduced by {@code msgsConsumed},
     * the pair is put back at the head of the request's queue and the request is
     * resubmitted after the suspend time of the context. A non-negative commit
     * offset is stored in the offset store unless the process queue is dropped.
     *
     * @param msgs           messages of the batch
     * @param status         status returned by the listener
     * @param context        consume context used for this batch
     * @param consumeRequest request the batch belongs to
     * @param messageQueue   message queue of the batch
     * @param processQueue   process queue of the batch
     * @param queuePair      queue pair and its remaining message count
     * @param msgsConsumed   number of messages of the pair already consumed
     * @return {@code true} when the caller may go on consuming, {@code false} when
     *         the request has been rescheduled
     */
    public boolean processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status, ConsumeOrderlyContext context, ConsumeRequest consumeRequest, MessageQueue messageQueue, ProcessQueue processQueue, Pair<QueuePair, Integer> queuePair, int msgsConsumed) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        int shardIndex = consumeRequest.getShardingKeyIndex();
        if (context.isAutoCommit()) {
            switch (status) {
                case COMMIT:
                case ROLLBACK: {
                    LOG.warn("the message queue consume result is illegal, we think you want to ack these message {}", messageQueue);
                    // Deliberate fall-through: in auto-commit mode these are treated as SUCCESS.
                }
                case SUCCESS: {
                    commitOffset = processQueue.commit(msgs, shardIndex);
                    this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, messageQueue.getTopic(), msgs.size());
                    break;
                }
                case SUSPEND_CURRENT_QUEUE_A_MOMENT: {
                    this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, messageQueue.getTopic(), msgs.size());
                    if (this.checkReconsumeTimes(msgs)) {
                        this.pushBackUnconsumed(consumeRequest, queuePair, msgsConsumed);
                        processQueue.makeMessageToConsumeAgain(msgs, shardIndex);
                        this.consumeExecutor.submitLater(consumeRequest, context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        commitOffset = processQueue.commit(msgs, shardIndex);
                    }
                    break;
                }
                default: {
                    break;
                }
            }
        } else {
            switch (status) {
                case SUCCESS: {
                    this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, messageQueue.getTopic(), msgs.size());
                    break;
                }
                case COMMIT: {
                    commitOffset = processQueue.commit(msgs, shardIndex);
                    break;
                }
                case ROLLBACK: {
                    this.pushBackUnconsumed(consumeRequest, queuePair, msgsConsumed);
                    processQueue.rollback(msgs, shardIndex);
                    this.consumeExecutor.submitLater(consumeRequest, context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                }
                case SUSPEND_CURRENT_QUEUE_A_MOMENT: {
                    this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, messageQueue.getTopic(), msgs.size());
                    if (this.checkReconsumeTimes(msgs)) {
                        this.pushBackUnconsumed(consumeRequest, queuePair, msgsConsumed);
                        processQueue.makeMessageToConsumeAgain(msgs, shardIndex);
                        this.consumeExecutor.submitLater(consumeRequest, context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                }
                default: {
                    break;
                }
            }
        }
        if (commitOffset >= 0L && !processQueue.isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(messageQueue, commitOffset, false);
        }
        // Flip the flag once the consume progress has caught up with the merge progress.
        if (!processQueue.isNormalMsgClean() && processQueue.getMergeProgress().get() <= processQueue.getConsumeProgress().get()) {
            processQueue.setNormalMsgClean(true);
        }
        return continueConsume;
    }

    /**
     * Reduces the pair's remaining count by the messages already consumed and puts
     * the pair back at the head of the request's queue.
     */
    private void pushBackUnconsumed(ConsumeRequest consumeRequest, Pair<QueuePair, Integer> queuePair, int msgsConsumed) {
        queuePair.setObject2(queuePair.getObject2() - msgsConsumed);
        consumeRequest.getQueueToConsume().addFirst(queuePair);
    }

    /**
     * @param topicName topic to check
     * @return whatever the push consumer implementation reports for the topic
     */
    private boolean isConsumeAccelerated(String topicName) {
        return this.defaultMQPushConsumerImpl.isConsumeAccelerated(topicName);
    }
}

