/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.drc.clusterclient.coordinator.impl;

import com.taobao.drc.clusterclient.BaseClusterContext;
import com.taobao.drc.clusterclient.PartitionManager;
import com.taobao.drc.clusterclient.clustermanager.APIProxy;
import com.taobao.drc.clusterclient.clustermanager.BatchCommitRequest;
import com.taobao.drc.clusterclient.clustermanager.BatchCommitResponse;
import com.taobao.drc.clusterclient.clustermanager.BatchGetPartitionRequest;
import com.taobao.drc.clusterclient.clustermanager.BatchGetPartitionResponse;
import com.taobao.drc.clusterclient.clustermanager.ExpectedConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.PartitionInfo;
import com.taobao.drc.clusterclient.coordinator.Coordinator;
import com.taobao.drc.clusterclient.partition.PartitionRef;
import com.taobao.drc.clusterclient.util.Futures;
import com.taobao.drc.clusterclient.util.SettableFuture;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultCoordinator
implements Coordinator {
    /** 3000 ms; the same value is used in this class as the floor for every re-schedule delay. */
    static final long MIN_DELAY_MS_BETWEEN_COMMIT = 3000L;
    /** Initial value of {@link #commitPeriodMs} until a commit response supplies another one. */
    static final long DEFAULT_COMMIT_PERIOD_MS = 10000L;
    // NOTE(review): the logger is named after the Coordinator interface, not this class - confirm intent.
    private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
    private final String clusterAddress;
    private final APIProxy apiProxy;
    private final int id;
    /** Flipped exactly once by {@code asyncClose()}; never reassigned, hence final. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Set by the final-commit task on the pass that observes {@link #stopped}. */
    private volatile boolean inFinalCommit = false;
    /** Completed by the final-commit task; {@code asyncClose()} waits on it before shutting the executors down. */
    private final SettableFuture finalCommitFuture = new SettableFuture();
    private volatile long commitPeriodMs = DEFAULT_COMMIT_PERIOD_MS;
    /**
     * Written while handling a commit response and read through the public getter, so it must be
     * volatile: a plain {@code long} guarantees neither visibility nor atomic access across threads.
     */
    private volatile long sessionTimeoutMs = 0L;
    /** Counter behind the "local-N" temporary sequence ids. */
    private long autoIncId = 0L;
    private final ScheduledExecutorService commitExecutor;
    private final ExecutorService eventExecutor;
    private final ExecutorService ioExecutor;
    // The four collections below are plain, unsynchronized collections. Within this file they are
    // modified only by tasks submitted to eventExecutor (a single-thread executor).
    private final Map<String, PartitionManager> seqToManager = new HashMap<String, PartitionManager>();
    private final Map<PartitionManager, String> managerToSeq = new IdentityHashMap<PartitionManager, String>();
    private final Queue<FinalCommit> finalCommits = new ArrayDeque<FinalCommit>();
    private final Map<String, FinalCommit> pendingFinalCommits = new HashMap<String, FinalCommit>();
    /** Task scheduled on commitExecutor for the periodic batch commit. */
    private final Runnable normalCommitTask = new Runnable(){

        @Override
        public void run() {
            DefaultCoordinator.this.collectAndCommit();
        }
    };
    /** Task scheduled on commitExecutor for flushing queued final commits. */
    private final Runnable finalCommitTask = new Runnable(){

        @Override
        public void run() {
            DefaultCoordinator.this.doFinalCommits();
        }
    };

    /**
     * Convenience constructor: builds a new {@link APIProxy} from {@code clusterAddress} and
     * delegates to the five-argument constructor.
     *
     * @param clusterAddress address of the cluster this coordinator talks to
     * @param id numeric id of this coordinator, used in thread names and log messages
     * @param threadNamePrefix prefix prepended to the names of the threads created here
     * @param maxIOThreadNumPerCoordinator size of the IO thread pool; must be at least 1
     */
    DefaultCoordinator(String clusterAddress, int id, String threadNamePrefix, int maxIOThreadNumPerCoordinator) {
        this(clusterAddress, id, threadNamePrefix, new APIProxy(clusterAddress), maxIOThreadNumPerCoordinator);
    }

    /**
     * Creates the coordinator, its three executors (commit loop, event loop, IO pool) and
     * schedules the first normal commit and the first final-commit pass.
     *
     * @param clusterAddress address of the cluster this coordinator talks to
     * @param id numeric id of this coordinator, used in thread names and log messages
     * @param threadNamePrefix prefix prepended to the names of the threads created here
     * @param apiProxy proxy used for the batch commit / batch get-partition calls
     * @param maxIOThreadNumPerCoordinator size of the IO thread pool
     * @throws IllegalArgumentException if {@code maxIOThreadNumPerCoordinator} is less than 1
     */
    DefaultCoordinator(final String clusterAddress, final int id, final String threadNamePrefix, APIProxy apiProxy, int maxIOThreadNumPerCoordinator) {
        if (maxIOThreadNumPerCoordinator < 1) {
            throw new IllegalArgumentException("Invalid IO pool capacity [" + maxIOThreadNumPerCoordinator + "]");
        }
        this.clusterAddress = clusterAddress;
        this.id = id;
        this.apiProxy = apiProxy;

        // Every thread name ends with "[<id>]-<clusterAddress>"; only the role in the middle differs.
        final String nameSuffix = "[" + id + "]-" + clusterAddress;
        ThreadFactory commitThreads = DefaultCoordinator.namedThreadFactory(threadNamePrefix + "Commit-Loop" + nameSuffix);
        ThreadFactory eventThreads = DefaultCoordinator.namedThreadFactory(threadNamePrefix + "Event-Loop" + nameSuffix);
        ThreadFactory ioThreads = DefaultCoordinator.namedThreadFactory(threadNamePrefix + "IO-Thread" + nameSuffix);

        this.commitExecutor = Executors.newSingleThreadScheduledExecutor(commitThreads);
        this.eventExecutor = Executors.newSingleThreadExecutor(eventThreads);
        this.ioExecutor = new ThreadPoolExecutor(maxIOThreadNumPerCoordinator, maxIOThreadNumPerCoordinator, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), ioThreads);

        this.scheduleCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
        this.scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
    }

    /** Returns a factory whose threads all carry the given fixed name. */
    private static ThreadFactory namedThreadFactory(final String threadName) {
        return new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadName);
            }
        };
    }

    /** @return the cluster address this coordinator was constructed with */
    @Override
    public String getClusterAddress() {
        return clusterAddress;
    }

    /** @return the session timeout in milliseconds last taken from a commit response; 0 until then */
    @Override
    public long getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    /** @return the current commit period in milliseconds */
    @Override
    public long getCommitPeriodMs() {
        return commitPeriodMs;
    }

    /** Produces the next temporary sequence id, "local-0", "local-1", ... */
    private String generateLocalSeq() {
        final long current = this.autoIncId;
        this.autoIncId = current + 1L;
        return "local-" + current;
    }

    /**
     * Registers a partition manager under a freshly generated temporary sequence id. The
     * bookkeeping runs as a task on the event executor.
     *
     * @param partitionManager manager to register; its context's cluster URL must equal this
     *        coordinator's cluster address
     * @return the future of the submitted registration task; a duplicate registration makes that
     *         task throw {@link IllegalArgumentException}
     * @throws IllegalArgumentException if the manager belongs to a different cluster address
     */
    @Override
    public Future register(final PartitionManager partitionManager) {
        boolean sameCluster = this.clusterAddress.equals(((BaseClusterContext)partitionManager.getContext()).getClusterUrl());
        if (!sameCluster) {
            throw new IllegalArgumentException("Invalid cluster address of partition manager, expected: [" + this.clusterAddress + "], actual: [" + ((BaseClusterContext)partitionManager.getContext()).getClusterUrl() + "]");
        }
        final Runnable registration = new Runnable(){

            @Override
            public void run() {
                if (managerToSeq.containsKey(partitionManager)) {
                    logger.info("The partition manager has been registered with seq [{}]!", managerToSeq.get(partitionManager));
                    throw new IllegalArgumentException("The partition manager has been registered!");
                }
                // Both maps are updated together so each stays the inverse of the other.
                String tempSeq = generateLocalSeq();
                managerToSeq.put(partitionManager, tempSeq);
                seqToManager.put(tempSeq, partitionManager);
                logger.info("Allocated temp-seq [{}] for partition manager [{}][{}]", new Object[]{tempSeq, ((BaseClusterContext)partitionManager.getContext()).getAppGuid(), ((BaseClusterContext)partitionManager.getContext()).getAppGroup()});
            }
        };
        return this.eventExecutor.submit(registration);
    }

    /**
     * Removes a partition manager from this coordinator and, if it still owns partitions, queues
     * its last local status for a final commit.
     *
     * <p>The returned future is completed (with {@code null}) when the manager was not registered,
     * when it owns no partitions, or later by the final-commit handlers. If the bookkeeping task
     * itself fails, the failure is logged and the future is completed as well, so that a caller
     * waiting on it is never left blocked forever; in that case no final commit is queued.
     *
     * @param partitionManager manager to deregister
     * @return future completed once deregistration (including any final commit) is over
     */
    @Override
    public Future deregister(final PartitionManager partitionManager) {
        final SettableFuture promise = new SettableFuture();
        this.eventExecutor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    String seq = (String)DefaultCoordinator.this.managerToSeq.remove(partitionManager);
                    if (seq == null) {
                        logger.warn("Partition manager of [{}][{}] was not registered on coordinator for [{}]", new Object[]{((BaseClusterContext)partitionManager.getContext()).getAppGuid(), ((BaseClusterContext)partitionManager.getContext()).getAppGroup(), DefaultCoordinator.this.clusterAddress});
                        promise.success(null);
                        return;
                    }
                    DefaultCoordinator.this.seqToManager.remove(seq);
                    LocalConsumerStatus consumerStatus = partitionManager.getLocalConsumerStatus();
                    consumerStatus.setSeq(seq);
                    ArrayList<String> partitionNames = new ArrayList<String>();
                    for (LocalPartitionStatus localPartitionStatus : consumerStatus.getPartitions()) {
                        partitionNames.add(localPartitionStatus.getPartition());
                    }
                    logger.info("Partition manager [{}][{}][{}] is about to stop, partitions {}", new Object[]{consumerStatus.getGuid(), consumerStatus.getGroup(), consumerStatus.getSeq(), partitionNames});
                    if (consumerStatus.getPartitions().isEmpty()) {
                        // Nothing to commit, so the caller can proceed immediately.
                        promise.success(null);
                        return;
                    }
                    logger.info("Try to do final commit for partitions {} of [{}][{}]", new Object[]{partitionNames, consumerStatus.getGuid(), consumerStatus.getGroup()});
                    // From here on the promise is completed by the final-commit handlers.
                    DefaultCoordinator.this.finalCommits.add(new FinalCommit(consumerStatus, promise));
                }
                catch (RuntimeException e) {
                    // The future of this task is discarded, so an escaping exception would be lost
                    // and the promise would stay pending forever. Complete it the same way the
                    // final-commit failure path does.
                    logger.error("Failed to deregister partition manager on coordinator for [{}][{}]", new Object[]{DefaultCoordinator.this.clusterAddress, DefaultCoordinator.this.id, e});
                    promise.success(null);
                }
            }
        });
        return promise;
    }

    /** Schedules one run of the normal commit task on the commit executor after {@code delayMs} milliseconds. */
    private void scheduleCommitAfter(long delayMs) {
        final Object[] logArgs = new Object[]{this.clusterAddress, this.id, delayMs};
        logger.debug("To schedule a batch commit for [{}][{}] after [{}] ms", logArgs);
        commitExecutor.schedule(normalCommitTask, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Schedules one run of the final-commit task on the commit executor after {@code delayMs} milliseconds. */
    private void scheduleFinalCommitAfter(long delayMs) {
        final Object[] logArgs = new Object[]{this.clusterAddress, this.id, delayMs};
        logger.trace("To schedule a final commit for [{}][{}] after [{}] ms", logArgs);
        commitExecutor.schedule(finalCommitTask, delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds, on the event executor, a batch commit request holding the local status of every
     * registered partition manager, each stamped with the sequence id it is registered under.
     *
     * @return future of the assembled request
     */
    private Future<BatchCommitRequest> collectRequest() {
        final Callable<BatchCommitRequest> snapshot = new Callable<BatchCommitRequest>(){

            @Override
            public BatchCommitRequest call() throws Exception {
                BatchCommitRequest batch = new BatchCommitRequest();
                for (Map.Entry<String, PartitionManager> registered : seqToManager.entrySet()) {
                    LocalConsumerStatus status = registered.getValue().getLocalConsumerStatus();
                    status.setSeq(registered.getKey());
                    batch.addConsumerStatus(status);
                }
                return batch;
            }
        };
        return this.eventExecutor.submit(snapshot);
    }

    /**
     * Drains the queue of final commits, on the event executor, into one batch commit request.
     * Every drained entry is parked in {@code pendingFinalCommits} under its sequence id so the
     * final-commit handlers can complete its promise afterwards.
     *
     * @return future of the assembled request (empty when nothing was queued)
     */
    private Future<BatchCommitRequest> collectFinalRequest() {
        final Callable<BatchCommitRequest> drain = new Callable<BatchCommitRequest>(){

            @Override
            public BatchCommitRequest call() throws Exception {
                BatchCommitRequest batch = new BatchCommitRequest();
                FinalCommit queued;
                // ArrayDeque holds no null elements, so a null poll() means the queue is empty.
                while ((queued = finalCommits.poll()) != null) {
                    LocalConsumerStatus status = queued.getConsumerStatus();
                    batch.addConsumerStatus(status);
                    pendingFinalCommits.put(status.getSeq(), queued);
                }
                return batch;
            }
        };
        return this.eventExecutor.submit(drain);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    void collectAndCommit() {
        if (this.stopped.get()) {
            logger.info("Coordinator for [{}][{}] has been stopped", (Object)this.clusterAddress, (Object)this.id);
            return;
        }
        long beginNano = System.nanoTime();
        try {
            final BatchCommitRequest request = this.collectRequest().get();
            long endNano = System.nanoTime();
            long tookMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
            logger.debug("Collect consumer status for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
            final BatchCommitResponse response = this.apiProxy.batchCommit(request);
            if (!response.isSuccess()) {
                throw new IllegalStateException("Batch commit for [" + this.clusterAddress + "] failed: [" + response.getErrMsg() + "]");
            }
            this.handleChangedParameter(response);
            this.eventExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    DefaultCoordinator.this.handleBatchCommitSuccess(request, response);
                }
            });
        }
        catch (InterruptedException e) {
            logger.info("Coordinator for [{}][{}] interrupted", (Object)this.clusterAddress, (Object)this.id);
            long endNano = System.nanoTime();
            long tookMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
            logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
            if (!this.stopped.get()) {
                this.scheduleCommitAfter(Math.max(this.getCommitPeriodMs() - tookMs, 3000L));
            }
        }
        catch (Exception e2) {
            logger.error("Failed to commit for [{}][{}]", new Object[]{this.clusterAddress, this.id, e2});
            this.eventExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    DefaultCoordinator.this.handleBatchCommitFailure(e2);
                }
            });
            {
                catch (Throwable throwable) {
                    long endNano = System.nanoTime();
                    long tookMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
                    logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
                    if (!this.stopped.get()) {
                        this.scheduleCommitAfter(Math.max(this.getCommitPeriodMs() - tookMs, 3000L));
                    }
                    throw throwable;
                }
            }
            long endNano = System.nanoTime();
            long tookMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
            logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
            if (!this.stopped.get()) {
                this.scheduleCommitAfter(Math.max(this.getCommitPeriodMs() - tookMs, 3000L));
            }
        }
        long endNano = System.nanoTime();
        long tookMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
        logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
        if (!this.stopped.get()) {
            this.scheduleCommitAfter(Math.max(this.getCommitPeriodMs() - tookMs, 3000L));
        }
    }

    /**
     * Runs one pass of the final-commit loop: drains the queued final commits into one batch
     * commit and hands the outcome to the event executor.
     *
     * <p>The pass that observes the coordinator as stopped is the last one: it sets
     * {@code inFinalCommit} and completes {@code finalCommitFuture} when it ends, whether it
     * succeeded, failed or had nothing to commit. Every other pass schedules its successor after
     * {@link #MIN_DELAY_MS_BETWEEN_COMMIT}. The decision is based on what this pass saw when it
     * started, so a stop request arriving in the middle of a pass still gets its last pass and
     * {@code finalCommitFuture} is always completed eventually.
     */
    private void doFinalCommits() {
        final boolean lastPass = this.stopped.get();
        if (lastPass) {
            logger.info("To do the final commit for [{}][{}], remaining manager num: [{}]", new Object[]{this.clusterAddress, this.id, this.seqToManager.size()});
            this.inFinalCommit = true;
        }
        boolean hasCommit = false;
        final long beginNano = System.nanoTime();
        try {
            final BatchCommitRequest request = this.collectFinalRequest().get();
            if (request.getLocalConsumerStatusList().isEmpty()) {
                logger.trace("No final commits for [{}][{}]", (Object)this.clusterAddress, (Object)this.id);
                return;
            }
            hasCommit = true;
            final BatchCommitResponse response = this.apiProxy.batchCommit(request);
            if (!response.isSuccess()) {
                throw new IllegalStateException("Batch commit to [" + this.clusterAddress + "] failed: [" + response.getErrMsg() + "]");
            }
            this.eventExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    DefaultCoordinator.this.handleFinalCommitSuccess(request, response);
                }
            });
        }
        catch (final Exception e) {
            logger.error("Failed to do final commit for [{}][{}]", new Object[]{this.clusterAddress, this.id, e});
            this.eventExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    DefaultCoordinator.this.handleFinalCommitFailure(e);
                }
            });
        }
        finally {
            if (lastPass) {
                this.finalCommitFuture.success(null);
            }
            if (hasCommit) {
                long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNano);
                logger.debug("Final commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, this.id, tookMs});
            }
            if (!lastPass) {
                // Also covers a stop that arrived during this pass: the next pass will be the last one.
                this.scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
            }
        }
    }

    /**
     * Applies the session timeout and commit period carried by a commit response, logging each
     * value that differs from the one currently in effect.
     *
     * @param response successful batch commit response whose data carries both values in seconds
     */
    private void handleChangedParameter(BatchCommitResponse response) {
        // Multiply by a long literal so the conversion to milliseconds is done in 64-bit arithmetic
        // and cannot overflow even if the getters return int.
        long newSessionTimeoutMs = response.getData().getSessionTimeoutSeconds() * 1000L;
        long newCommitPeriodMs = response.getData().getCommitPeriodSeconds() * 1000L;
        if (this.sessionTimeoutMs != newSessionTimeoutMs) {
            logger.info("Session timeout for cluster [{}] changed from [{}] to [{}]", new Object[]{this.clusterAddress, this.sessionTimeoutMs, newSessionTimeoutMs});
            this.sessionTimeoutMs = newSessionTimeoutMs;
        }
        if (this.commitPeriodMs != newCommitPeriodMs) {
            logger.info("Commit period for cluster [{}] changed from [{}] to [{}]", new Object[]{this.clusterAddress, this.commitPeriodMs, newCommitPeriodMs});
            this.commitPeriodMs = newCommitPeriodMs;
        }
    }

    /**
     * Processes a successful batch commit. For every consumer in the response the matching
     * partition manager is notified; a manager that was given a newly allocated sequence id is
     * first re-keyed from its local id to the allocated one. The partitions reported back by the
     * managers are then looked up in one batch call on the IO pool, whose outcome is delivered
     * back on the event executor.
     *
     * @param commitRequest the request that was committed
     * @param commitResponse the successful response to it
     */
    void handleBatchCommitSuccess(BatchCommitRequest commitRequest, BatchCommitResponse commitResponse) {
        final BatchGetPartitionRequest partitionRequest = new BatchGetPartitionRequest();
        final IdentityHashMap<PartitionManager, Map<PartitionRef, Long>> managerToPartitions = new IdentityHashMap<PartitionManager, Map<PartitionRef, Long>>();

        // Sequence ids that were sent; each one answered by the response is crossed off below.
        HashSet<String> unansweredSeqs = new HashSet<String>();
        for (LocalConsumerStatus committed : commitRequest.getLocalConsumerStatusList()) {
            unansweredSeqs.add(committed.getSeq());
        }

        for (ExpectedConsumerStatus expected : commitResponse.getData().getConsumers()) {
            if (!expected.isNewSeqAllocated()) {
                PartitionManager known = this.seqToManager.get(expected.getSeq());
                if (known == null) {
                    logger.error("Manager with seq [{}] not exists on [{}][{}]", new Object[]{expected.getSeq(), this.clusterAddress, this.id});
                } else {
                    this.notifyCommitComplete(known, expected, managerToPartitions, partitionRequest);
                }
                unansweredSeqs.remove(expected.getSeq());
                continue;
            }
            logger.info("Allocated consumer seq [{}] for local seq [{}]", (Object)expected.getAllocatedSeq(), (Object)expected.getLocalSeq());
            PartitionManager rekeyed = this.seqToManager.get(expected.getLocalSeq());
            if (rekeyed == null) {
                logger.warn("Manager with local seq [{}] not exists", (Object)expected.getLocalSeq());
            } else {
                // Swap the temporary local id for the allocated one in both directions.
                this.seqToManager.remove(expected.getLocalSeq());
                this.seqToManager.put(expected.getAllocatedSeq(), rekeyed);
                this.managerToSeq.put(rekeyed, expected.getAllocatedSeq());
                this.notifyCommitComplete(rekeyed, expected, managerToPartitions, partitionRequest);
            }
            unansweredSeqs.remove(expected.getLocalSeq());
        }

        if (!unansweredSeqs.isEmpty()) {
            logger.error("Found consumers provided in batch commit request not exists in batch commit response: {}", unansweredSeqs);
        }
        if (request​IsEmpty(partitionRequest)) {
            return;
        }

        final Runnable fetchPartitions = new Runnable(){

            @Override
            public void run() {
                try {
                    final BatchGetPartitionResponse partitionsResponse = apiProxy.batchGetPartitions(partitionRequest);
                    if (!partitionsResponse.isSuccess()) {
                        throw new IllegalStateException("Batch get partitions from [" + clusterAddress + "] failed: [" + partitionsResponse.getErrMsg() + "]");
                    }
                    eventExecutor.execute(new Runnable(){

                        @Override
                        public void run() {
                            handleBatchGetPartitionSuccess(managerToPartitions, partitionsResponse);
                        }
                    });
                }
                catch (final Exception e) {
                    eventExecutor.execute(new Runnable(){

                        @Override
                        public void run() {
                            handleBatchGetException(managerToPartitions, e);
                        }
                    });
                }
            }
        };
        this.ioExecutor.execute(fetchPartitions);
    }

    /** Tells whether no partition was added to the given lookup request. */
    private static boolean request​IsEmpty(BatchGetPartitionRequest partitionRequest) {
        return partitionRequest.getPartitionRefs().isEmpty();
    }

    /**
     * Notifies one manager of its expected status, records the partition generations it returns
     * and adds each returned partition to the pending lookup request.
     */
    private void notifyCommitComplete(PartitionManager manager, ExpectedConsumerStatus expected, Map<PartitionManager, Map<PartitionRef, Long>> managerToPartitions, BatchGetPartitionRequest partitionRequest) {
        Map<PartitionRef, Long> generations = manager.onCommitComplete(expected, null);
        managerToPartitions.put(manager, generations);
        for (PartitionRef ref : generations.keySet()) {
            partitionRequest.addPartition(ref.getGuid(), ref.getGroup(), ref.getPartition());
        }
    }

    /**
     * Reports a failed batch commit to every registered partition manager.
     *
     * @param e the failure that aborted the commit round
     */
    private void handleBatchCommitFailure(Exception e) {
        for (PartitionManager registered : seqToManager.values()) {
            registered.onCommitComplete(null, e);
        }
    }

    /**
     * Completes the promise of every final commit contained in a successfully committed request
     * and removes it from the pending set.
     *
     * @param request the final-commit request that succeeded
     * @param response the successful response (not inspected here)
     */
    private void handleFinalCommitSuccess(BatchCommitRequest request, BatchCommitResponse response) {
        for (LocalConsumerStatus consumerStatus : request.getLocalConsumerStatusList()) {
            FinalCommit finalCommit = this.pendingFinalCommits.remove(consumerStatus.getSeq());
            if (finalCommit == null) {
                // Nothing to complete; skip rather than fail and abandon the remaining promises.
                logger.warn("No pending final commit found for seq [{}]", (Object)consumerStatus.getSeq());
                continue;
            }
            // A success is not an error condition, so report it at INFO level.
            logger.info("Final commit for [{}][{}][{}] succeeded", new Object[]{finalCommit.getConsumerStatus().getGuid(), finalCommit.getConsumerStatus().getGroup(), finalCommit.getConsumerStatus().getSeq()});
            finalCommit.getPromise().success(null);
        }
    }

    /**
     * Handles a failed final commit: logs the failure for every pending final commit, completes
     * each promise anyway (with {@code null}) and then empties the pending set.
     *
     * @param e the failure that aborted the final commit
     */
    private void handleFinalCommitFailure(Exception e) {
        for (FinalCommit pending : pendingFinalCommits.values()) {
            LocalConsumerStatus status = pending.getConsumerStatus();
            logger.error("Final commit for [{}][{}][{}] failed", new Object[]{status.getGuid(), status.getGroup(), status.getSeq(), e});
            pending.getPromise().success(null);
        }
        pendingFinalCommits.clear();
    }

    /**
     * Distributes the result of a batch partition lookup: each manager receives the partition info
     * for exactly the partitions it asked about, together with their generations. A partition
     * missing from the response is logged and passed on with a {@code null} info.
     *
     * @param managerToPartitions partitions (with generations) requested per manager
     * @param response the successful lookup response
     */
    private void handleBatchGetPartitionSuccess(Map<PartitionManager, Map<PartitionRef, Long>> managerToPartitions, BatchGetPartitionResponse response) {
        Map<PartitionRef, PartitionInfo> allInfos = response.groupPartitionInfoMap();
        for (Map.Entry<PartitionManager, Map<PartitionRef, Long>> perManager : managerToPartitions.entrySet()) {
            Map<PartitionRef, Long> generations = perManager.getValue();
            HashMap<PartitionRef, PartitionInfo> requestedInfos = new HashMap<PartitionRef, PartitionInfo>();
            for (PartitionRef ref : generations.keySet()) {
                PartitionInfo info = allInfos.get(ref);
                if (info == null) {
                    logger.error("Got no partition info for [{}]", (Object)ref);
                }
                requestedInfos.put(ref, info);
            }
            perManager.getKey().onGetPartitionInfoComplete(requestedInfos, generations, null);
        }
    }

    /**
     * Reports a failed batch partition lookup to every manager that was waiting for it, passing an
     * empty info map, the manager's requested generations and the failure.
     *
     * @param managerToPartitions partitions (with generations) requested per manager
     * @param throwable the failure of the lookup
     */
    private void handleBatchGetException(Map<PartitionManager, Map<PartitionRef, Long>> managerToPartitions, Throwable throwable) {
        for (Map.Entry<PartitionManager, Map<PartitionRef, Long>> perManager : managerToPartitions.entrySet()) {
            PartitionManager waiting = perManager.getKey();
            waiting.onGetPartitionInfoComplete(new HashMap<PartitionRef, PartitionInfo>(), perManager.getValue(), throwable);
        }
    }

    /**
     * Submits a task to the IO thread pool.
     *
     * @param runnable task to run
     * @return future of the submitted task
     */
    @Override
    public Future runOnIOPool(Runnable runnable) {
        final Future submitted = ioExecutor.submit(runnable);
        return submitted;
    }

    /**
     * Submits a task to the single-threaded event executor.
     *
     * @param runnable task to run
     * @return future of the submitted task
     */
    @Override
    public Future runOnEventThread(Runnable runnable) {
        final Future submitted = eventExecutor.submit(runnable);
        return submitted;
    }

    /**
     * Stops the coordinator asynchronously. The first call marks it stopped and starts a daemon
     * thread that waits for the last final-commit pass, shuts all three executors down and waits
     * for them to terminate (up to 1 minute each for the event and commit executors, up to 10
     * minutes for the IO pool). Later calls only log a warning.
     *
     * @return a future completed when the shutdown sequence has ended, whether or not it ended
     *         cleanly; for repeated calls, an already completed future
     */
    @Override
    public Future asyncClose() {
        if (!this.stopped.compareAndSet(false, true)) {
            logger.warn("The coordinator for [{}][{}] has already been closed", (Object)this.clusterAddress, (Object)this.id);
            return Futures.success(null);
        }
        final SettableFuture promise = new SettableFuture();
        Thread thread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    // Do not shut anything down before the last final-commit pass has finished.
                    DefaultCoordinator.this.finalCommitFuture.get();
                    DefaultCoordinator.this.commitExecutor.shutdown();
                    DefaultCoordinator.this.ioExecutor.shutdown();
                    DefaultCoordinator.this.eventExecutor.shutdown();
                    DefaultCoordinator.this.eventExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                    DefaultCoordinator.this.commitExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                    DefaultCoordinator.this.ioExecutor.awaitTermination(10L, TimeUnit.MINUTES);
                }
                catch (InterruptedException e) {
                    logger.info("Interrupted while closing coordinator for [{}][{}]", (Object)DefaultCoordinator.this.clusterAddress, (Object)DefaultCoordinator.this.id);
                }
                catch (ExecutionException e) {
                    // Pass the exception as the last argument so its cause and stack trace are logged.
                    logger.error("Failed to close coordinator for [{}][{}] peacefully", new Object[]{DefaultCoordinator.this.clusterAddress, DefaultCoordinator.this.id, e});
                }
                finally {
                    promise.success(null);
                }
            }
        });
        thread.setName("Coordinator-Finalizer-[" + this.id + "]-" + this.clusterAddress);
        thread.setDaemon(true);
        thread.start();
        return promise;
    }

    /**
     * Immutable pair of a consumer's last local status and the promise to complete once that
     * status has been through a final commit.
     */
    static class FinalCommit {
        private final LocalConsumerStatus consumerStatus;
        private final SettableFuture<ExpectedConsumerStatus> promise;

        /**
         * @param consumerStatus status to send in the final commit
         * @param promise future to complete when the final commit is over
         */
        public FinalCommit(LocalConsumerStatus consumerStatus, SettableFuture<ExpectedConsumerStatus> promise) {
            this.promise = promise;
            this.consumerStatus = consumerStatus;
        }

        /** @return the status to send in the final commit */
        public LocalConsumerStatus getConsumerStatus() {
            return consumerStatus;
        }

        /** @return the future to complete when the final commit is over */
        public SettableFuture<ExpectedConsumerStatus> getPromise() {
            return promise;
        }
    }
}

