Hudi 基本介绍

2022-09-21 warehouse hudi

Hadoop Updates and Incrementals, Hudi 是一个 Uber 开源的 Data Lakes 的解决方案。

基本概念

Hudi 提供了记录级的更新,每条记录都由 HoodieKey 唯一标识,通常是由分区路径(分区+FileID)和记录键组成,记录键在分区内保持唯一,记录键则通过 KeyGenerator 生成,支持多种形式以适配不同场景。

主键在 Hudi 中是一等公民,根据主键去重,同时利用 preCombineField 判断是否替换旧数据,只要比较大就替换,一般是 ts 时间戳,如果不设置则默认每次都用最新数据替换。

文件布局

Apache Hudi Technical Specification 包含了不同的版本,文件在存储介质上的分布,Hudi 会严格管理文件的命名、存放位置、大小等,会选择合适的实际新建、合并以及分裂数据文件。

对于 MOR 来说,文件切片 File Slice 由基本文件和多个增量日志文件组成,而 COW 只存在基础数据文件,不含日志文件。其中 File Slice 与某个提交关联,是文件组织系统中的最小管理单元。

相同 FileID 的文件属于同一个 File Group,通常会有多个不同的版本 (InstantTime) 的数据文件或者再加上日志文件的组合,当某个 File Group 超过阈值之后会创建新的 File Group,可以通过分区路径和文件 ID 唯一标识。

表类型

所有的湖格式实际上都为了解决新鲜度(准确性)、成本和查询延时(实时性)这个三角问题,要保证其中一项就需要牺牲另外两项。例如,为了提升新鲜度,引入了 MOR 包,就导致的了成本和查询延迟增加,主要是增量日志、查询时的 Merge 操作。

Hudi 支持 COWMOR 两种格式,分别用于不同的场景:

  • Copy On Write, COW 适用于离线批量更新场景,更新数据读取旧的 BaseFile 合并更新数据,生成新的 BaseFile,读取时直接访问最新的 BaseFile 即可,但是会导致写入放大。
  • Merge On Read, MOR 前者每次更新会刷新原数据,导致写入放大;后者则保存增量信息,每次读取会合并增量信息。

示例

这里以 SparkSQL 为例进行操作。

简单介绍

----- 执行命令行时已经包含 hudi-sparkX.X-bundle_xxx.jar 路径
$ spark-sql --master yarn --deploy-mode client
----- 否则需要如下方式指定参数
$ spark-sql --jars hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.13.1.jar \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

接着可以创建表结构。

