/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dms.subscribe.clients;

import com.aliyun.dms.subscribe.clients.DBMapper;
import com.aliyun.dms.subscribe.clients.DTSConsumerWithDBMapping;
import com.aliyun.dms.subscribe.clients.DistributedDTSConsumer;
import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.DTSConsumer;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.metastore.MetaStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultDistributedDTSConsumer
implements DistributedDTSConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDistributedDTSConsumer.class);
    private List<DTSConsumer> dtsConsumers = new ArrayList<DTSConsumer>();
    private int corePoolSize = 8;
    private int maximumPoolSize = 8;
    private ThreadPoolExecutor executor;
    private volatile boolean isClosePoolExecutor = false;

    public void addDTSConsumer(DTSConsumer consumer) {
        this.dtsConsumers.add(consumer);
    }

    public void init(Map<String, String> topic2checkpoint, DBMapper dbMapper, String dProxy, Map<String, String> topic2Sid, String username, String password, ConsumerContext.ConsumerSubscribeMode subscribeMode, boolean isForceUseInitCheckpoint, MetaStore<Checkpoint> userRegisteredStore, Map<String, RecordListener> recordListeners) {
        this.init(topic2checkpoint, dbMapper, dProxy, topic2Sid, username, password, subscribeMode, isForceUseInitCheckpoint, userRegisteredStore, recordListeners, new Properties());
    }

    public void init(Map<String, String> topic2checkpoint, DBMapper dbMapper, String dProxy, Map<String, String> topic2Sid, String username, String password, ConsumerContext.ConsumerSubscribeMode subscribeMode, boolean isForceUseInitCheckpoint, MetaStore<Checkpoint> userRegisteredStore, Map<String, RecordListener> recordListeners, Properties properties) {
        this.executor = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (Map.Entry<String, String> topicCheckpoint : topic2checkpoint.entrySet()) {
            ConsumerContext consumerContext = new ConsumerContext(dbMapper, dProxy, topicCheckpoint.getKey(), topic2Sid.get(topicCheckpoint.getKey()), username, password, topicCheckpoint.getValue(), subscribeMode, properties);
            consumerContext.setUserRegisteredStore(userRegisteredStore);
            consumerContext.setForceUseCheckpoint(isForceUseInitCheckpoint);
            DTSConsumerWithDBMapping dtsConsumer = new DTSConsumerWithDBMapping(consumerContext);
            dtsConsumer.addRecordListeners(recordListeners);
            this.addDTSConsumer(dtsConsumer);
        }
    }

    @Override
    public void start() {
        for (DTSConsumer consumer : this.dtsConsumers) {
            try {
                this.executor.submit(consumer::start);
            }
            catch (Exception e) {
                LOG.error("error starting consumer:" + e);
                this.shutdownGracefully(10L, TimeUnit.SECONDS);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdownGracefully(long timeout, TimeUnit timeUnit) {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(timeout, timeUnit)) {
                this.executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        finally {
            this.isClosePoolExecutor = true;
        }
    }

    @Override
    public void addRecordListeners(Map<String, RecordListener> recordListeners) {
        for (DTSConsumer dtsConsumer : this.dtsConsumers) {
            dtsConsumer.addRecordListeners(recordListeners);
        }
    }
}

