/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datadistribute.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.dtyunxi.huieryun.datadistribute.DataEventHandler;
import com.dtyunxi.huieryun.datadistribute.event.Column;
import com.dtyunxi.huieryun.datadistribute.event.DataDistributeEvent;
import com.dtyunxi.huieryun.datadistribute.event.DeleteEvent;
import com.dtyunxi.huieryun.datadistribute.event.InsertEvent;
import com.dtyunxi.huieryun.datadistribute.event.UpdateEvent;
import com.dtyunxi.huieryun.mq.vo.MessageResponse;
import com.dtyunxi.util.JacksonUtil;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

public class CanalEventHandler
extends DataEventHandler {
    private static final Logger logger = LoggerFactory.getLogger(CanalEventHandler.class);
    private final ExecutorService executorPool;
    private static final List<String> ROW_EVENTS = Arrays.asList("INSERT", "DELETE", "UPDATE", "REPLACE");

    public CanalEventHandler(ExecutorService executorPool) {
        this.executorPool = executorPool;
    }

    public MessageResponse onEvent(Message message) {
        for (CanalEntry.Entry entry : message.getEntries()) {
            switch (entry.getEntryType()) {
                case ROWDATA: {
                    MessageResponse response = this.handlerRowData(entry);
                    if (response != MessageResponse.ERROR) break;
                    return response;
                }
                case TRANSACTIONBEGIN: 
                case TRANSACTIONEND: {
                    break;
                }
            }
        }
        return MessageResponse.SUCCESS;
    }

    public MessageResponse onEvent(FlatMessage flatMessage) {
        if (flatMessage == null || flatMessage.getIsDdl() == null || flatMessage.getIsDdl().booleanValue()) {
            return MessageResponse.SUCCESS;
        }
        if (!ROW_EVENTS.contains(flatMessage.getType())) {
            return MessageResponse.SUCCESS;
        }
        DataDistributeEvent[] events = new DataDistributeEvent[flatMessage.getData().size()];
        InsertEvent dataDistributeEvent = null;
        long executeTime = flatMessage.getEs();
        String schemaName = flatMessage.getDatabase();
        String tableName = flatMessage.getTable();
        ArrayList<Column> columns = null;
        Column column = null;
        int i = 0;
        switch (flatMessage.getType()) {
            case "INSERT": {
                for (Map data : flatMessage.getData()) {
                    dataDistributeEvent = new InsertEvent(schemaName, tableName, executeTime);
                    columns = new ArrayList<Column>(data.size());
                    for (Map.Entry entry : data.entrySet()) {
                        column = new Column((String)entry.getKey(), (String)entry.getValue(), false);
                        if (flatMessage.getPkNames() != null && !flatMessage.getPkNames().isEmpty() && flatMessage.getPkNames().contains(entry.getKey())) {
                            column.setPrimaryKey(true);
                        }
                        columns.add(column);
                    }
                    dataDistributeEvent.setColumns(columns);
                    events[i++] = dataDistributeEvent;
                }
                break;
            }
            case "DELETE": {
                for (Map data : flatMessage.getData()) {
                    dataDistributeEvent = new DeleteEvent(schemaName, tableName, executeTime);
                    columns = new ArrayList(data.size());
                    for (Map.Entry entry : data.entrySet()) {
                        column = new Column((String)entry.getKey(), (String)entry.getValue(), false);
                        if (flatMessage.getPkNames() != null && !flatMessage.getPkNames().isEmpty() && flatMessage.getPkNames().contains(entry.getKey())) {
                            column.setPrimaryKey(true);
                        }
                        columns.add(column);
                    }
                    dataDistributeEvent.setColumns(columns);
                    events[i++] = dataDistributeEvent;
                }
                break;
            }
            case "UPDATE": 
            case "REPLACE": {
                for (Map data : flatMessage.getData()) {
                    dataDistributeEvent = new UpdateEvent(schemaName, tableName, executeTime);
                    columns = new ArrayList(data.size());
                    Map odlData = (Map)flatMessage.getOld().get(i);
                    for (Map.Entry entry : data.entrySet()) {
                        if (odlData.containsKey(entry.getKey())) {
                            column = new Column((String)entry.getKey(), (String)entry.getValue(), true);
                            if (flatMessage.getPkNames() != null && !flatMessage.getPkNames().isEmpty() && flatMessage.getPkNames().contains(entry.getKey())) {
                                column.setPrimaryKey(true);
                            }
                            column.setOldValue((String)odlData.get(entry.getKey()));
                            if ("dr".equals(entry.getKey())) {
                                switch (column.getValue()) {
                                    case "1": {
                                        dataDistributeEvent.setDr(1);
                                        break;
                                    }
                                    case "2": {
                                        dataDistributeEvent.setDr(2);
                                    }
                                }
                            }
                        } else {
                            column = new Column((String)entry.getKey(), (String)entry.getValue(), false);
                        }
                        columns.add(column);
                    }
                    dataDistributeEvent.setColumns(columns);
                    events[i] = dataDistributeEvent;
                    ++i;
                }
                break;
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("dispach events {}", (Object)JacksonUtil.toJson((Object)events));
        }
        return this.sendDataDistributeEvents(Arrays.asList(events));
    }

    public MessageResponse onFlatMessageEvent(List<FlatMessage> flatMessages) {
        long startTime = System.currentTimeMillis();
        ArrayList<Future<MessageResponse>> resultList = new ArrayList<Future<MessageResponse>>(flatMessages.size());
        for (FlatMessage flatMessage : flatMessages) {
            if (flatMessage.getId() < 1L || CollectionUtils.isEmpty((Collection)flatMessage.getData())) continue;
            Future<MessageResponse> task = this.executorPool.submit(() -> this.onEvent(flatMessage));
            resultList.add(task);
        }
        MessageResponse response = this.onHanderBatchResult(resultList);
        long useTime = System.currentTimeMillis() - startTime;
        logger.info("onFlatMessageEvent,messageSize={},useTime={}ms", (Object)flatMessages.size(), (Object)useTime);
        return response;
    }

    protected MessageResponse onHanderBatchResult(List<Future<MessageResponse>> resultList) {
        if (resultList.isEmpty()) {
            return MessageResponse.SUCCESS;
        }
        for (Future<MessageResponse> result : resultList) {
            if (result == null) continue;
            try {
                MessageResponse res = result.get();
                if (!MessageResponse.ERROR.equals(res)) continue;
                logger.warn("\u5904\u7406\u672c\u6279\u6b21\u7684flatmessage \u5b58\u5728\u4e00\u6761\u5f02\u5e38\uff0c\u4e3a\u4fdd\u8bc1\u5b8c\u6574\u6027\uff0c\u6574\u6279\u6d88\u606f\u90fd\u9700\u91cd\u53d1");
                return MessageResponse.ERROR;
            }
            catch (NullPointerException e) {
                return MessageResponse.SUCCESS;
            }
            catch (Exception e) {
                logger.error("\u591a\u7ebf\u7a0b\u6d88\u8d39canal\u6d88\u606f\u5f02\u5e38", (Throwable)e);
                return MessageResponse.ERROR;
            }
        }
        return MessageResponse.SUCCESS;
    }

    public MessageResponse onMessagesEvent(List<Message> messages) {
        long startTime = System.currentTimeMillis();
        ArrayList<Future<MessageResponse>> resultList = new ArrayList<Future<MessageResponse>>(messages.size());
        for (Message message : messages) {
            if (message.getId() < 1L || CollectionUtils.isEmpty((Collection)message.getEntries())) continue;
            Future<MessageResponse> task = this.executorPool.submit(() -> this.onEvent(message));
            resultList.add(task);
        }
        MessageResponse response = this.onHanderBatchResult(resultList);
        long useTime = System.currentTimeMillis() - startTime;
        logger.info("onMessagesEvent,messageSize={},useTime={}ms", (Object)messages.size(), (Object)useTime);
        return response;
    }

    private MessageResponse handlerRowData(CanalEntry.Entry entry) {
        CanalEntry.RowChange rowChage = null;
        try {
            rowChage = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
        CanalEntry.EventType eventType = rowChage.getEventType();
        if (eventType == CanalEntry.EventType.QUERY || rowChage.getIsDdl()) {
            return MessageResponse.SUCCESS;
        }
        long executeTime = entry.getHeader().getExecuteTime();
        String schemaName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();
        DataDistributeEvent[] events = new DataDistributeEvent[rowChage.getRowDatasList().size()];
        int i = 0;
        block6: for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
            switch (eventType) {
                case DELETE: {
                    events[i++] = this.handleDeleteData(rowData, schemaName, tableName, executeTime);
                    continue block6;
                }
                case INSERT: {
                    events[i++] = this.handleInsertData(rowData, schemaName, tableName, executeTime);
                    continue block6;
                }
            }
            events[i++] = this.handleUpdateData(rowData, schemaName, tableName, executeTime);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("dispach events {}", (Object)JacksonUtil.toJson((Object)events));
        }
        MessageResponse response = this.sendDataDistributeEvents(Arrays.asList(events));
        return response;
    }

    private DataDistributeEvent handleDeleteData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        DeleteEvent deteleEvent = new DeleteEvent(schemaName, tableName, executeTime);
        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
            if (StringUtils.isEmpty((CharSequence)column.getValue())) continue;
            deteleEvent.addColumn(this.createEventColumn(column));
        }
        return deteleEvent;
    }

    private DataDistributeEvent handleInsertData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        InsertEvent insertEvent = new InsertEvent(schemaName, tableName, executeTime);
        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            if (StringUtils.isEmpty((CharSequence)column.getValue())) continue;
            insertEvent.addColumn(this.createEventColumn(column));
        }
        return insertEvent;
    }

    private Column createEventColumn(CanalEntry.Column column) {
        Column dataColumn = new Column(column.getName(), column.getMysqlType(), column.getValue(), column.getUpdated());
        dataColumn.setPrimaryKey(column.getIsKey());
        return dataColumn;
    }

    private DataDistributeEvent handleUpdateData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        UpdateEvent updateEvent = new UpdateEvent(schemaName, tableName, executeTime);
        block8: for (int i = 0; i < rowData.getAfterColumnsList().size(); ++i) {
            CanalEntry.Column afterColumn = (CanalEntry.Column)rowData.getAfterColumnsList().get(i);
            CanalEntry.Column beforeColumn = (CanalEntry.Column)rowData.getBeforeColumnsList().get(i);
            if (StringUtils.isEmpty((CharSequence)beforeColumn.getValue()) && StringUtils.isEmpty((CharSequence)afterColumn.getValue())) continue;
            Column column = this.createEventColumn(afterColumn);
            column.setOldValue(beforeColumn.getValue());
            updateEvent.addColumn(column);
            if (!"dr".equals(column.getName()) || column.getValue().equals(column.getOldValue())) continue;
            switch (column.getValue()) {
                case "1": {
                    updateEvent.setDr(1);
                    continue block8;
                }
                case "2": {
                    updateEvent.setDr(2);
                }
            }
        }
        return updateEvent;
    }

    @Autowired
    protected void setContext(ApplicationContext context) {
        this.context = context;
    }
}

