Hadoop Updates and Incrementals, Hudi 是一个 Uber 开源的 Data Lakes 的解决方案。
基本概念
Hudi 提供了记录级的更新,每条记录都由 HoodieKey
唯一标识,通常是由分区路径(分区+FileID)和记录键组成,记录键在分区内保持唯一,记录键则通过 KeyGenerator
生成,支持多种形式以适配不同场景。
主键在 Hudi 中是一等公民,根据主键去重,同时利用 preCombineField
判断是否替换旧数据,只要比较大就替换,一般是 ts
时间戳,如果不设置则默认每次都用最新数据替换。
文件布局
在 Apache Hudi Technical Specification 包含了不同的版本,文件在存储介质上的分布,Hudi 会严格管理文件的命名、存放位置、大小等,会选择合适的实际新建、合并以及分裂数据文件。
对于 MOR
来说,文件切片 File Slice
由基本文件和多个增量日志文件组成,而 COW
只存在基础数据文件,不含日志文件。其中 File Slice
与某个提交关联,是文件组织系统中的最小管理单元。
相同 FileID
的文件属于同一个 File Group
,通常会有多个不同的版本 (InstantTime
) 的数据文件或者再加上日志文件的组合,当某个 File Group
超过阈值之后会创建新的 File Group
,可以通过分区路径和文件 ID
唯一标识。
表类型
所有的湖格式实际上都为了解决新鲜度(准确性)、成本和查询延时(实时性)这个三角问题,要保证其中一项就需要牺牲另外两项。例如,为了提升新鲜度,引入了 MOR 包,就导致的了成本和查询延迟增加,主要是增量日志、查询时的 Merge 操作。
Hudi 支持 COW
和 MOR
两种格式,分别用于不同的场景:
Copy On Write, COW
适用于离线批量更新场景,更新数据读取旧的BaseFile
合并更新数据,生成新的BaseFile
,读取时直接访问最新的BaseFile
即可,但是会导致写入放大。Merge On Read, MOR
前者每次更新会刷新原数据,导致写入放大;后者则保存增量信息,每次读取会合并增量信息。
示例
这里以 SparkSQL 为例进行操作。
简单介绍
----- 执行命令行时已经包含 hudi-sparkX.X-bundle_xxx.jar 路径
$ spark-sql --master yarn --deploy-mode client
----- 否则需要如下方式指定参数
$ spark-sql --jars hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.13.1.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
接着可以创建表结构。
spark-sql> CREATE TABLE test_hudi_table(
id INT,
name STRING,
price DOUBLE,
ts LONG,
dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_table';
对于 LOCATION
参数来说,默认一般是 HDFS 存储,也可以通过如下方式指定本地盘,或者显示指定 HDFS 存储。
set hoodie.clean.automatic=false;
run clean on h1;
run clean on "/tmp/hudi_mor_table";
----- 在本地磁盘下创建
file:///tmp/hudi_mor_table
----- 指定HDFS存储
hdfs://hacluster/tmp/hudi_mor_table
建表语句的元数据信息会同步到 Hive MetaStore 中,此时,如果登录 Hive 查看元数据,可以看到如下内容。
$ beeline
jdbc:hive2://xxx> SHOW TABLES;
+---------------------+
| tab_name |
+---------------------+
| test_hudi_table |
| test_hudi_table_ro |
| test_hudi_table_rt |
+---------------------+
3 rows selected (0.263 seconds)
MOR 表会自动生成 ro/rt 后缀的表,实际上就是 Read Optimized 和 Real Time 的缩写,
如下是常用的 SQL 语句。
INSERT INTO test_hudi_table SELECT 1 AS id, 'hudi' AS name, 10 AS price, 1000 AS ts, '2021-05-05' AS dt;
INSERT INTO test_hudi_table SELECT 2, 'spark', 30.0, 1000, '2021-05-05';
INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');
SELECT * FROM test_hudi_table WHERE price > 20.0;
UPDATE test_hudi_table SET price = 20 WHERE id = 1;
DELETE FROM test_hudi_table WHERE id = 1;
DROP TABLE test_hudi_table_rt;
DROP TABLE test_hudi_table_ro;
DROP TABLE test_hudi_table;
SHOW TABLES;
当执行如下 SQL 时,由于 id=3
中的 ts=1000>500
实际不会替换,除非将 ts
替换为 1500
值。
INSERT INTO test_hudi_table VALUES(3, 'not_change', 10.0, 500, '2021-05-05');
MergeInto
可以将两个表的数据按照规则进行合并。
CREATE TABLE test_hudi_update(
id INT,
name STRING,
price DOUBLE,
ts LONG,
dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_update';
INSERT INTO test_hudi_update VALUES
(3, 'hello', 50.0, 1000, '2021-05-05'),
(4, 'world', 10.0, 1000, '2021-05-05');
MERGE INTO test_hudi_table AS target
USING test_hudi_update AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.price = target.price + source.price
WHEN NOT MATCHED THEN INSERT *;
查询
Hudi 支持如下几种查询类型:
Snapshot
默认,查询某个时间点最新数据,对于MOR
会将Base
和Log
合并;而COW
就是最新Base
数据文件。Read Optimized
优化查询可能会读到旧的数据,对于MOR
来说就是只读取Base
数据。Time Travel
数据时按照时间线排序的,可以根据具体的时间进行查询。Incremental
获取两个提交范围内的数据。
注意,只能查询到写入 Hudi 表的数据,也就是给定的 COMMIT/COMPACTION 之后的最新数据。
INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');
----- Snapshot Query
SELECT id, name, price, ts FROM test_hudi_table;
----- Optimized Read Query 当配置了 hoodie.query.as.ro.table=true 之后会自动生成
SELECT id, name, price, ts FROM test_hudi_table_ro;
----- Time Travel Query 如果没有严格匹配上,则返回比指定时间略老的数据
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240116113746358';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240419170531565';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '2024-01-16 11:37:46';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '2024-04-19 17:05:31';
----- 查看直接空值即可
SET hoodie.datasource.query.type=incremental;
hoodie.datasource.read.begin.instanttime=20240419170531565
hoodie.datasource.read.end.instanttime=202305160000 # optional
----- 在 Hive 中可以根据提交时间排序
SELECT `_hoodie_commit_time` AS t, id, name FROM test_hudi_table ORDER BY t;
通过 spark-shell 可以查看版本。
call show_clustering(table => 'test_hudi_table');
call show_table_properties(table => 'test_hudi_table');
目录结构
Hudi 数据集的目录结构和 Hive 非常相似,数据集根据分区打散到不同目录,分区字段以文件夹形式存在,文件夹包含该分区的所有文件,非分区则在直接在数据目录下保存。
cow/.hoodis/
cow/.hoodis/.aux/
cow/.hoodis/.temp/
cow/.hoodis/20230523174532.commit 组成时间线的文件
cow/.hoodis/20230523174532.commit.inflight
cow/.hoodis/20230523174532.commit.requested
cow/.hoodis/archived/
cow/.hoodis/metadata/ 元数据目录,采用MOR结构,支持Payload不同 hoodie.metadata.enable
cow/.hoodis/hoodie.index.properties 相关的索引信息
cow/.hoodis/hoodie.properties 基础的表配置信息,例如表名、版本等
cow/americas/
cow/americas/.hoodis_partition_metadata 元数据,保存了提交的时间和分区深度信息,文本格式保存
cow/americas/.1419a23b-5ecd-46e6-b96c-07a395c8029a-0_20230828160921471.log.1_0-103-148
cow/americas/1419a23b-5ecd-46e6-b96c-07a395c8029a-0_0-81-116_20230828160921471.parquet
元数据目录为 .hoodie
,其包含了版本管理 Timeline
、归档信息 (过时的 Instant
)、Instant
提交信息 (包括提交行为、时间戳、状态等) 等其它信息。
剩下目录是按分区实际存储的数据,分区键可以指定,基础数据文件采用 Parquet 格式保存,针对读进行了优化;还包含一个元数据日志文件 .log*
,同样进行了写优化;还包括分区元数据 .hoodie_partition_metadata
文件。
注意,COW
只有数据文件,而 MOR
格式才会通过 AVRO
格式保存增量日志信息。
元数据
Hudi 将一系列 CURD
操作称为 Timeline
,而 Timeline
中的某次操作称为 Instant
,其核心能力就是维护 Timeline
,依次提供即时视图以及按到达顺序检索数据。其实现是在元数据目录下以日期进行保存相关信息,如下是一个简单的示例:
cow/.hoodis/20230523174532904.commit
cow/.hoodis/20230523174532904.commit.inflight
cow/.hoodis/20230523174532904.commit.requested
其文件格式为 <action timestamp>.<action type>[.<action state>]
,介绍如下:
timestamp
首次调度时间,提供毫秒级的粒度,作为单调递增的时间线唯一标识。action
标示操作类型,包括了commit/deltacommit
更新数据 (包括插入、更新、删除);compaction/clean
内部操作;savepoint/restore
恢复操作。state
标识状态,包括了 A)requested
调度运行;B)inflight
正在运行;C)completed
执行完成。
元数据通过 JSON
或者 AVRO
格式保存,包括了表的修改信息,可以用来做数据恢复、使用 Snapshot
隔离级别等。其中常见 action
介绍如下:
COMMIT
一批数据的原子写入,COW 写入事务或者 MOR 压缩。DELTACOMMIT
写入 MOR 表的增量数据,通常可以作为增量日志文件保存。CLEAN
后台任务,用于清理掉不需要的历史数据。ROLLBACK
回滚异常的事务。COMPACTION
后台任务,用于将 MOR 行格式的增量数据合并为列式数据,会生成一个特殊的提交。
表服务
用于维护 Hudi 表的读写性能、文件数量等,包含多种表服务:
Compaction
合并BaseFile
和LogFile
生成新版本文件,提升读取效率。Clustering
重分布FileGroup
,主要用于合并小文件生成新版本文件,调整数据的分布。Clean
清理版本过期文件。Rollback
回滚未完成的instant
所写入的文件及元数据。Indexing
构建索引,提升读取性能。
Clean
用于清理不需要的文件。
----- 是否开启自动清理
set hoodie.clean.automatic=true;
----- 是否异步清理
set hoodie.clean.async=true;
----- 清理策略 KEEP_LATEST_FILE_VERSIONS KEEP_LATEST_BY_HOURS
set hoodie.cleaner.policy=KEEP_LATEST_COMMITS;
----- 最新时间线之前,保留时间线个数,最少保留一个
set hoodie.cleaner.commits.retained=10;
KEEP_LATEST_COMMITS
根据提交次数进行清理,默认保留最新 10 个 Commit 所有文件。KEEP_LATEST_FILE_VERSIONS
保留文件的版本数,默认保留三个版本。KEEP_LATEST_BY_HOURS
按照小时保留,默认是 24 小时。
其它
记录级元数据
通过 Hudi 可以跟踪记录级的时间变化,为了达到此目的,在每个表中同时会维护行相关的元数据,可以查看 .hoodie/hoodie.properties
中的 hoodie.table.create.schema
配置项,主要包含如下几个字段:
_hoodie_record_key
记录主键,用来处理更新和删除。_hoodie_commit_time
最新记录提交时间。_hoodie_commit_seqno
提交序列号。_hoodie_partition_path
分区路径。_hoodie_file_name
存储记录的文件名。
可以参考 Hudi Metafields demystified 中的介绍。
参考
- Apache Hudi Technical Specification 官方的技术参考文档。
- 官网 Hudi 提供了下载、文档等,最常用的可以参考 Quick Start 中的内容。
- Apache Hudi: From Zero To One 比较经典的 Hudi 相关材料。