spark-sql> CREATE TABLE test_hudi_table(
  id INT,
  name STRING,
  price DOUBLE,
  ts LONG,
  dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_table';

对于 LOCATION 参数来说,默认一般是 HDFS 存储,也可以通过如下方式指定本地盘,或者显示指定 HDFS 存储。

set hoodie.clean.automatic=false;

run clean on h1;
run clean on "/tmp/hudi_mor_table";

----- 在本地磁盘下创建
file:///tmp/hudi_mor_table
----- 指定HDFS存储
hdfs://hacluster/tmp/hudi_mor_table

建表语句的元数据信息会同步到 Hive MetaStore 中,此时,如果登录 Hive 查看元数据,可以看到如下内容。

$ beeline
jdbc:hive2://xxx> SHOW TABLES;
+---------------------+
|      tab_name       |
+---------------------+
| test_hudi_table     |
| test_hudi_table_ro  |
| test_hudi_table_rt  |
+---------------------+
3 rows selected (0.263 seconds)

MOR 表会自动生成 ro/rt 后缀的表,实际上就是 Read Optimized 和 Real Time 的缩写,

如下是常用的 SQL 语句。

INSERT INTO test_hudi_table SELECT 1 AS id, 'hudi' AS name, 10 AS price, 1000 AS ts, '2021-05-05' AS dt;
INSERT INTO test_hudi_table SELECT 2, 'spark', 30.0, 1000, '2021-05-05';
INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');
SELECT * FROM test_hudi_table WHERE price > 20.0;
UPDATE test_hudi_table SET price = 20 WHERE id = 1;
DELETE FROM test_hudi_table WHERE id = 1;
DROP TABLE test_hudi_table_rt;
DROP TABLE test_hudi_table_ro;
DROP TABLE test_hudi_table;
SHOW TABLES;

当执行如下 SQL 时,由于 id=3 中的 ts=1000>500 实际不会替换,除非将 ts 替换为 1500 值。

INSERT INTO test_hudi_table VALUES(3, 'not_change', 10.0, 500, '2021-05-05');

MergeInto

可以将两个表的数据按照规则进行合并。

CREATE TABLE test_hudi_update(
  id INT,
  name STRING,
  price DOUBLE,
  ts LONG,
  dt STRING
)
USING hudi
PARTITIONED BY(dt)
OPTIONS (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
) LOCATION '/tmp/hudi_mor_update';

INSERT INTO test_hudi_update VALUES
(3, 'hello', 50.0, 1000, '2021-05-05'),
(4, 'world', 10.0, 1000, '2021-05-05');

MERGE INTO test_hudi_table AS target
USING test_hudi_update AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.price = target.price + source.price
WHEN NOT MATCHED THEN INSERT *;

查询

Hudi 支持如下几种查询类型:

  • Snapshot 默认,查询某个时间点最新数据,对于 MOR 会将 BaseLog 合并;而 COW 就是最新 Base 数据文件。
  • Read Optimized 优化查询可能会读到旧的数据,对于 MOR 来说就是只读取 Base 数据。
  • Time Travel 数据时按照时间线排序的,可以根据具体的时间进行查询。
  • Incremental 获取两个提交范围内的数据。

注意,只能查询到写入 Hudi 表的数据,也就是给定的 COMMIT/COMPACTION 之后的最新数据。

INSERT INTO test_hudi_table VALUES(3, 'hello', 10.0, 1000, '2021-05-05');

----- Snapshot Query
SELECT id, name, price, ts FROM test_hudi_table;
----- Optimized Read Query 当配置了 hoodie.query.as.ro.table=true 之后会自动生成
SELECT id, name, price, ts FROM test_hudi_table_ro;
----- Time Travel Query 如果没有严格匹配上,则返回比指定时间略老的数据
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240116113746358';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '20240419170531565';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '2024-01-16 11:37:46';
SELECT id, name, price, ts FROM test_hudi_table TIMESTAMP AS OF '2024-04-19 17:05:31';

----- 查看直接空值即可
SET hoodie.datasource.query.type=incremental;
hoodie.datasource.read.begin.instanttime=20240419170531565
hoodie.datasource.read.end.instanttime=202305160000 # optional

----- 在 Hive 中可以根据提交时间排序
SELECT `_hoodie_commit_time` AS t, id, name FROM test_hudi_table ORDER BY t;

通过 spark-shell 可以查看版本。

call show_clustering(table => 'test_hudi_table');
call show_table_properties(table => 'test_hudi_table');

目录结构

Hudi 数据集的目录结构和 Hive 非常相似,数据集根据分区打散到不同目录,分区字段以文件夹形式存在,文件夹包含该分区的所有文件,非分区则在直接在数据目录下保存。

cow/.hoodis/
cow/.hoodis/.aux/
cow/.hoodis/.temp/
cow/.hoodis/20230523174532.commit            组成时间线的文件
cow/.hoodis/20230523174532.commit.inflight
cow/.hoodis/20230523174532.commit.requested
cow/.hoodis/archived/
cow/.hoodis/metadata/                        元数据目录,采用MOR结构,支持Payload不同 hoodie.metadata.enable
cow/.hoodis/hoodie.index.properties          相关的索引信息
cow/.hoodis/hoodie.properties                基础的表配置信息,例如表名、版本等
cow/americas/
cow/americas/.hoodis_partition_metadata      元数据,保存了提交的时间和分区深度信息,文本格式保存
cow/americas/.1419a23b-5ecd-46e6-b96c-07a395c8029a-0_20230828160921471.log.1_0-103-148
cow/americas/1419a23b-5ecd-46e6-b96c-07a395c8029a-0_0-81-116_20230828160921471.parquet

元数据目录为 .hoodie,其包含了版本管理 Timeline、归档信息 (过时的 Instant)、Instant 提交信息 (包括提交行为、时间戳、状态等) 等其它信息。

剩下目录是按分区实际存储的数据,分区键可以指定,基础数据文件采用 Parquet 格式保存,针对读进行了优化;还包含一个元数据日志文件 .log*,同样进行了写优化;还包括分区元数据 .hoodie_partition_metadata 文件。

注意,COW 只有数据文件,而 MOR 格式才会通过 AVRO 格式保存增量日志信息。

元数据

Hudi 将一系列 CURD 操作称为 Timeline,而 Timeline 中的某次操作称为 Instant,其核心能力就是维护 Timeline,依次提供即时视图以及按到达顺序检索数据。其实现是在元数据目录下以日期进行保存相关信息,如下是一个简单的示例:

cow/.hoodis/20230523174532904.commit
cow/.hoodis/20230523174532904.commit.inflight
cow/.hoodis/20230523174532904.commit.requested

其文件格式为 <action timestamp>.<action type>[.<action state>],介绍如下:

  • timestamp 首次调度时间,提供毫秒级的粒度,作为单调递增的时间线唯一标识。
  • action 标示操作类型,包括了 commit/deltacommit 更新数据 (包括插入、更新、删除);compaction/clean 内部操作;savepoint/restore 恢复操作。
  • state 标识状态,包括了 A) requested 调度运行;B) inflight 正在运行;C) completed 执行完成。

元数据通过 JSON 或者 AVRO 格式保存,包括了表的修改信息,可以用来做数据恢复、使用 Snapshot 隔离级别等。其中常见 action 介绍如下:

  • COMMIT 一批数据的原子写入,COW 写入事务或者 MOR 压缩。
  • DELTACOMMIT 写入 MOR 表的增量数据,通常可以作为增量日志文件保存。
  • CLEAN 后台任务,用于清理掉不需要的历史数据。
  • ROLLBACK 回滚异常的事务。
  • COMPACTION 后台任务,用于将 MOR 行格式的增量数据合并为列式数据,会生成一个特殊的提交。

表服务

用于维护 Hudi 表的读写性能、文件数量等,包含多种表服务:

  • Compaction 合并 BaseFileLogFile 生成新版本文件,提升读取效率。
  • Clustering 重分布 FileGroup,主要用于合并小文件生成新版本文件,调整数据的分布。
  • Clean 清理版本过期文件。
  • Rollback 回滚未完成的 instant 所写入的文件及元数据。
  • Indexing 构建索引,提升读取性能。

Clean

用于清理不需要的文件。

----- 是否开启自动清理
set hoodie.clean.automatic=true;
----- 是否异步清理
set hoodie.clean.async=true;
----- 清理策略 KEEP_LATEST_FILE_VERSIONS KEEP_LATEST_BY_HOURS
set hoodie.cleaner.policy=KEEP_LATEST_COMMITS;
----- 最新时间线之前,保留时间线个数,最少保留一个
set hoodie.cleaner.commits.retained=10;
  • KEEP_LATEST_COMMITS 根据提交次数进行清理,默认保留最新 10 个 Commit 所有文件。
  • KEEP_LATEST_FILE_VERSIONS 保留文件的版本数,默认保留三个版本。
  • KEEP_LATEST_BY_HOURS 按照小时保留,默认是 24 小时。

其它

记录级元数据

通过 Hudi 可以跟踪记录级的时间变化,为了达到此目的,在每个表中同时会维护行相关的元数据,可以查看 .hoodie/hoodie.properties 中的 hoodie.table.create.schema 配置项,主要包含如下几个字段:

  • _hoodie_record_key 记录主键,用来处理更新和删除。
  • _hoodie_commit_time 最新记录提交时间。
  • _hoodie_commit_seqno 提交序列号。
  • _hoodie_partition_path 分区路径。
  • _hoodie_file_name 存储记录的文件名。

可以参考 Hudi Metafields demystified 中的介绍。

参考