数据湖解决方案关键一环,IceBerg会不会脱颖而出?
程序源代码
共 5190字,需浏览 11分钟
·
2021-02-03 01:12
发展历程
IceBerg的特性
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table.
Iceberg是一个为大规模数据集设计的通用的表格形式。并且适配Trino(原PrestoSQL)和Spark适,提供SQL化解决方案。
模式演化,支持添加,删除,更新或重命名,并且没有副作用
隐藏分区,可以防止导致错误提示或非常慢查询的用户错误
分区布局演变,可以随着数据量或查询模式的变化而更新表的布局
快照控制,可实现使用完全相同的表快照的可重复查询,或者使用户轻松检查更改
版本回滚,使用户可以通过将表重置为良好状态来快速纠正问题
快速扫描数据,无需使用分布式SQL引擎即可读取表或查找文件
数据修剪优化,使用表元数据使用分区和列级统计信息修剪数据文件
兼容性好 ,可以存储在任意的云存储系统和HDFS中
支持事务,序列化隔离 表更改是原子性的,读者永远不会看到部分更改或未提交的更改
高并发,高并发写入器使用乐观并发,即使写入冲突,也会重试以确保兼容更新成功
ACID和多版本支持
支持批/流读写
多种分析引擎的支持
IceBerg初体验
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
val schema = SparkSchemaUtil.convert(data.schema)
val name = TableIdentifier.of("default", "test_table")
val table = catalog.createTable(name, schema)
// write the dataset to the table
data.write.format("iceberg").mode("append").save("default.test_table")
// read the table
spark.read.format("iceberg").load("default.test_table")
spark.read.format("iceberg").load("default.test_table").createOrReplaceTempView("test_table")
spark.sql("""SELECT count(1) FROM test_table""")
// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
META_STORE_URIS);
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_WAREHOUSE);
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
// Create Iceberg table
Schema schema = new Schema(
...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
// Obtain an execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(...);
// Add Source
DataStream
在大厂的典型应用
未来期待
Apache Iceberg 正在朝着流批一体的数据湖存储层发展,manifest 和snapshot 的设计,有效地隔离不同 transaction 的变更,非常方便批处理和增量计算。而我们知道 Apache Flink 已经是一个流批一体的计算引擎,可以说这二者的长远规划完美匹配,未来二者将合力打造流批一体的数据湖架构。
评论