Flink CDC 系列 - Flink MongoDB CDC 在 XTransfer 的生产实践
Flink CDC MongoDB 复制机制 Flink MongoDB CDC 生产实践 后续规划
前言
一、Flink CDC
在 Flink 内部,changelog 记录由 RowData 表示,RowData 包括 4 种类型:+I (INSERT), -U (UPDATE_BEFORE),+U (UPDATE_AFTER), -D (DELETE)。根据 changelog 产生记录类型的不同,又可以分为 3 种 changelog mode。
INSERT_ONLY:只包含 +I,适用于批处理和事件流。 ALL:包含 +I, -U, +U, -D 全部的 RowKind,如 MySQL binlog。 UPSERT:只包含 +I, +U, -D 三种类型的 RowKind,不包含 -U,但必须按唯一键的幂等更新 , 如 MongoDB Change Streams。
二、MongoDB 复制机制
2.1 副本集和分片集群
2.2 Replica Set Oplog
{
"ts" : Timestamp(1640190995, 3),
"t" : NumberLong(434),
"h" : NumberLong(3953156019015894279),
"v" : 2,
"op" : "u",
"ns" : "db.firm",
"ui" : UUID("19c72da0-2fa0-40a4-b000-83e038cd2c01"),
"o2" : {
"_id" : ObjectId("61c35441418152715fc3fcbc")
},
"wall" : ISODate("2021-12-22T16:36:35.165Z"),
"o" : {
"$v" : 1,
"$set" : {
"address" : "Shanghai China"
}
}
}
字段 | 是否可空 | 描述 |
ts | N | 操作时间,BsonTimestamp |
t | Y | 对应raft协议里面的term,每次发生节点down掉,新节点加入,主从切换,term都会自增。 |
h | Y | 操作的全局唯一id的hash结果 |
v | N | oplog版本 |
op | N | 操作类型:"i" insert, "u" update, "d" delete, "c" db cmd, "n" no op |
ns | N | 命名空间,表示操作对应的集合全称 |
ui | N | session id |
o2 | Y | 在更新操作中记录_id和sharding key |
wall | N | 操作时间,精确到毫秒 |
o | N | 变更数据描述 |
2.3 Change Streams
■ 2.3.1 使用条件
WiredTiger 存储引擎 副本集 (测试环境下,也可以使用单节点的副本集) 或分片集群部署 副本集协议版本:pv1 (默认) 4.0 版本之前允许 Majority Read Concern: replication.enableMajorityReadConcern = true (默认允许) MongoDB 用户拥有 find 和 changeStream 权限
■ 2.3.2 Change Events
{
_id : {
Object > },"operationType" : "
" ,"fullDocument" : { <document> },
"ns" : {
"db" : "
" ,"coll" : "
" },
"to" : {
"db" : "
" ,"coll" : "
" },
"documentKey" : { "_id" :
}, "updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "
" , ... ],"truncatedArrays" : [
{ "field" :
, "newSize" : }, ...
]
},
"clusterTime" :
, "txnNumber" :
, "lsid" : {
"id" :
, "uid" :
}
}
字段 | 类型 | 描述 |
_id | document | 表示resumeToken |
operationType | string | 操作类型,包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate |
fullDocument | document | 完整文档记录,insert, replace默认包含,update需要开启updateLookup,delete和其他操作类型不包含 |
ns | document | 操作记录对应集合的完全名称 |
to | document | 当操作类型为rename时,to表示重命名后的完全名称 |
documentKey | document | 包含变更文档的主键 _id,如果该集合是一个分片集合,documentKey中也会包含分片建 |
updateDescription | document | 当操作类型为update时,描述有变更的字段和值 |
clusterTime | Timestamp | 操作时间 |
txnNumber | NumberLong | 事务号 |
lsid | Document | session id |
■ 2.3.3 Update Lookup
三、Flink MongoDB CDC
支持特性
支持 Exactly-Once 语义 支持全量、增量订阅 支持 Snapshot 数据过滤 支持从检查点、保存点恢复 支持元数据提取
四、生产实践
4.1 使用 RocksDB State Backend
4.2 合适的 oplog 容量和过期时间
db.adminCommand(
{
replSetResizeOplog: 1, // 固定值1
size: 20480, // 单位为MB,范围在990MB到1PB
minRetentionHours: 168 // 可选项,单位为小时
}
)
4.3 变更慢的表开启心跳事件
WITH (
'connector' = 'mongodb-cdc',
'heartbeat.interval.ms' = '60000'
)
4.4 自定义 MongoDB 连接参数
WITH (
'connector' = 'mongodb-cdc',
'connection.options' = 'authSource=authDB&maxPoolSize=3'
)
4.5 Change Stream 参数调优
可以在 Flink DDL 中通过 poll.await.time.ms 和 poll.max.batch.size 精细化配置变更事件的拉取。
poll.await.time.ms
变更事件拉取时间间隔,默认为 1500ms。对于变更频繁的集合,可以适当调小拉取间隔,提升处理时效;对于变更缓慢的集合,可以适当调大拉取时间间隔,减轻数据库压力。
poll.max.batch.size
每一批次拉取变更事件的最大条数,默认为 1000 条。调大改参数会加快从 Cursor 中拉取变更事件的速度,但会提升内存的开销。
4.6 订阅整库、集群变更
MongoDBSource.<String>builder()
.hosts("127.0.0.1:27017")
.database("")
.collection("")
.pipeline("[{'$match': {'ns.db': {'$regex': '/^(sandbox|firewall)$/'}}}]")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
4.7 权限控制
MongoDB 支持对用户、角色、权限进行细粒度的管控,开启 Change Stream 的用户需要拥有 find 和 changeStream 两个权限。
单集合
{ resource: { db:
, collection: }, actions: [ "find", "changeStream" ] }
单库
{ resource: { db:
, collection: "" }, actions: [ "find", "changeStream" ] }
集群
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
use admin;
// 创建用户
db.createUser(
{
user: "flink",
pwd: "flinkpw",
roles: []
}
);
// 创建角色
db.createRole(
{
role: "flink_role",
privileges: [
{ resource: { db: "inventory", collection: "products" }, actions: [ "find", "changeStream" ] }
],
roles: []
}
);
// 给用户授予角色
db.grantRolesToUser(
"flink",
[
// 注意:这里的db指角色创建时的db,在admin下创建的角色可以包含不同database的访问权限
{ role: "flink_role", db: "admin" }
]
);
// 给角色追加权限
db.grantPrivilegesToRole(
"flink_role",
[
{ resource: { db: "inventory", collection: "orders" }, actions: [ "find", "changeStream" ] }
]
);
use admin;
db.createUser({
user: "flink",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
});
五、后续规划
支持增量 Snapshot 目前,MongoDB CDC Connector 还不支持增量 Snapshot,对于数据量较大的表还不能很好发挥 Flink 并行计算的优势。后续将实现 MongoDB 的增量 Snapshot 功能,使其支持 Snapshot 阶段的 checkpoint,和并发度设置。
支持从指定时间进行变更订阅 目前,MongoDB CDC Connector 仅支持从当前时间开始 Change Stream 的订阅,后续将提供从指定时间点的 Change Stream 订阅。
支持库和集合的筛选 目前,MongoDB CDC Connector 支持集群、整库的变更订阅和筛选,但对于是否需要进行 Snapshot 的集合的筛选还不支持,后续将完善这个功能。
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖 Flink CDC 系列 - 实时抽取 Oracle 数据,排雷和调优实践 Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
戳我,预约 FFA 2021~
评论