/*
 * Decompiled with CFR 0.152.
 */
package com.dtyunxi.huieryun.datasubscribe.provider.canal;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.dtyunxi.huieryun.datasubscribe.event.Column;
import com.dtyunxi.huieryun.datasubscribe.event.DataSubScribeEvent;
import com.dtyunxi.huieryun.datasubscribe.event.DeleteEvent;
import com.dtyunxi.huieryun.datasubscribe.event.InsertEvent;
import com.dtyunxi.huieryun.datasubscribe.event.UpdateEvent;
import com.dtyunxi.huieryun.log.LoggerFactory;
import com.google.common.eventbus.EventBus;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;

/**
 * Converts canal {@link Message}s into {@link DataSubScribeEvent}s.
 *
 * <p>Only entries of type {@code ROWDATA} are processed; QUERY and DDL row changes are ignored.
 * Every row of a row change becomes one {@link DeleteEvent}, {@link InsertEvent} or
 * {@link UpdateEvent}, which is either collected into the returned list or posted to an
 * {@link EventBus}.
 */
public class CanalEventHandler {
    private static final Logger logger = LoggerFactory.getLogger(CanalEventHandler.class);

    /**
     * Converts every row-data entry of the message into events and returns them.
     *
     * @param message the canal message to convert
     * @return the converted events in entry/row order; never {@code null}, possibly empty
     * @throws RuntimeException if the payload of an entry cannot be parsed as a row change
     */
    public List<DataSubScribeEvent> onEvent(Message message) {
        return this.collectEvents(message, null);
    }

    /**
     * Converts every row-data entry of the message into events and posts them to the event bus.
     *
     * @param message  the canal message to convert
     * @param eventBus the bus each event is posted to; when {@code null} the events are returned
     *                 instead of being dropped
     * @return an empty list when {@code eventBus} is non-null (events are delivered through the
     *         bus), otherwise the converted events; never {@code null}
     * @throws RuntimeException if the payload of an entry cannot be parsed as a row change
     */
    public List<DataSubScribeEvent> onEvent(Message message, EventBus eventBus) {
        return this.collectEvents(message, eventBus);
    }

    /** Shared implementation of both {@code onEvent} overloads. */
    private List<DataSubScribeEvent> collectEvents(Message message, EventBus eventBus) {
        List<DataSubScribeEvent> events = new ArrayList<DataSubScribeEvent>();
        for (CanalEntry.Entry entry : message.getEntries()) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            // Empty whenever a bus is supplied, because the rows were already posted to it.
            events.addAll(this.handlerRowData(entry, eventBus));
        }
        return events;
    }

    /**
     * Parses one row-data entry and converts each of its rows into an event.
     *
     * @param entry    a canal entry of type {@code ROWDATA}
     * @param eventBus bus to post the events to, or {@code null} to collect them
     * @return the collected events when {@code eventBus} is {@code null}; otherwise an empty list
     * @throws RuntimeException if the entry payload cannot be parsed
     */
    private List<DataSubScribeEvent> handlerRowData(CanalEntry.Entry entry, EventBus eventBus) {
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }

        List<DataSubScribeEvent> events = new ArrayList<DataSubScribeEvent>();
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
            return events;
        }

        long executeTime = entry.getHeader().getExecuteTime();
        String schemaName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            DataSubScribeEvent event;
            if (eventType == CanalEntry.EventType.DELETE) {
                // Guarded so the column dump is only built when it will actually be logged.
                if (logger.isInfoEnabled()) {
                    logger.info("handler canal event,type is delete,schemaName:{},tableName:{},columns:{}", new Object[]{schemaName, tableName, this.printRowData(rowData)});
                }
                event = this.handleDeleteData(rowData, schemaName, tableName, executeTime);
            } else if (eventType == CanalEntry.EventType.INSERT) {
                if (logger.isInfoEnabled()) {
                    logger.info("handler canal event,type is insert,schemaName:{},tableName:{},columns:{}", new Object[]{schemaName, tableName, this.printRowData(rowData)});
                }
                event = this.handleInsertData(rowData, schemaName, tableName, executeTime);
            } else {
                // Every remaining (non-QUERY, non-DDL) event type is handled as an update.
                if (logger.isInfoEnabled()) {
                    logger.info("handler canal event,type is update,schemaName:{},tableName:{},columns:{}", new Object[]{schemaName, tableName, this.printRowData(rowData)});
                }
                event = this.handleUpdateData(rowData, schemaName, tableName, executeTime);
            }
            if (event == null) {
                continue;
            }
            if (eventBus == null) {
                events.add(event);
            } else {
                eventBus.post(event);
            }
        }
        return events;
    }

    /** Builds a {@link DeleteEvent} from the before-image columns of the row. */
    private DataSubScribeEvent handleDeleteData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        DeleteEvent deleteEvent = new DeleteEvent(schemaName, tableName, executeTime);
        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
            deleteEvent.addColumn(this.createEventColumn(column));
        }
        return deleteEvent;
    }

    /** Builds an {@link InsertEvent} from the after-image columns of the row. */
    private DataSubScribeEvent handleInsertData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        InsertEvent insertEvent = new InsertEvent(schemaName, tableName, executeTime);
        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            insertEvent.addColumn(this.createEventColumn(column));
        }
        return insertEvent;
    }

    /** Copies name, MySQL type, value and updated flag of a canal column into an event column. */
    private Column createEventColumn(CanalEntry.Column column) {
        return new Column(column.getName(), column.getMysqlType(), column.getValue(), column.getUpdated());
    }

    /**
     * Builds an {@link UpdateEvent} from the after-image columns, attaching the value of the
     * before-image column at the same index as the old value.
     *
     * <p>The event's {@code dr} is set to 1 when a column named {@code "dr"} changes from
     * {@code "0"} to {@code "1"}.
     */
    private DataSubScribeEvent handleUpdateData(CanalEntry.RowData rowData, String schemaName, String tableName, long executeTime) {
        UpdateEvent updateEvent = new UpdateEvent(schemaName, tableName, executeTime);
        List<CanalEntry.Column> afterColumns = rowData.getAfterColumnsList();
        List<CanalEntry.Column> beforeColumns = rowData.getBeforeColumnsList();
        for (int i = 0; i < afterColumns.size(); ++i) {
            Column column = this.createEventColumn(afterColumns.get(i));
            // The before-image may hold fewer columns than the after-image; leave the old value
            // unset in that case instead of failing with IndexOutOfBoundsException.
            if (i < beforeColumns.size()) {
                column.setOldValue(beforeColumns.get(i).getValue());
            }
            updateEvent.addColumn(column);
            if ("dr".equals(column.getName()) && "1".equals(column.getValue()) && "0".equals(column.getOldValue())) {
                updateEvent.setDr(1);
            }
        }
        return updateEvent;
    }

    /**
     * Renders the row's columns as space-terminated {@code name-value} pairs for logging.
     *
     * <p>The after-image is used when it has columns; otherwise the before-image is used, so
     * that rows carrying only a before-image (deletes) do not log an empty column list.
     */
    private String printRowData(CanalEntry.RowData rowData) {
        List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
        if (columns == null || columns.isEmpty()) {
            columns = rowData.getBeforeColumnsList();
        }
        StringBuilder sb = new StringBuilder();
        if (columns != null) {
            for (CanalEntry.Column column : columns) {
                if (column == null) {
                    continue;
                }
                sb.append(column.getName()).append("-").append(column.getValue()).append(" ");
            }
        }
        return sb.toString();
    }
}

