gpt4 book ai didi

分库分表—4.数据迁移系统文档

转载 作者:撒哈拉 更新时间:2024-12-07 22:23:25 70 4
gpt4 key购买 nike

大纲 。

1.数据库设计 。

2.枚举类 。

3.接⼝设计 。

4.定时任务设计 。

(1)定时核对校验数据的定时任务 。

(2)数据量统计定时任务 。

(3)增量数据落地定时任务 。

(4)失败重试定时任务 。

5.技术亮点 。

(1)滚动拉取方案 。

(2)巧妙的统计滚动进度方案 。

(3)防止增量同步数据丢失和高效写入方案 。

(4)数据扩容方案 。

6.全量同步和增量同步整体流程图 。

7.功能升级 。

(1)数据迁移系统数据源动态化配置 。

(2)迁移数据库操作对应的xml动态⽣成 。

(3)扩容迁移数据时的问题 。

  。

1.数据库设计 。

(1)订单表——order_info 。

create table order_info (
    id bigint(32) auto_increment,
    order_no varchar(32) not null comment '订单号',
    order_amount decimal(8, 2) not null comment '订单⾦额',
    merchant_id bigint(32) not null comment '商户ID',
    user_id bigint(32) not null comment '⽤户ID',
    order_freight decimal(8, 2) default 0.00 not null comment '运费',
    order_status tinyint(3) default 0 not null comment '订单状态:10待付款,20待接单,30已接单,40配送中,50已完成,55部分退款,60全部退款,70取消订单',
    trans_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '交易时间',
    pay_status tinyint(3) default 2 not null comment '⽀付状态:1待⽀付、2⽀付成功、3⽀付失败',
    recharge_time timestamp default CURRENT_TIMESTAMP not null comment '⽀付完成时间',
    pay_amount decimal(8, 2) default 0.00 not null comment '实际⽀付⾦额',
    pay_discount_amount decimal(8, 2) default 0.00 not null comment '⽀付优惠⾦额',
    address_id bigint(32) not null comment '收货地址ID',
    delivery_type tinyint(3) default 2 not null comment '配送⽅式:1⾃提,2配送',
    delivery_status tinyint(3) default 0 null comment '配送状态:0 配送中,2已送达,3待收货,4已送达',
    delivery_expect_time timestamp null comment '配送预计送达时间',
    delivery_complete_time timestamp null comment '配送送达时间',
    delivery_amount decimal(8, 2) default 0.00 not null comment '配送运费',
    coupon_id bigint(32) null comment '优惠券id',
    cancel_time timestamp null comment '订单取消时间',
    confirm_time timestamp null comment '订单确认时间',
    remark varchar(512) null comment '订单备注留⾔',
    create_user bigint(32) null comment '创建⽤户',
    update_user bigint(32) null comment '更新⽤户',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp null on update CURRENT_TIMESTAMP comment '更新时间',
    delete_flag tinyint default 0 not null comment '逻辑删除标记',
    primary key (id, order_no)
) comment '订单表'; 
create index inx_order_no on order_info (order_no); 
create index inx_create_time on order_info (create_time, order_no);

(2)订单详情表——order_info 。

-- auto-generated definition 
create table order_item_detail (
    id bigint(32) auto_increment primary key,
    order_no varchar(32) not null comment '订单号',
    product_id bigint(32) not null comment '商品ID',
    category_id bigint(32) not null comment '商品分类ID',
    goods_num int(8) default 1 not null comment '商品购买数量',
    goods_price decimal(8, 2) default 0.00 not null comment '商品单价',
    goods_amount decimal(8, 2) default 0.00 not null comment '商品总价',
    product_name varchar(64) null comment '商品名',
    discount_amount decimal(8, 2) default 0.00 not null comment '商品优惠⾦额',
    discount_id bigint(32) null comment '参与活动ID',
    product_picture_url varchar(128) null comment '商品图⽚',
    create_user bigint(32) null comment '创建⽤户',
    update_user bigint(32) null comment '更新⽤户',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
    delete_flag tinyint default 0 not null comment '逻辑删除标记' 
) comment '订单明细表' charset = utf8; 
create index inx_create_time on order_item_detail (create_time, order_no); 
create index inx_item_order_no on order_item_detail (order_no);

(3)binlog消息消费记录表——etl_binlog_consume_record 。

create table etl_binlog_consume_record (
    id bigint auto_increment comment '主键' primary key,
    queue_id int null comment '消息队列id(即:queueId)',
    offset bigint null comment '消息偏移量(唯⼀定位该消息在队列中的位置)',
    topic varchar(500) null comment '消息所属主题',
    broker_name varchar(255) null comment '消息所在broker名称',
    consume_status tinyint(1) null comment '消费状态:0未消费,1消费成功,2已提交',
    create_time datetime null comment '记录创建时间',
    update_time datetime null comment '记录更新时间',
    constraint queue_id unique (queue_id, offset)
) comment 'binlog消息消费记录表' charset = utf8mb4;

