Hadoop Updates and Incrementals, Hudi 是一个 Uber 开源的 Data Lakes 的解决方案。
基本概念
Hudi 提供了记录级的更新,每条记录都由 HoodieKey 唯一标识,通常是由分区路径(分区+FileID)和记录键组成,记录键在分区内保持唯一,记录键则通过 KeyGenerator 生成,支持多种形式以适配不同场景。
主键在 Hudi 中是一等公民,根据主键去重,同时利用 preCombineField 判断是否替换旧数据,只要比较大就替换,一般是 ts 时间戳,如果不设置则默认每次都用最新数据替换。
文件布局
在 Apache Hudi Technical Specification 包含了不同的版本,文件在存储介质上的分布,Hudi 会严格管理文件的命名、存放位置、大小等,会选择合适的实际新建、合并以及分裂数据文件。
对于 MOR 来说,文件切片 File Slice 由基本文件和多个增量日志文件组成,而 COW 只存在基础数据文件,不含日志文件。其中 File Slice 与某个提交关联,是文件组织系统中的最小管理单元。
相同 FileID 的文件属于同一个 File Group,通常会有多个不同的版本 (InstantTime) 的数据文件或者再加上日志文件的组合,当某个 File Group 超过阈值之后会创建新的 File Group,可以通过分区路径和文件 ID 唯一标识。
表类型
所有的湖格式实际上都为了解决新鲜度(准确性)、成本和查询延时(实时性)这个三角问题,要保证其中一项就需要牺牲另外两项。例如,为了提升新鲜度,引入了 MOR 包,就导致的了成本和查询延迟增加,主要是增量日志、查询时的 Merge 操作。
Hudi 支持 COW 和 MOR 两种格式,分别用于不同的场景:
Copy On Write, COW适用于离线批量更新场景,更新数据读取旧的BaseFile合并更新数据,生成新的BaseFile,读取时直接访问最新的BaseFile即可,但是会导致写入放大。Merge On Read, MOR前者每次更新会刷新原数据,导致写入放大;后者则保存增量信息,每次读取会合并增量信息。
示例
这里以 SparkSQL 为例进行操作。
简单介绍
注意,不同 Spark 版本对应的配置参数略有区别,详细可以参考官网 Spark Quick Start 的介绍。
----- 执行命令行时已经包含 hudi-sparkX.X-bundle_xxx.jar 路径
$ spark-sql --master yarn --deploy-mode client
----- 否则需要如下方式指定参数
$ spark-sql --master local[4] \
--jars hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.13.1.jar \
--jars spark/jars/spark-avro_2.12-341.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
除了在命令行参数中指定配置之外,还可以通过 HUDI_CONF_DIR 环境变量指定配置文件,同时增加一个 hudi-default.conf 的配置文件。接着可以创建表结构。
spark-sql> CREATE TABLE test_hudi_table(
id INT,
name STRING,
price DOUBLE,
ts LONG,
dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_table';
已经存在的表,可以通过如下方式创建。
CREATE TABLE test_hudi_table
USING hudi
LOCATION '/tmp/hudi_mor_table';
除此之外,还可以使用 Create Table As Select 创建表,对于 LOCATION 参数来说,默认一般是 HDFS 存储,也可以通过如下方式指定本地盘,或者显示指定 HDFS 存储。
----- 在本地磁盘下创建
file:///tmp/hudi_mor_table
----- 指定HDFS存储
hdfs://hacluster/tmp/hudi_mor_table
----- 指定对象存储,以华为云为例
obs://starrocks/hudi/mor_table
建表语句的元数据信息会同步到 Hive MetaStore 中,此时,如果登录 Hive 查看元数据,可以看到如下内容。
$ beeline
jdbc:hive2://xxx> SHOW TABLES;
+---------------------+
| tab_name |
+---------------------+
| test_hudi_table |
| test_hudi_table_ro |
| test_hudi_table_rt |
+---------------------+
3 rows selected (0.263 seconds)
MOR 表会自动生成 ro/rt 后缀的表,实际上就是 Read Optimized 和 Real Time 的缩写,
如下是常用的 SQL 语句。
INSERT INTO test_hudi_table SELECT 1 AS id, 'hudi' AS name, 10 AS price, 1000 AS ts, '2021-05-05' AS dt;
INSERT INTO test_hudi_table SELECT 2, 'spark', 30.0, 1000, '2021-05-05';
INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');
SELECT * FROM test_hudi_table WHERE price > 20.0;
UPDATE test_hudi_table SET price = 20 WHERE id = 1;
DELETE FROM test_hudi_table WHERE id = 1;
DROP TABLE test_hudi_table_rt;
DROP TABLE test_hudi_table_ro;
DROP TABLE test_hudi_table;
SHOW TABLES;
当执行如下 SQL 时,由于 id=3 中的 ts=1000>500 实际不会替换,除非将 ts 替换为 1500 值。
INSERT INTO test_hudi_table VALUES(3, 'not_change', 10.0, 500, '2021-05-05');
另外,还可以通过如下方式通过动态分区或者指定分区的方式写入。
INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');
INSERT INTO test_hudi_table partition(dt = '2021-05-05') VALUES(3, 'hello', 10.0, 1000);
测试发现一个很奇怪的现象:动态 partition 会生成 parquet 文件,静态则会生成 log 文件。
MergeInto
可以将两个表的数据按照规则进行合并。
CREATE TABLE test_hudi_update(
id INT,
name STRING,
price DOUBLE,
ts LONG,
dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_update';
INSERT INTO test_hudi_update VALUES
(3, 'hello', 50.0, 1000, '2021-05-05'),
(4, 'world', 10.0, 1000, '2021-05-05');
MERGE INTO test_hudi_table AS target
USING test_hudi_update AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.price = target.price + source.price
WHEN NOT MATCHED THEN INSERT *;
查询
Hudi 支持如下几种查询类型:
Snapshot默认,查询某个时间点最新数据,对于MOR会将Base和Log合并;而COW就是最新Base数据文件。Read Optimized优化查询可能会读到旧的数据,对于MOR来说就是只读取Base数据。Time Travel数据时按照时间线排序的,可以根据具体的时间进行查询。Incremental获取两个提交范围内的数据。
注意,只能查询到写入 Hudi 表的数据,也就是给定的 COMMIT/COMPACTION 之后的最新数据。
----- Snapshot Query
SELECT id, name, price, ts FROM test_hudi_table;
----- Optimized Read Query 当配置了 hoodie.query.as.ro.table=true 之后会自动生成
SELECT id, name, price, ts FROM test_hudi_table_ro;
----- TimeTravel 如果没有严格匹配上,则返回比指定时间略老的数据,需要 Spark 3.2+
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240116113746358';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240116113746';
----- 查看直接空值即可
SET hoodie.datasource.query.type=incremental;
hoodie.datasource.read.begin.instanttime=20240419170531565
hoodie.datasource.read.end.instanttime=202305160000 # optional
----- 在 Hive 中可以根据提交时间排序
SELECT `_hoodie_commit_time` AS t, id, name FROM test_hudi_table ORDER BY t;
----- Incremental 设置增量读取模式,同时配置起止时间戳
其中 test 为具体表明
set hoodie.test.consume.mode=INCREMENTAL;
set hoodie.test.consume.start.timestamp=20240507190438;
set hoodie.test.consume.end.timestamp=20240507300438; # optional
SELECT id, name, price, ts FROM test_hudi_table where `_hoodie_commit_time`>'20240507190438';
社区版本是不支持这种方式的,在 0.15.1 中新增了 TVF 的方式。
其中 Spark 的版本可以通过 spark-shell 命令查看。
call show_clustering(table => 'test_hudi_table');
call show_table_properties(table => 'test_hudi_table');
目录结构
Hudi 数据集的目录结构和 Hive 非常相似,数据集根据分区打散到不同目录,分区字段以文件夹形式存在,文件夹包含该分区的所有文件,非分区则在直接在数据目录下保存。
cow/.hoodis/
cow/.hoodis/.aux/
cow/.hoodis/.temp/
cow/.hoodis/20230523174532.commit 组成时间线的文件
cow/.hoodis/20230523174532.commit.inflight
cow/.hoodis/20230523174532.commit.requested
cow/.hoodis/archived/
cow/.hoodis/metadata/ 元数据目录,采用MOR结构,支持Payload不同 hoodie.metadata.enable
cow/.hoodis/hoodie.index.properties 相关的索引信息
cow/.hoodis/hoodie.properties 基础的表配置信息,例如表名、版本等
cow/americas/
cow/americas/.hoodis_partition_metadata 元数据,保存了提交的时间和分区深度信息,文本格式保存
cow/americas/.1419a23b-5ecd-46e6-b96c-07a395c8029a-0_20230828160921471.log.1_0-103-148
cow/americas/1419a23b-5ecd-46e6-b96c-07a395c8029a-0_0-81-116_20230828160921471.parquet
元数据目录为 .hoodie,其包含了版本管理 Timeline、归档信息 (过时的 Instant)、Instant 提交信息 (包括提交行为、时间戳、状态等) 等其它信息。
剩下目录是按分区实际存储的数据,分区键可以指定,基础数据文件采用 Parquet 格式保存,针对读进行了优化;还包含一个元数据日志文件 .log*,同样进行了写优化;还包括分区元数据 .hoodie_partition_metadata 文件。
注意,COW 只有数据文件,而 MOR 格式才会通过 AVRO 格式保存增量日志信息。
元数据
Hudi 将一系列 CURD 操作称为 Timeline,而 Timeline 中的某次操作称为 Instant,其核心能力就是维护 Timeline,依次提供即时视图以及按到达顺序检索数据。其实现是在元数据目录下以日期进行保存相关信息,如下是一个简单的示例:
cow/.hoodis/20230523174532904.commit
cow/.hoodis/20230523174532904.commit.inflight
cow/.hoodis/20230523174532904.commit.requested
其文件格式为 <action timestamp>.<action type>[.<action state>],介绍如下:
timestamp首次调度时间,提供毫秒级的粒度,作为单调递增的时间线唯一标识。action标示操作类型,包括了commit/deltacommit更新数据 (包括插入、更新、删除);compaction/clean内部操作;savepoint/restore恢复操作。state标识状态,包括了 A)requested调度运行;B)inflight正在运行;C)completed执行完成。
元数据通过 JSON 或者 AVRO 格式保存,包括了表的修改信息,可以用来做数据恢复、使用 Snapshot 隔离级别等。其中常见 action 介绍如下:
COMMIT一批数据的原子写入,COW 写入事务或者 MOR 压缩。DELTACOMMIT写入 MOR 表的增量数据,通常可以作为增量日志文件保存。CLEAN后台任务,用于清理掉不需要的历史数据。ROLLBACK回滚异常的事务。COMPACTION后台任务,用于将 MOR 行格式的增量数据合并为列式数据,会生成一个特殊的提交。
表服务
用于维护 Hudi 表的读写性能、文件数量等,包含多种表服务:
Compaction合并BaseFile和LogFile生成新版本文件,提升读取效率。Clustering重分布FileGroup,主要用于合并小文件生成新版本文件,调整数据的分布。Clean清理版本过期文件。Rollback回滚未完成的instant所写入的文件及元数据。Indexing构建索引,提升读取性能。
Clean
用于清理不需要的文件。
----- 是否开启自动清理
set hoodie.clean.automatic=true;
----- 是否异步清理
set hoodie.clean.async=true;
----- 清理策略 KEEP_LATEST_FILE_VERSIONS KEEP_LATEST_BY_HOURS
set hoodie.cleaner.policy=KEEP_LATEST_COMMITS;
----- 最新时间线之前,保留时间线个数,最少保留一个
set hoodie.cleaner.commits.retained=10;
KEEP_LATEST_COMMITS根据提交次数进行清理,默认保留最新 10 个 Commit 所有文件。KEEP_LATEST_FILE_VERSIONS保留文件的版本数,默认保留三个版本。KEEP_LATEST_BY_HOURS按照小时保留,默认是 24 小时。
其它
记录级元数据
通过 Hudi 可以跟踪记录级的时间变化,为了达到此目的,在每个表中同时会维护行相关的元数据,可以查看 .hoodie/hoodie.properties 中的 hoodie.table.create.schema 配置项,主要包含如下几个字段:
_hoodie_record_key记录主键,用来处理更新和删除。_hoodie_commit_time最新记录提交时间。_hoodie_commit_seqno提交序列号。_hoodie_partition_path分区路径。_hoodie_file_name存储记录的文件名。
可以参考 Hudi Metafields demystified 中的介绍。
参考
- Apache Hudi Technical Specification 官方的技术参考文档。
- 官网 Hudi 提供了下载、文档等,最常用的可以参考 Quick Start 中的内容。
- Apache Hudi: From Zero To One 比较经典的 Hudi 相关材料。