package com.dtyunxi.cube.notifier.starter.publisher.rest;

import com.dtyunxi.cube.notifier.starter.publisher.rest.event.RetryRefreshEvent;
import com.dtyunxi.cube.utils.threads.CommonThreadPool;
import com.dtyunxi.cube.utils.threads.pattens.ConsumerWorker;
import com.dtyunxi.cube.utils.threads.pattens.ProdConsuPatten;
import com.dtyunxi.cube.utils.threads.pattens.ProducerWorker;
import com.dtyunxi.cube.utils.threads.pattens.ProductQueueStore;
import com.dtyunxi.cube.utils.threads.pattens.ProductStore;
import com.dtyunxi.rest.RestResponse;
import feign.Feign;
import feign.Request;
import feign.Retryer;
import feign.codec.Decoder;
import feign.codec.Encoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.openfeign.support.ResponseEntityDecoder;
import org.springframework.cloud.openfeign.support.SpringDecoder;
import org.springframework.cloud.openfeign.support.SpringEncoder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;

/* loaded from: input_file:com/dtyunxi/cube/notifier/starter/publisher/rest/AbstractRestRulePublishProcessor.class */
public abstract class AbstractRestRulePublishProcessor<FEIGN_API> implements ApplicationContextAware, ConsumerWorker<CallBack> {
    private static Logger logger = LoggerFactory.getLogger(AbstractRestRulePublishProcessor.class);
    private ApplicationContext applicationContext;
    private RestRibbonConfig restRibbonConfig;
    private DiscoveryClient discoveryClient;
    private String receiverServiceId;
    private Class<FEIGN_API> feignApiClass;
    private CommonThreadPool pool = new CommonThreadPool();
    private ProductStore store = new ProductQueueStore(5000);
    private ProdConsuPatten dispatcher = new ProdConsuPatten(this.pool, this.store, this, (ProducerWorker) null);

    /* loaded from: input_file:com/dtyunxi/cube/notifier/starter/publisher/rest/AbstractRestRulePublishProcessor$CallBack.class */
    public interface CallBack<FEIGN_API> {
        RestResponse<Void> onEvent(FEIGN_API feign_api);
    }

    public AbstractRestRulePublishProcessor(Class<FEIGN_API> cls, RestRibbonConfig restRibbonConfig, String str, DiscoveryClient discoveryClient) {
        this.receiverServiceId = str;
        this.discoveryClient = discoveryClient;
        this.restRibbonConfig = restRibbonConfig;
        this.dispatcher.start();
        this.feignApiClass = cls;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public Decoder feignDecoder() {
        HttpMessageConverters httpMessageConverters = new HttpMessageConverters(new HttpMessageConverter[]{new MappingJackson2HttpMessageConverter()});
        return new ResponseEntityDecoder(new SpringDecoder(() -> {
            return httpMessageConverters;
        }));
    }

    public Encoder feignEncoder() {
        HttpMessageConverters httpMessageConverters = new HttpMessageConverters(new HttpMessageConverter[]{new MappingJackson2HttpMessageConverter()});
        return new SpringEncoder(() -> {
            return httpMessageConverters;
        });
    }

    public FEIGN_API getApiInstance(String str, Class<FEIGN_API> cls) {
        return (FEIGN_API) Feign.builder().decoder(feignDecoder()).encoder(feignEncoder()).options(new Request.Options(this.restRibbonConfig.getRibbonConnectTimeout(), this.restRibbonConfig.getRibbonReadTimeout())).retryer(new Retryer.Default(this.restRibbonConfig.getPeriod(), this.restRibbonConfig.getMaxPeriod(), this.restRibbonConfig.getMaxAttempts())).target(cls, this.restRibbonConfig.getProtocol() + "://" + str);
    }

    public void process(CallBack<FEIGN_API> callBack) {
        try {
            this.dispatcher.put(callBack);
        } catch (InterruptedException e) {
            logger.info("rest通知刷新放入消费队列异常：", this.receiverServiceId, e);
        }
    }

    public void doWork(CallBack callBack) {
        try {
            List instances = this.discoveryClient.getInstances(this.receiverServiceId);
            CompletableFuture[] completableFutureArr = new CompletableFuture[instances.size()];
            ArrayList arrayList = new ArrayList(instances.size());
            for (int i = 0; i < instances.size(); i++) {
                ServiceInstance serviceInstance = (ServiceInstance) instances.get(i);
                arrayList.add(getApiInstance(serviceInstance.getHost() + ":" + serviceInstance.getPort(), this.feignApiClass));
                int i2 = i;
                completableFutureArr[i] = CompletableFuture.supplyAsync(() -> {
                    return callBack.onEvent(arrayList.get(i2));
                });
            }
            for (int i3 = 0; i3 < completableFutureArr.length; i3++) {
                try {
                    if (!"0".equals(((RestResponse) completableFutureArr[i3].get()).getResultCode())) {
                        this.applicationContext.publishEvent(new RetryRefreshEvent(this, arrayList.get(i3), callBack, 1));
                    }
                } catch (InterruptedException e) {
                    logger.info("收集配置刷新通知结果进程中断异常：", e);
                    this.applicationContext.publishEvent(new RetryRefreshEvent(this, arrayList.get(i3), callBack, 1));
                } catch (ExecutionException e2) {
                    logger.info("收集配置刷新通知结果执行异常：", e2);
                    this.applicationContext.publishEvent(new RetryRefreshEvent(this, arrayList.get(i3), callBack, 1));
                }
            }
        } catch (Exception e3) {
            logger.info("获取服务{}的实例异常：", this.receiverServiceId, e3);
        }
    }
}
