package com.dtyunxi.tcbj.app.open.biz.mq.dispatch.processor;

import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.dtyunxi.cube.commons.beans.mq.MessageVo;
import com.dtyunxi.cube.plugin.mq.ICommonsMqService;
import com.dtyunxi.huieryun.cache.api.ICacheService;
import com.dtyunxi.huieryun.mq.api.IMessageProcessor;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.tcbj.app.open.biz.mq.dispatch.vo.ItemUpdatePriceVo;
import com.dtyunxi.tcbj.app.open.biz.scheduler.PriceSyncEvent;
import com.dtyunxi.tcbj.app.open.biz.service.ISellerSkuPriceService;
import com.dtyunxi.tcbj.app.open.dao.eo.SellerSkuPriceEo;
import com.dtyunxi.yundt.cube.center.item.api.ISellerSkuPriceApi;
import com.dtyunxi.yundt.cube.center.item.api.dto.response.ProductPriceRespDto;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/dtyunxi/tcbj/app/open/biz/mq/dispatch/processor/ItemUpdatePriceProcessor.class */
public class ItemUpdatePriceProcessor implements IMessageProcessor<Object> {
    private static final Logger log = LoggerFactory.getLogger(ItemUpdatePriceProcessor.class);

    @Resource
    private ISellerSkuPriceApi sellerSkuPriceApi;

    @Value("${customer.price.relation.pageSize:5000}")
    private Integer pageSize;

    @Value("${customer.price.relation.limitCount:15}")
    private Integer limitCount;

    @Resource
    private ISellerSkuPriceService sellerSkuPriceService;

    @Value("${customer.price.relation.sync:false}")
    private boolean syncOpen;

    @Resource
    private ICommonsMqService commonsMqService;

    @Resource
    private ICacheService cacheService;
    public static final String SPLIT = "itemUpdatePriceVo";
    private final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(2, true), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    public MessageResponse process(Object obj) {
        MDC.put("yes.req.requestId", UUID.randomUUID().toString().replace("-", ""));
        log.info("接收到价格修改MQ数据:{}", obj);
        ItemUpdatePriceVo itemUpdatePriceVo = (ItemUpdatePriceVo) JSONObject.parseObject(obj.toString(), ItemUpdatePriceVo.class);
        if (!this.syncOpen && !itemUpdatePriceVo.isTouchSync()) {
            log.info("暂时先不处理");
            return MessageResponse.SUCCESS;
        }
        if (Objects.isNull(itemUpdatePriceVo.getTenantId())) {
            log.info("参数无租户id");
            return MessageResponse.SUCCESS;
        }
        List list = this.cacheService.getList(PriceSyncEvent.REDIS_KEY, String.class);
        if (!CollectionUtils.isNotEmpty(list)) {
            this.cacheService.lpush(PriceSyncEvent.REDIS_KEY, Lists.newArrayList(new String[]{itemUpdatePriceVo.getTenantId() + SPLIT + JSONObject.toJSONString(itemUpdatePriceVo)}));
        } else if (list.stream().noneMatch(str -> {
            return str.startsWith(itemUpdatePriceVo.getTenantId() + SPLIT);
        })) {
            this.cacheService.lpushx(PriceSyncEvent.REDIS_KEY, Lists.newArrayList(new String[]{itemUpdatePriceVo.getTenantId() + SPLIT + JSONObject.toJSONString(itemUpdatePriceVo)}));
        }
        return MessageResponse.SUCCESS;
    }

    public void execute(ItemUpdatePriceVo itemUpdatePriceVo) {
        this.executorService.execute(() -> {
            try {
                log.info("价格修改数据JSON转换后:{}", itemUpdatePriceVo);
                List<String> selectApplierIdsBySupplierId = this.sellerSkuPriceService.selectApplierIdsBySupplierId(itemUpdatePriceVo.getTenantId());
                if (CollectionUtils.isEmpty(selectApplierIdsBySupplierId)) {
                    return;
                }
                ItemUpdatePriceVo itemUpdatePriceVo2 = (ItemUpdatePriceVo) JSONObject.parseObject(JSONObject.toJSONString(itemUpdatePriceVo), ItemUpdatePriceVo.class);
                Stream.iterate(0, num -> {
                    return Integer.valueOf(num.intValue() + 1);
                }).limit(getCount(Integer.valueOf(selectApplierIdsBySupplierId.size())).intValue()).forEach(num2 -> {
                    itemUpdatePriceVo2.setApplierIds((List) selectApplierIdsBySupplierId.stream().skip(num2.intValue() * this.limitCount.intValue()).limit(this.limitCount.intValue()).collect(Collectors.toList()));
                    handle(this.sellerSkuPriceService.selectListNoLimit(itemUpdatePriceVo2));
                });
            } catch (Exception e) {
                log.error("监听器接收到消息失败", e);
                e.printStackTrace();
            }
        });
    }

    private Integer getCount(Integer num) {
        return Integer.valueOf(((this.limitCount.intValue() + num.intValue()) - 1) / this.limitCount.intValue());
    }

    private void handle(List<SellerSkuPriceEo> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        this.sellerSkuPriceApi.updatePriceList((List) list.stream().map(sellerSkuPriceEo -> {
            ProductPriceRespDto productPriceRespDto = new ProductPriceRespDto();
            BeanUtil.copyProperties(sellerSkuPriceEo, productPriceRespDto, new String[0]);
            return productPriceRespDto;
        }).collect(Collectors.toList()));
    }

    private void sendMessage(ProductPriceRespDto productPriceRespDto) {
        MessageVo messageVo = new MessageVo();
        messageVo.setData(productPriceRespDto);
        log.info("发送价格同步mq消息：{}", JSON.toJSONString(messageVo));
        this.commonsMqService.sendSingleMessageAsync("ITEM_UPDATE_PRICE_TOPIC", "ITEM_UPDATE_PRICE_TAG", messageVo);
    }
}
