Parquet 文件格式详解

2021-12-01 warehouse

简介

Parquet 文件包含了一个 Header、多个 Block、一个 Footer 组成,其中 Header 就是一个简单的 4 字节 PAR1 字符串作为模数,所有的元数据保存在 Footer 中,包含了格式的版本、Schema信息等等。

更详细的标准可以参考 Parquet Apache Org 文档中的介绍。

Schema

采用类 Protobuf 协议来描述,每个字段有三个属性:重复性(Repetition)、类型 (Type)和名称 (Name),如下是一个简单的示例。

message Document {                      MaxRepetition MaxDefinition
    required string DocId;              0             0
    optinal group Links {               0             1
        repeated int64 Backward;        1             2
        repeated int64 Forward;         1             2
    }
    repeated group Name {               1             1
        repeated group Language {       2             2
            required string Code;       2             2
            optional string Country;    2             3
        }
        optional string Url;            1             2
    }
}

通过 group 类型可以组成类似树状的嵌套关系,不过单纯使用树结构需要依赖上下层的关联关系,为了确保任意字段都可以独立恢复,引入了重复级别 (Repetition Level) 和定义级别 (Definition Level) 两个概念。

元数据

元数据包含了 文件、列、Page 三层级,所有的元数据通过 Thrift 格式存储,详细可以查看 Github Parquet Format 中的内容,不同的语言可以通过如下命令生成。

thrift -gen py parquet.thrift

Repetition VS. Definition Level

这两个主要是为了解决嵌套字段的定位,采用了与 Dremel 相同的编码方式,也就是采用 Repetition 和 Definition 进行定义,其在官方有如下的定义。

Definition levels specify how many optional fields in the path for the column are defined. Repetition levels specify at what repeated field in the path has the value repeated.

更多的内容可以参考 Dremel: Interactive Analysis of Web-Scale Datasets 中的介绍。

编码

这两个信息会同时编码到数据页中,可以参考 Data Pages 中的介绍。

最大的 Definition 和 Repetition 值是可以计算出来的,这样就可以确定需要多少位保存 Levels 数据,同时在 DataPageHeader 中的元数据包含了行数及其编码方式,一般分别是 RLE BitPacked 两种,因为 BitPacked 是 RLE 的子集,所以,也可以说只有 RLE 编码。

另外,当 Schema 没有嵌套时,那么 Repetition 的值一直为 1 会直接省略;同时,当数据是 required 时,对应 Definition 值恒为 Max 同样会省略。

编码

Parquet 实际保存的类型支持 Boolean Int32 Int64 FLoat Double ByteArray FixedLenByteArray 几种类型。

Plain(0)

所有的类型都采用小端存储,不同基本类型其长度有所区别,但是基本上都是原样存储。

BitPacked(4)

新版本已经通过如下的方式替换,通常是为了兼容老的版本,当前仅支持 Repetition 和 Definition 的编码。数据通过固定位进行编码,也就意味着,在编码或者解码时最大位已经确定。

dec value:   0   1   2   3   4   5   6   7
bit value: 000 001 010 011 100 101 110 111
bit label: ABC DEF GHI JKL MNO PQR STU VWX

bit value: 00000101 00111001 01110111
bit label: ABCDEFGH IJKLMNOP QRSTUVWX

上述,最大值为 7 ,那么只需要 3bits 就可以表示,编码时按照最高位到最低位的方式编码,而且除了最后一个字节外,其它中间字节不会进行对齐。

RLE/BitPacked(3)

通过 Run Length Encoding, RLE 和 Bit Packing 编码重复数据,已经替代原有的 BitPacked 编码。

示例代码

Python

import pandas as pd
import pyarrow
import pyarrow.parquet as parquet

# 定义 Schema
schema = pyarrow.schema([
    ('id', pyarrow.int32()),
    ('email', pyarrow.string())
])

# 准备数据
ids = pyarrow.array([1, 2], type = pyarrow.int32())
emails = pyarrow.array(['first@example.com', 'second@example.com'], pyarrow.string())

# 生成 Parquet 数据
batch = pyarrow.RecordBatch.from_arrays(
    [ids, emails],
    schema = schema
)
table = pyarrow.Table.from_batches([batch])

# 写 Parquet 文件 plain.parquet
parquet.write_table(table, 'user.parquet')
import pandas
import pyarrow.parquet as parquet

schema = parquet.read_schema('user.parquet')
print(schema)

df = pandas.read_parquet('user.parquet')
print(df.to_json())

Rust

依赖 Cargo.toml 如下,其中 futurestokio 主要是因为要使用异步方式。

arrow = { version = "52.2.0", features = ["prettyprint"] }
futures = "0.3.30"
parquet = { version = "52.2.0", features = ["async", "object_store"] }
tokio = { version = "1.40.0", features = ["full"] }

同步方式数据操作。

use std::{fs::File, sync::Arc};

use arrow::{
    array::{StructArray, UInt64Builder},
    datatypes::{DataType::UInt64, Field, Schema},
    util::pretty::print_batches,
};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter as ParquetWriter},
    errors::Result,
    file::properties::WriterProperties,
};

fn main() -> Result<()> {
    let file = File::create("/tmp/hudi/foobar.parquet").unwrap();
    let properties = WriterProperties::builder().build();
    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));

    // Write
    let mut array = UInt64Builder::new();
    array.append_value(128 as u64);

    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;
    writer.write(
        &StructArray::new(
            schema.fields().clone(),
            vec![Arc::new(array.finish())],
            None,
        )
        .into(),
    )?;

    writer.flush()?;
    writer.close()?;

    // Read
    let file = File::open("/tmp/hudi/foobar.parquet").unwrap();
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    print_batches(&batches).unwrap();

    Ok(())
}

异步读取数据。

use arrow::util::pretty::print_batches;
use futures::StreamExt;
use parquet::{arrow::ParquetRecordBatchStreamBuilder, errors::Result};
use tokio::fs::File as TFile;

#[tokio::main]
async fn main() -> Result<()> {
    let file = TFile::open("/tmp/hudi/foobar.parquet").await.unwrap();
    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    let mut stream = builder.build().unwrap();

    //let batches = stream.try_collect::<Vec<_>>().await?;

    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    print_batches(&batches).unwrap();

    Ok(())
}