大纲 。
1.数据库设计 。
2.枚举类 。
3.接⼝设计 。
4.定时任务设计 。
(1)定时核对校验数据的定时任务 。
(2)数据量统计定时任务 。
(3)增量数据落地定时任务 。
(4)失败重试定时任务 。
5.技术亮点 。
(1)滚动拉取方案 。
(2)巧妙的统计滚动进度方案 。
(3)防止增量同步数据丢失和高效写入方案 。
(4)数据扩容方案 。
6.全量同步和增量同步整体流程图 。
7.功能升级 。
(1)数据迁移系统数据源动态化配置 。
(2)迁移数据库操作对应的xml动态⽣成 。
(3)扩容迁移数据时的问题 。
。
1.数据库设计 。
(1)订单表——order_info 。
create table order_info (
id bigint(32) auto_increment,
order_no varchar(32) not null comment '订单号',
order_amount decimal(8, 2) not null comment '订单⾦额',
merchant_id bigint(32) not null comment '商户ID',
user_id bigint(32) not null comment '⽤户ID',
order_freight decimal(8, 2) default 0.00 not null comment '运费',
order_status tinyint(3) default 0 not null comment '订单状态:10待付款,20待接单,30已接单,40配送中,50已完成,55部分退款,60全部退款,70取消订单',
trans_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '交易时间',
pay_status tinyint(3) default 2 not null comment '⽀付状态:1待⽀付、2⽀付成功、3⽀付失败',
recharge_time timestamp default CURRENT_TIMESTAMP not null comment '⽀付完成时间',
pay_amount decimal(8, 2) default 0.00 not null comment '实际⽀付⾦额',
pay_discount_amount decimal(8, 2) default 0.00 not null comment '⽀付优惠⾦额',
address_id bigint(32) not null comment '收货地址ID',
delivery_type tinyint(3) default 2 not null comment '配送⽅式:1⾃提,2配送',
delivery_status tinyint(3) default 0 null comment '配送状态:0 配送中,2已送达,3待收货,4已送达',
delivery_expect_time timestamp null comment '配送预计送达时间',
delivery_complete_time timestamp null comment '配送送达时间',
delivery_amount decimal(8, 2) default 0.00 not null comment '配送运费',
coupon_id bigint(32) null comment '优惠券id',
cancel_time timestamp null comment '订单取消时间',
confirm_time timestamp null comment '订单确认时间',
remark varchar(512) null comment '订单备注留⾔',
create_user bigint(32) null comment '创建⽤户',
update_user bigint(32) null comment '更新⽤户',
create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
update_time timestamp null on update CURRENT_TIMESTAMP comment '更新时间',
delete_flag tinyint default 0 not null comment '逻辑删除标记',
primary key (id, order_no)
) comment '订单表';
create index inx_order_no on order_info (order_no);
create index inx_create_time on order_info (create_time, order_no);
(2)订单详情表——order_info 。
-- auto-generated definition
create table order_item_detail (
id bigint(32) auto_increment primary key,
order_no varchar(32) not null comment '订单号',
product_id bigint(32) not null comment '商品ID',
category_id bigint(32) not null comment '商品分类ID',
goods_num int(8) default 1 not null comment '商品购买数量',
goods_price decimal(8, 2) default 0.00 not null comment '商品单价',
goods_amount decimal(8, 2) default 0.00 not null comment '商品总价',
product_name varchar(64) null comment '商品名',
discount_amount decimal(8, 2) default 0.00 not null comment '商品优惠⾦额',
discount_id bigint(32) null comment '参与活动ID',
product_picture_url varchar(128) null comment '商品图⽚',
create_user bigint(32) null comment '创建⽤户',
update_user bigint(32) null comment '更新⽤户',
create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
delete_flag tinyint default 0 not null comment '逻辑删除标记'
) comment '订单明细表' charset = utf8;
create index inx_create_time on order_item_detail (create_time, order_no);
create index inx_item_order_no on order_item_detail (order_no);
(3)binlog消息消费记录表——etl_binlog_consume_record 。
create table etl_binlog_consume_record (
id bigint auto_increment comment '主键' primary key,
queue_id int null comment '消息队列id(即:queueId)',
offset bigint null comment '消息偏移量(唯⼀定位该消息在队列中的位置)',
topic varchar(500) null comment '消息所属主题',
broker_name varchar(255) null comment '消息所在broker名称',
consume_status tinyint(1) null comment '消费状态:0未消费,1消费成功,2已提交',
create_time datetime null comment '记录创建时间',
update_time datetime null comment '记录更新时间',
constraint queue_id unique (queue_id, offset)
) comment 'binlog消息消费记录表' charset = utf8mb4;
(4)迁移明细表——etl_dirty_record 。
create table etl_dirty_record (
id bigint auto_increment comment '主键' primary key,
logic_model varchar(255) not null comment '逻辑模型名(逻辑表或模型名称)',
ticket varchar(32) not null comment '迁移批次',
cur_ticket_stage int(10) not null comment '当前所属批次阶段号',
record_key varchar(60) not null comment '字段名',
record_value varchar(128) null comment '字段值',
status int(12) null comment '迁移状态',
error_msg varchar(500) null comment '错误消息',
retry_times int(12) null comment '已重试次数',
last_retry_time datetime null comment '上次重试时间',
is_deleted tinyint(1) default 0 null comment '0未被删除,1已删除',
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间',
sync_size int default 0 null comment '每次同步数量'
) comment '迁移明细表' charset = utf8mb4;
(5)迁移表——etl_progress 。
create table etl_progress (
id bigint auto_increment comment '主键' primary key,
logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
ticket varchar(32) null comment '迁移批次',
cur_ticket_stage int(10) null comment '当前所属批次阶段号',
progress_type int(10) null comment '进度类型(0滚动查询数据,1核对查询数据)',
status int(12) null comment '迁移状态:1同步中,2同步完成,3同步失败',
retry_times int default 0 null comment '已同步次数',
finish_record bigint default 0 null comment '已完成记录数',
scroll_id varchar(100) default '0' null comment '记录上⼀次滚动最后记录字段值',
scroll_time datetime null comment '开始滚动时间',
scroll_end_time datetime null comment '滚动截⽌时间',
is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间'
) comment '迁移表' charset = utf8mb4;
(6)迁移配置表——etl_progress_config 。
create table etl_progress_config (
id bigint auto_increment comment '主键' primary key,
logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
record_key varchar(32) null comment '迁移批次模型字段名称',
record_type int(10) null comment '迁移字段匹配类型(0唯⼀字段,1查询匹配字段)',
is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间'
) comment '迁移配置表' charset = utf8mb4;
(7)需要迁移的表的数据量统计表——etl_statistical 。
create table etl_statistical (
id bigint auto_increment comment '主键' primary key,
logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)',
statistical_count bigint null comment '统计数据量',
statistical_time int(8) null comment '统计时间(按天为单位)',
is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除',
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间'
) comment '需要迁移的表的数据量统计表' charset = utf8mb4;
create index inx_logic_model on etl_statistical (logic_model);
create index inx_statistical_time on etl_statistical (statistical_time);
。
2.枚举类 。
(1)增量数据同步的操作类型——BinlogType 。
public enum BinlogType {
INSERT ("新增", "INSERT"),
UPDATE ("修改", "UPDATE"),
DELETE ("删除", "DELETE");
}
(2)增量同步消费操作结果——ConsumerStatus 。
public enum ConsumerStatus {
NOT_CONSUME ("未消费", 0),
CONSUME_SUCCESS ("消费成功", 1),
COMMITTED("已提交", 2);
}
(3)DB数据库渠道——DBChannel 。
public enum DBChannel {
//渠道⼀
CHANNEL_1 ("历史数据库", "1"),
//渠道⼆
CHANNEL_2 ("新的数据库", "2");
}
(4)操作结果枚举值——OperateResult 。
public enum OperateResult {
FAILED ("失败", "1"),
SUCCESS ("成功", "0");
}
(5)操作类型——OperateType 。
public enum OperateType {
ADD ("增量", 1),
ALL ("全量", 2);
}
(6)滚动类型——ProgressType 。
public enum ProgressType {
RANGE_SCROLL("滚动查询数据", 0),
CHECK_DATA ("核对查询数据", 1);
}
。
3.接⼝设计 。
(1)访问迁移看板界⾯ 。
http://localhost:8080/migrate/toIndex
(2)查询同步进度接⼝ 。
//取得迁移进度信息
//@param queryCondition 查询条件
@RequestMapping(value = "/getEtlProgresses", method = RequestMethod.POST)
public Map<String, Object> getEtlProgresses(@RequestBody EtlProgressReq queryCondition) {
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("resultCode", OperateResult.SUCCESS.getValue());
resultMap.put("resultMsg", OperateResult.SUCCESS.getName());
EtlProgress etlProgress = new EtlProgress();
BeanUtils.copyProperties(queryCondition, etlProgress);
List<EtlProgress> resultList = migrateService.getEtlProgresses(etlProgress);
resultMap.put("resultList", resultList);
return resultMap;
}
(3)发起全量同步接⼝ 。
需要同步的表和时间段 。
//新增全量同步 将前端传过来的世界格式化
//@param rangeScroll 全量同步条件
//@return 保存结果
@RequestMapping(value = "/addScroll", method = RequestMethod.POST)
public Map<String, Object> addScroll(@RequestBody RangeScroll rangeScroll) {
rangeScroll.setStartTime(DateUtils.getStartTimeOfDate(rangeScroll.getStartTime()));
rangeScroll.setEndTime(DateUtils.getDayEndTime(rangeScroll.getEndTime()));
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("resultCode", OperateResult.SUCCESS.getValue());
resultMap.put("resultMsg", OperateResult.SUCCESS.getName());
scrollProcessor.scroll(rangeScroll);
return resultMap;
}
。
4.定时任务设计 。
(1)定时核对校验数据的定时任务 。
代码入口:CheckDataTask 。
public void CheckData() {
log.info("数据核对校验开始");
if (lock.tryLock()) {
try {
CheckDataProcessor checkDataProcessor = CheckDataProcessor.getInstance();
//查询已同步完成的批次,未核对的数据进⾏核对处理
List<RangeScroll> rangeScrollList = checkDataProcessor.queryCheckDataList();
for (RangeScroll rangeScroll:rangeScrollList) {// 已经在核对,本次跳过
checkDataProcessor.checkData(rangeScroll);
}
} catch (Exception e) {
log.error("数据核对过程中发⽣异常 {}", e.getMessage(), e);
} finally {
log.info("数据核对校验结束");
lock.unlock();
}
}
}
流程图:
核心代码如下:
//核验数据
//@param rangeScroll 要检查的数据抽取模型
public void checkData(RangeScroll rangeScroll) {
EtlProgress etlProgress = addEtlProgress(rangeScroll);
try {
//1.先获取⽼库的⼀批数据
List<Map<String, Object>> sourceList = querySourceList(rangeScroll);
//2.再获取新库的⼀批数据
List<Map<String, Object>> targetList = queryTargetList(sourceList, rangeScroll);
//3.对数据进⾏核对校验
Map<BinlogType, List<Map<String, Object>>> comparisonMap = comparison(sourceList,targetList,rangeScroll);
//4.对数据进⾏归正处理
updateComparisonData(comparisonMap,rangeScroll);
//5.完成数据核对校验,更改状态
updateEtlDirtyRecord(etlProgress, EtlProgressStatus.CHECK_SUCCESS.getValue(),rangeScroll, null);
} catch (Exception e) {
//数据核对过程失败,只记录数据核对错误信息
updateEtlDirtyRecord(etlProgress, EtlProgressStatus.SUCCESS.getValue(),rangeScroll, e.getMessage());
log.error("数据核对过程中发⽣异常 {"+e.getMessage()+"}", etlProgress);
}
}
(2)数据量统计定时任务 。
代码入口:CountCacheTask 。
@Scheduled(cron = "0 0 0/1 * * ? *")
void countRefresh() {
try {
Constants.statisticalCountMap.clear();
//获取所有配置的需要同步的表
List<String> filedKeyList = MergeConfig.getFiledKey(STATISTICAL_KEY);
for (String fileKey : filedKeyList) {
log.info("开始同步:" + fileKey + "的表数据");
EtlStatistical etlStatistical = new EtlStatistical();
etlStatistical.setLogicModel(fileKey);
EtlStatistical etlStatistical1 = migrateScrollMapper.getMaxDateEtlStatistical(etlStatistical);
//验证单个表的数据明细是否已统计,如果未统计则默认从最⼩时间天数开始统计
//否则只更新最近15天的数据(超过15天的数据变化概率很低,暂不考虑更新)
if (ObjectUtils.isNotEmpty(etlStatistical1)) {
//已统计的最⼤时间格式
Integer statisticalTime = etlStatistical1.getStatisticalTime();
Long distanceDays = DateUtils.getDistanceDays(String.valueOf(statisticalTime), DateUtils.format(new Date()) + "");
Date minDate = null;
if (distanceDays < 15) {
//更新最近15天的该表统计数量
minDate = DateUtils.addDays( -15);
} else {
minDate = DateUtils.parseStrToDate(String.valueOf(statisticalTime), DateUtils.DATE_FORMAT_YYYYMMDD);
}
saveStatistical(minDate, fileKey, false);
} else {
//先取最⼩的⽇期,然后以该⽇期,以天为单位开始统计
Date minDate = getMinDate(fileKey);
saveStatistical(minDate, fileKey, true);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
流程图:
(3)增量数据落地定时任务 。
代码入口:IncrementTask 。
void IncrementTask() {
//获取阻塞队列的⽅法
LocalQueue localQueue = LocalQueue.getInstance();
//验证读队列的数据已被处理完毕
if (!localQueue.getIsRead()) {
log.info("增量数据执⾏写⼊");
//执⾏数据写⼊
localQueue.doCommit();
}
}
流程图: