一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”
共 8112字,需浏览 17分钟
·
2020-10-12 09:39
目录
业务背景
方案确定
数据平滑迁移方案
迁移阶段
迁移优化
复盘
❝只要还有一根头发,说明你还能努力一把?
业务背景
之前公司投票系统的统计用的是 HBase 进行存储,历史数据大概是四亿条,总监说现在需要将 HBase 数据迁移到mongodb,只保存最近两年的数据,其他的数据磁盘备份就行,要求有三:
不丢数据 平滑迁移 不停机
于是作为一个刚刚毕业充满激情和热血的有志美少女,我开完早会第一时刻就去百度搜索方案:
点开看了前面几条之后,摸着键盘的手有点发抖,就差留下两行清澈的泪水,然后默默的我打开了:
方案确定
于是我老大就带着我快乐的分析需求,主要是迁移投票统计数据,有每日投票记录,每日活动投票数记录,每个用户投票记录等等。最终确定三亿条数据分布在两张表里面,那两张数据量大的表由他迁移,另外七张表总共数据量差不多快一个亿,由我来迁移。
因为不能影响到线上的客户,所有我这边迁移方案最终是新老double write + offline data sync and check
,其实就是 线上双写+线下复核
。
我需要迁移的表,有五张表是数据量比较小的就十万条数据左右,于是我先拿着这些表出气,这些表由于数据量不大我是进行全表迁移的,就是从 HBase 查询到所有数据直接往 mongodb 里面倒。
这是我出的数据迁移方案:
数据平滑迁移方案
采用上线期间对新增的数据进行双写策略:
先上线一版对"增删改"数据写两份数据库的措施,在代码原来只操作hbase的情况下,加上mongodb库的操作逻辑; 编写数据迁移的 python 脚本和接口,将数据从 hbase 迁移到 mongodb; 将从 hbase 查询的逻辑旁边全部加上从 mongodb 库查询的逻辑; 迁移完毕,上线的时候,将hbase逻辑全部下掉,查询全部走 mongo; 考虑到数据迁移幂等性问题,迁移代码全部写成覆盖而不是增加票数;
迁移阶段
方案确定了,一切好像都在按照排期有条不紊的进行着:
建立 mongodb 数据表,建立好索引; 上线双写代码,投票写操作同时记录在 mongodb 库,那样迁移期间投票数据记录就不会丢失; 拿着迁移数据量小的表出气,全部查出来一股脑往 mongodb 倾倒;
// 查询 hbase
public List
listVoteRecord(VoteRecordable begin, VoteRecordable end) { byte[] startRow = begin.getRowKey();
byte[] endRow = end.getRowKey();
Scan scan = new Scan(startRow, endRow);
return this.hbaseTemplate.find(begin.getTableName(), scan, this.getRowMapper(begin.getClass()));
}
// mongodb 入库
public void batchInsertTotalActivityVoteRecordDoc(List
totalActivityVoteRecordDocs) { BulkOperations operations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, TotalActivityVoteRecordDoc.class);
List
> upsertList = new ArrayList<>(totalActivityVoteRecordDocs.size()); totalActivityVoteRecordDocs.forEach(data -> {
Query query = new Query(Criteria.where("activityId").is(data.getActivityId()));
Update update = new Update();
update.set("voteCount", data.getVoteCount());
Pair
upsertPair = Pair.of(query, update); upsertList.add(upsertPair);
});
operations.upsert(upsertList);
operations.execute();
}
一次 scan 查询会返回大量数据,因此客户端发起一次scan请求,实际并不会一次就将所有数据加载到本地,而是分成多次 RPC 请求进行加载,数据量小的话可以不计较得失愉快的scan,但是数据量太大会有两个问题躲不掉:
严重消耗网络带宽,从而影响其他业务; 本地客户端发生OOM; 请求太大太集中会把 HBase 打爆,因为我的请求是 scan 方式而不是 rowKey 等值查询(等值查询的话需要拼接详细的活动ID或者投票ID);
五张表我愉快的迁移了,但是迁移大表的时候报应来了,一个是查询慢,第二个是查询完插入过程中需要new 很多对象,愉快的 OOM 了,于是这种方式我也就是想着小表先给整了,大表还要另外寻出路。
迁移优化
优化思路:
scan 危险的话,那就等值查询喽,拼接要的东西我又不是没有; 查询慢要提高速度,那就多线程走起; 本地迁移太耽误事情了,放到服务器里面迁移,内存大各种访问走内网还快; 迁移期间调用接口请求失败了需要重试;
于是我在Java代码里面写好查询数据和插入数据的逻辑:
@ResponseBody
public void insertHourlyVoteRecordDoc2(@RequestParam(value = "voteItemId") String voteItemId,
@RequestParam(value = "beginDate") String beginDate,
@RequestParam(value = "endDate") String endDate){
VoteRecordable begin = new HourlyVoteRecord();
begin.setTraceId(voteItemId);
begin.setDate(beginDate);
VoteRecordable end = new HourlyVoteRecord();
end.setTraceId(voteItemId);
end.setDate(endDate);
List
voteRecordables = voteItemStatService.listVoteRecords(begin, end); System.out.println(voteRecordables.size() + "voteRecordables大小是多少 insertHourlyVoteRecordDoc");
if (CollectionUtils.isEmpty(voteRecordables)) {
return;
}
List
hourlyVoteRecordDocs = new ArrayList<>(voteRecordables.size()); for (VoteRecordable voteRecordable1 : voteRecordables) {
HourlyVoteRecord hourlyVoteRecord = (HourlyVoteRecord) voteRecordable1;
HourlyVoteRecordDoc hourlyVoteRecordDoc = new HourlyVoteRecordDoc();
hourlyVoteRecordDoc.setVoteItemId(new ObjectId(hourlyVoteRecord.getVoteItemId()));
hourlyVoteRecordDoc.setVoteCount(hourlyVoteRecord.getVoteCount());
hourlyVoteRecordDoc.setHour(hourlyVoteRecord.getHour());
hourlyVoteRecordDocs.add(hourlyVoteRecordDoc);
}
voteRecordService.batchInsertHourlyVoteRecordDoc(hourlyVoteRecordDocs);
}
然后在python写脚本进行多线程并发请求迁移:
"""
迁移
DailyVoteRecordDoc
HourlyVoteRecord
HourlyActivtiyVoteRecord
"""
import threading
import time
from datetime import datetime, timedelta
from core_service import vote_service
from scripts.biz.vote.migrate_user_vote_records import MigrateLimitException
class MyThread(threading.Thread):
def __init__(self, thread_id, name, archive_activities):
threading.Thread.__init__(self)
self.threadID = thread_id
self.name = name
self.archive_activities = archive_activities
def run(self):
print("Starting " + self.name)
main(self.archive_activities)
print("exiting " + self.name)
def main(archive_activities):
"""
迁移投票记录
:param archive_activities:
:return:
"""
final_date_end_timestamp = 1595433600
for activity in archive_activities:
date_start = time.strftime('%Y-%m-%d', time.localtime(int(activity['dateStart'] / 1000)))
hour_start = time.strftime('%Y-%m-%d-%H', time.localtime(int(activity['dateStart'] / 1000)))
date_end_timestamp = int(activity['dateEnd'] / 1000 + 24 * 60 * 60)
if final_date_end_timestamp < date_end_timestamp:
date_end_timestamp = final_date_end_timestamp
date_end = time.strftime('%Y-%m-%d', time.localtime(date_end_timestamp))
hour_end = time.strftime('%Y-%m-%d-%H', time.localtime(date_end_timestamp))
## 迁移 hourly_activity_vote_records
vote_service.batch_insert_hourly_activity_vote_records(activity.get('_id'), hour_start, hour_end)
print(activity)
vote_items = vote_service.list_vote_items(str(activity['_id']))
try:
for vote_item in vote_items:
# print("user_vote_record: activityId voteItemId score", activity.get('_id'), vote_item.get('_id'))
# print(vote_item)
if vote_item['score'] != 0:
## 迁移 daily_vote_records
vote_service.batch_insert_daily_vote_records(vote_item.get('_id'), date_start, date_end)
## 迁移 hourly_vote_records
print("开始结束时间时间", hour_start, hour_end)
print("开始结束日期", date_start, date_end)
vote_service.batch_insert_hourly_vote_records(vote_item.get('_id'), hour_start, hour_end)
except MigrateLimitException:
print("migrate limit error")
if __name__ == "__main__":
// 调用Java代码接口,查询到所有的活动
archive_activities_local = vote_service.archive_activities_v3()
# 开启多线程
i = 1
available_activities = []
threads = []
for index, local_activity in enumerate(archive_activities_local):
# 每循环700次开一个线程
if index == i * 700:
thread = MyThread(i, "Thread-" + str(i), available_activities)
thread.start()
threads.append(thread)
i += 1
available_activities = []
available_activities.append(local_activity)
if available_activities:
thread = MyThread(i, "Thread-" + str(i), available_activities)
thread.start()
threads.append(thread)
for t in threads:
t.join()
请求如果失败了重试三次
@retry(3)
def batch_insert_hourly_activity_vote_records(activityId, beginDate, endDate):
"""
批量插入活动小时投票记录
:param activityId:
:param beginDate:
:param endDate:
:return:
"""
url = "http://xxx/vote/api/internal/dbMigration/insertHourlyActivityVoteRecordDoc?" \
"activityId={activityId}&beginDate={beginDate}&endDate={endDate}".format(activityId=activityId, beginDate=beginDate, endDate=endDate)
r = requests.session().get(url)
最终放到服务器里面跑了两个小时就跑完了。我老大的三亿条数据跑了一天跑完,我一开始以为我老大说的一天12小时(实际是24小时算的),那我想着我的数据不到他的三分之一,三小时就差不多,于是本地跑,结果跑了一整天。
这便是所有的实现了。
复盘
但是这个过程并没有我说的这么顺利和轻松,架构师给了我五天我延期了三天,我浪费时间的部分有以下几个方面:
私人原因,中间有一天看了下小破牙请了半天假,痛的灵魂出窍无心工作,但是我周末自己在家赶了很多双写代码的进度; 前期迁移还算顺利,但是由于缺少数据迁移的经验,后期大数据表迁移出现了很多意外; 大数据表迁移,而且没有处理好中间中断情况的log记录,导致我开始了不敢轻易中断,然后如果代码逻辑有bug,要从来就很要命; 预估不到数据迁移时间,其中有一天我写完逻辑在本地跑数据迁移,跑了六个小时,严重影响到了我的操作; 线下数据复核的时候,有七张表数据,对的我有点头昏眼花,然后有三张大表数据一直对不上;业务里面有礼物投票刷数据的情况一开始没有想到这个龟孙儿,导致我一直以为数据迁移有问题,拼了命的找原因又找不到逻辑漏洞;
全程写的很痛苦,感觉根本不在我的掌控之中。说了这么多,菜是原罪,努力学习吧,只要还有一根头发就不放弃学习!!!