(4)迁移明细表——etl_dirty_record 。

create table etl_dirty_record (
    id bigint auto_increment comment '主键' primary key,
    logic_model varchar(255) not null comment '逻辑模型名(逻辑表或模型名称)',
    ticket varchar(32) not null comment '迁移批次',
    cur_ticket_stage int(10) not null comment '当前所属批次阶段号',
    record_key varchar(60) not null comment '字段名',
    record_value varchar(128) null comment '字段值',
    status int(12) null comment '迁移状态',
    error_msg varchar(500) null comment '错误消息',
    retry_times int(12) null comment '已重试次数',
    last_retry_time datetime null comment '上次重试时间',
    is_deleted tinyint(1) default 0 null comment '0未被删除,1已删除',
    create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间',
    sync_size int default 0 null comment '每次同步数量' 
) comment '迁移明细表' charset = utf8mb4;

(5)迁移表——etl_progress 。

create table etl_progress (
    id bigint auto_increment comment '主键' primary key,
    logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
    ticket varchar(32) null comment '迁移批次',
    cur_ticket_stage int(10) null comment '当前所属批次阶段号',
    progress_type int(10) null comment '进度类型(0滚动查询数据,1核对查询数据)',
    status int(12) null comment '迁移状态:1同步中,2同步完成,3同步失败',
    retry_times int default 0 null comment '已同步次数',
    finish_record bigint default 0 null comment '已完成记录数',
    scroll_id varchar(100) default '0' null comment '记录上⼀次滚动最后记录字段值',
    scroll_time datetime null comment '开始滚动时间',
    scroll_end_time datetime null comment '滚动截⽌时间',
    is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
    create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' 
) comment '迁移表' charset = utf8mb4;

(6)迁移配置表——etl_progress_config 。

create table etl_progress_config (
    id bigint auto_increment comment '主键' primary key,
    logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
    record_key varchar(32) null comment '迁移批次模型字段名称',
    record_type int(10) null comment '迁移字段匹配类型(0唯⼀字段,1查询匹配字段)',
    is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
    create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' 
) comment '迁移配置表' charset = utf8mb4;

(7)需要迁移的表的数据量统计表——etl_statistical 。

create table etl_statistical (
    id bigint auto_increment comment '主键' primary key,
    logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
    statistical_count bigint null comment '统计数据量',
    statistical_time int(8) null comment '统计时间(按天为单位)',
    is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
    create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' 
) comment '需要迁移的表的数据量统计表' charset = utf8mb4; 
create index inx_logic_model on etl_statistical (logic_model); 
create index inx_statistical_time on etl_statistical (statistical_time);

  。

2.枚举类 。

(1)增量数据同步的操作类型——BinlogType 。

public enum BinlogType {
    INSERT ("新增", "INSERT"),
    UPDATE ("修改", "UPDATE"),
    DELETE ("删除", "DELETE");
}

(2)增量同步消费操作结果——ConsumerStatus 。

public enum ConsumerStatus {
    NOT_CONSUME ("未消费", 0),
    CONSUME_SUCCESS ("消费成功", 1),
    COMMITTED("已提交", 2); 
}

(3)DB数据库渠道——DBChannel 。

public enum DBChannel {
    //渠道⼀
    CHANNEL_1 ("历史数据库", "1"),
    //渠道⼆
    CHANNEL_2 ("新的数据库", "2");
}

(4)操作结果枚举值——OperateResult 。

public enum OperateResult {
    FAILED ("失败", "1"),
    SUCCESS ("成功", "0"); 
}

(5)操作类型——OperateType 。

public enum OperateType {
    ADD ("增量", 1),
    ALL ("全量", 2);
}

(6)滚动类型——ProgressType 。

public enum ProgressType {
    RANGE_SCROLL("滚动查询数据", 0),
    CHECK_DATA ("核对查询数据", 1); 
}

  。

3.接⼝设计 。

(1)访问迁移看板界⾯ 。

http://localhost:8080/migrate/toIndex

(2)查询同步进度接⼝ 。

//取得迁移进度信息
//@param queryCondition 查询条件
@RequestMapping(value = "/getEtlProgresses", method = RequestMethod.POST)
public Map<String, Object> getEtlProgresses(@RequestBody EtlProgressReq queryCondition) {
    Map<String, Object> resultMap = new HashMap<>();
    resultMap.put("resultCode", OperateResult.SUCCESS.getValue());
    resultMap.put("resultMsg", OperateResult.SUCCESS.getName());
    EtlProgress etlProgress = new EtlProgress();
    BeanUtils.copyProperties(queryCondition, etlProgress);
    List<EtlProgress> resultList = migrateService.getEtlProgresses(etlProgress);
    resultMap.put("resultList", resultList);
    return resultMap;
}

(3)发起全量同步接⼝ 。

需要同步的表和时间段 。

//新增全量同步 将前端传过来的世界格式化
//@param rangeScroll 全量同步条件
//@return 保存结果
@RequestMapping(value = "/addScroll", method = RequestMethod.POST)
public Map<String, Object> addScroll(@RequestBody RangeScroll rangeScroll) {
    rangeScroll.setStartTime(DateUtils.getStartTimeOfDate(rangeScroll.getStartTime()));
    rangeScroll.setEndTime(DateUtils.getDayEndTime(rangeScroll.getEndTime()));
    Map<String, Object> resultMap = new HashMap<>();
    resultMap.put("resultCode", OperateResult.SUCCESS.getValue());
    resultMap.put("resultMsg", OperateResult.SUCCESS.getName());
    scrollProcessor.scroll(rangeScroll);
    return resultMap; 
}

  。

4.定时任务设计 。

(1)定时核对校验数据的定时任务 。

代码入口:CheckDataTask 。

public void CheckData() {
    log.info("数据核对校验开始");
    if (lock.tryLock()) {
        try {
            CheckDataProcessor checkDataProcessor = CheckDataProcessor.getInstance();
            //查询已同步完成的批次,未核对的数据进⾏核对处理
            List<RangeScroll> rangeScrollList = checkDataProcessor.queryCheckDataList();
            for (RangeScroll rangeScroll:rangeScrollList) {// 已经在核对,本次跳过
                checkDataProcessor.checkData(rangeScroll);
            }
        } catch (Exception e) {
            log.error("数据核对过程中发⽣异常 {}", e.getMessage(), e);
        } finally {
            log.info("数据核对校验结束");
            lock.unlock();
        }
    } 
}

流程图:

核心代码如下:

//核验数据
//@param rangeScroll 要检查的数据抽取模型
public void checkData(RangeScroll rangeScroll) {
    EtlProgress etlProgress = addEtlProgress(rangeScroll);
    try {
        //1.先获取⽼库的⼀批数据
        List<Map<String, Object>> sourceList = querySourceList(rangeScroll);
        //2.再获取新库的⼀批数据
        List<Map<String, Object>> targetList = queryTargetList(sourceList, rangeScroll);
        //3.对数据进⾏核对校验
        Map<BinlogType, List<Map<String, Object>>> comparisonMap = comparison(sourceList,targetList,rangeScroll);
        //4.对数据进⾏归正处理
        updateComparisonData(comparisonMap,rangeScroll);
        //5.完成数据核对校验,更改状态
        updateEtlDirtyRecord(etlProgress, EtlProgressStatus.CHECK_SUCCESS.getValue(),rangeScroll, null);
    } catch (Exception e) {
        //数据核对过程失败,只记录数据核对错误信息 
        updateEtlDirtyRecord(etlProgress, EtlProgressStatus.SUCCESS.getValue(),rangeScroll, e.getMessage());
        log.error("数据核对过程中发⽣异常 {"+e.getMessage()+"}", etlProgress);
    }
}

(2)数据量统计定时任务 。

代码入口:CountCacheTask 。

@Scheduled(cron = "0 0 0/1 * * ? *")
void countRefresh() {
    try {
        Constants.statisticalCountMap.clear();
        //获取所有配置的需要同步的表
        List<String> filedKeyList = MergeConfig.getFiledKey(STATISTICAL_KEY);
        for (String fileKey : filedKeyList) {
            log.info("开始同步:" + fileKey + "的表数据");
            EtlStatistical etlStatistical = new EtlStatistical();
            etlStatistical.setLogicModel(fileKey);
            EtlStatistical etlStatistical1 = migrateScrollMapper.getMaxDateEtlStatistical(etlStatistical);
            //验证单个表的数据明细是否已统计,如果未统计则默认从最⼩时间天数开始统计
            //否则只更新最近15天的数据(超过15天的数据变化概率很低,暂不考虑更新)
            if (ObjectUtils.isNotEmpty(etlStatistical1)) {
                //已统计的最⼤时间格式
                Integer statisticalTime = etlStatistical1.getStatisticalTime();
                Long distanceDays = DateUtils.getDistanceDays(String.valueOf(statisticalTime), DateUtils.format(new Date()) + "");
                Date minDate = null;
                if (distanceDays < 15) {
                    //更新最近15天的该表统计数量
                    minDate = DateUtils.addDays( -15);
                } else {
                    minDate = DateUtils.parseStrToDate(String.valueOf(statisticalTime), DateUtils.DATE_FORMAT_YYYYMMDD);
                }
                saveStatistical(minDate, fileKey, false);
            } else {
                //先取最⼩的⽇期,然后以该⽇期,以天为单位开始统计
                Date minDate = getMinDate(fileKey);
                saveStatistical(minDate, fileKey, true);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } 
}

流程图:

(3)增量数据落地定时任务 。

代码入口:IncrementTask 。

void IncrementTask() {
    //获取阻塞队列的⽅法
    LocalQueue localQueue = LocalQueue.getInstance();
    //验证读队列的数据已被处理完毕
    if (!localQueue.getIsRead()) {
        log.info("增量数据执⾏写⼊");
        //执⾏数据写⼊
        localQueue.doCommit();
    } 
}

流程图: