一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”

码农沉思录

共 8112字,需浏览 17分钟

 ·

2020-10-12 09:39


目录

  • 业务背景

  • 方案确定

    • 数据平滑迁移方案

  • 迁移阶段

    • 迁移优化

  • 复盘


只要还有一根头发,说明你还能努力一把?

业务背景

之前公司投票系统的统计用的是 HBase 进行存储,历史数据大概是四亿条,总监说现在需要将 HBase 数据迁移到mongodb,只保存最近两年的数据,其他的数据磁盘备份就行,要求有三:

  1. 不丢数据
  2. 平滑迁移
  3. 不停机

于是作为一个刚刚毕业充满激情和热血的有志美少女,我开完早会第一时刻就去百度搜索方案:

数据迁移方案

点开看了前面几条之后,摸着键盘的手有点发抖,就差留下两行清澈的泪水,然后默默的我打开了:

人生就是这么戏剧

方案确定

于是我老大就带着我快乐的分析需求,主要是迁移投票统计数据,有每日投票记录,每日活动投票数记录,每个用户投票记录等等。最终确定三亿条数据分布在两张表里面,那两张数据量大的表由他迁移,另外七张表总共数据量差不多快一个亿,由我来迁移。

因为不能影响到线上的客户,所有我这边迁移方案最终是新老double write + offline data sync and check,其实就是 线上双写+线下复核

我需要迁移的表,有五张表是数据量比较小的就十万条数据左右,于是我先拿着这些表出气,这些表由于数据量不大我是进行全表迁移的,就是从 HBase 查询到所有数据直接往 mongodb 里面倒。

这是我出的数据迁移方案:

数据平滑迁移方案

采用上线期间对新增的数据进行双写策略:

  1. 先上线一版对"增删改"数据写两份数据库的措施,在代码原来只操作hbase的情况下,加上mongodb库的操作逻辑;
  2. 编写数据迁移的 python 脚本和接口,将数据从 hbase 迁移到 mongodb;
  3. 将从 hbase 查询的逻辑旁边全部加上从 mongodb 库查询的逻辑;
  4. 迁移完毕,上线的时候,将hbase逻辑全部下掉,查询全部走 mongo;
  5. 考虑到数据迁移幂等性问题,迁移代码全部写成覆盖而不是增加票数;

迁移阶段

方案确定了,一切好像都在按照排期有条不紊的进行着:

  1. 建立 mongodb 数据表,建立好索引;
  2. 上线双写代码,投票写操作同时记录在 mongodb 库,那样迁移期间投票数据记录就不会丢失;
  3. 拿着迁移数据量小的表出气,全部查出来一股脑往 mongodb 倾倒;
// 查询 hbasepublic List listVoteRecord(VoteRecordable begin, VoteRecordable end) {        byte[] startRow = begin.getRowKey();        byte[] endRow = end.getRowKey();        Scan scan = new Scan(startRow, endRow);        return this.hbaseTemplate.find(begin.getTableName(), scan, this.getRowMapper(begin.getClass()));    }
// mongodb 入库public void batchInsertTotalActivityVoteRecordDoc(List totalActivityVoteRecordDocs) {        BulkOperations operations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, TotalActivityVoteRecordDoc.class);        List> upsertList = new ArrayList<>(totalActivityVoteRecordDocs.size());        totalActivityVoteRecordDocs.forEach(data -> {            Query query = new Query(Criteria.where("activityId").is(data.getActivityId()));            Update update = new Update();            update.set("voteCount", data.getVoteCount());            Pair upsertPair = Pair.of(query, update);            upsertList.add(upsertPair);        });
operations.upsert(upsertList); operations.execute(); }

一次 scan 查询会返回大量数据,因此客户端发起一次scan请求,实际并不会一次就将所有数据加载到本地,而是分成多次 RPC 请求进行加载,数据量小的话可以不计较得失愉快的scan,但是数据量太大会有两个问题躲不掉:

  1. 严重消耗网络带宽,从而影响其他业务;
  2. 本地客户端发生OOM;
  3. 请求太大太集中会把 HBase 打爆,因为我的请求是 scan 方式而不是 rowKey 等值查询(等值查询的话需要拼接详细的活动ID或者投票ID);

五张表我愉快的迁移了,但是迁移大表的时候报应来了,一个是查询慢,第二个是查询完插入过程中需要new 很多对象,愉快的 OOM 了,于是这种方式我也就是想着小表先给整了,大表还要另外寻出路。

迁移优化

优化思路:

  1. scan 危险的话,那就等值查询喽,拼接要的东西我又不是没有;
  2. 查询慢要提高速度,那就多线程走起;
  3. 本地迁移太耽误事情了,放到服务器里面迁移,内存大各种访问走内网还快;
  4. 迁移期间调用接口请求失败了需要重试;

于是我在Java代码里面写好查询数据和插入数据的逻辑:

@ResponseBody    public void insertHourlyVoteRecordDoc2(@RequestParam(value = "voteItemId") String voteItemId,                                           @RequestParam(value = "beginDate") String beginDate,                                           @RequestParam(value = "endDate") String endDate){        VoteRecordable begin = new HourlyVoteRecord();        begin.setTraceId(voteItemId);        begin.setDate(beginDate);
VoteRecordable end = new HourlyVoteRecord(); end.setTraceId(voteItemId); end.setDate(endDate);
List voteRecordables = voteItemStatService.listVoteRecords(begin, end); System.out.println(voteRecordables.size() + "voteRecordables大小是多少 insertHourlyVoteRecordDoc"); if (CollectionUtils.isEmpty(voteRecordables)) { return; } List hourlyVoteRecordDocs = new ArrayList<>(voteRecordables.size()); for (VoteRecordable voteRecordable1 : voteRecordables) { HourlyVoteRecord hourlyVoteRecord = (HourlyVoteRecord) voteRecordable1; HourlyVoteRecordDoc hourlyVoteRecordDoc = new HourlyVoteRecordDoc(); hourlyVoteRecordDoc.setVoteItemId(new ObjectId(hourlyVoteRecord.getVoteItemId())); hourlyVoteRecordDoc.setVoteCount(hourlyVoteRecord.getVoteCount()); hourlyVoteRecordDoc.setHour(hourlyVoteRecord.getHour());
hourlyVoteRecordDocs.add(hourlyVoteRecordDoc); } voteRecordService.batchInsertHourlyVoteRecordDoc(hourlyVoteRecordDocs);
}

然后在python写脚本进行多线程并发请求迁移:

"""迁移DailyVoteRecordDocHourlyVoteRecordHourlyActivtiyVoteRecord"""
import threadingimport timefrom datetime import datetime, timedelta
from core_service import vote_servicefrom scripts.biz.vote.migrate_user_vote_records import MigrateLimitException

class MyThread(threading.Thread): def __init__(self, thread_id, name, archive_activities): threading.Thread.__init__(self) self.threadID = thread_id self.name = name self.archive_activities = archive_activities
def run(self): print("Starting " + self.name) main(self.archive_activities) print("exiting " + self.name)

def main(archive_activities): """ 迁移投票记录 :param archive_activities: :return: """
final_date_end_timestamp = 1595433600
for activity in archive_activities: date_start = time.strftime('%Y-%m-%d', time.localtime(int(activity['dateStart'] / 1000))) hour_start = time.strftime('%Y-%m-%d-%H', time.localtime(int(activity['dateStart'] / 1000)))
date_end_timestamp = int(activity['dateEnd'] / 1000 + 24 * 60 * 60)
if final_date_end_timestamp < date_end_timestamp: date_end_timestamp = final_date_end_timestamp
date_end = time.strftime('%Y-%m-%d', time.localtime(date_end_timestamp)) hour_end = time.strftime('%Y-%m-%d-%H', time.localtime(date_end_timestamp)) ## 迁移 hourly_activity_vote_records vote_service.batch_insert_hourly_activity_vote_records(activity.get('_id'), hour_start, hour_end) print(activity)
vote_items = vote_service.list_vote_items(str(activity['_id'])) try: for vote_item in vote_items: # print("user_vote_record: activityId voteItemId score", activity.get('_id'), vote_item.get('_id')) # print(vote_item) if vote_item['score'] != 0: ## 迁移 daily_vote_records vote_service.batch_insert_daily_vote_records(vote_item.get('_id'), date_start, date_end) ## 迁移 hourly_vote_records print("开始结束时间时间", hour_start, hour_end) print("开始结束日期", date_start, date_end) vote_service.batch_insert_hourly_vote_records(vote_item.get('_id'), hour_start, hour_end)
except MigrateLimitException: print("migrate limit error")


if __name__ == "__main__": // 调用Java代码接口,查询到所有的活动 archive_activities_local = vote_service.archive_activities_v3() # 开启多线程 i = 1 available_activities = [] threads = []
for index, local_activity in enumerate(archive_activities_local): # 每循环700次开一个线程 if index == i * 700: thread = MyThread(i, "Thread-" + str(i), available_activities) thread.start() threads.append(thread) i += 1 available_activities = [] available_activities.append(local_activity)
if available_activities: thread = MyThread(i, "Thread-" + str(i), available_activities) thread.start() threads.append(thread)
for t in threads: t.join()
请求如果失败了重试三次@retry(3)def batch_insert_hourly_activity_vote_records(activityId, beginDate, endDate):    """    批量插入活动小时投票记录    :param activityId:    :param beginDate:    :param endDate:    :return:    """    url = "http://xxx/vote/api/internal/dbMigration/insertHourlyActivityVoteRecordDoc?" \          "activityId={activityId}&beginDate={beginDate}&endDate={endDate}".format(activityId=activityId, beginDate=beginDate, endDate=endDate)
r = requests.session().get(url)

最终放到服务器里面跑了两个小时就跑完了。我老大的三亿条数据跑了一天跑完,我一开始以为我老大说的一天12小时(实际是24小时算的),那我想着我的数据不到他的三分之一,三小时就差不多,于是本地跑,结果跑了一整天。

这便是所有的实现了。

复盘

但是这个过程并没有我说的这么顺利和轻松,架构师给了我五天我延期了三天,我浪费时间的部分有以下几个方面:

  1. 私人原因,中间有一天看了下小破牙请了半天假,痛的灵魂出窍无心工作,但是我周末自己在家赶了很多双写代码的进度;
  2. 前期迁移还算顺利,但是由于缺少数据迁移的经验,后期大数据表迁移出现了很多意外;
  3. 大数据表迁移,而且没有处理好中间中断情况的log记录,导致我开始了不敢轻易中断,然后如果代码逻辑有bug,要从来就很要命;
  4. 预估不到数据迁移时间,其中有一天我写完逻辑在本地跑数据迁移,跑了六个小时,严重影响到了我的操作;
  5. 线下数据复核的时候,有七张表数据,对的我有点头昏眼花,然后有三张大表数据一直对不上;业务里面有礼物投票刷数据的情况一开始没有想到这个龟孙儿,导致我一直以为数据迁移有问题,拼了命的找原因又找不到逻辑漏洞;

全程写的很痛苦,感觉根本不在我的掌控之中。说了这么多,菜是原罪,努力学习吧,只要还有一根头发就不放弃学习!!!

点个在看支持我吧,转发就更好了
浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报