Arrow 内存列式数据存储格式

2022-11-09 warehouse

定义了内存布局格式,从而允许在不同的系统之间高效地共享数据,通过减少不必要的序列化和反序列化成本,从而提高不同系统间传递数据效率。

简介

采用了列式内存存储格式,详细可以参考 Arrow Columnar Format 文档中的定义,不只是支持常规的格式,同时支持嵌套格式的定义,其设计格式时有如下的特性:

  • 对于 Scan 擦作,数据可以邻近访问。
  • O(1) 级别的随机访问。
  • 对于 SIMD 以及向量操作友好。
  • 共享内存中支持 Zero-Copy 方式。

为此,采用列式数据保存,元数据通过 FlatBuffers 保存,这也是一个无需反序列化的高效数据存储格式。

除了 Arrow 之外,还包括了 ArrowStream 格式,后者支持流方式处理数据。

Rust

严格来说,该项目除了包含基础的 Arrow 实现之外,还有 Parquet、ObjectStore、Flight 等相关的功能实现:

  • arrow-buffer 底层 Buffer 对象的实现,保存了基础的内存相关数据抽象,用来实现 ZeroCopy 。

Buffer

其中的 Bytestokiobytes 库很像,提供了连续、固定大小、不可修改的内存存储空间。

pub struct Bytes {
    ptr: NonNull<u8>,  // 原始内存指针
    len: usize,        // 当前大小
    deallocation: Deallocation, // 允许使用自定义的内存分配
}

pub struct Buffer {
    data: Arc<Bytes>,  // 底层依赖内存
    ptr: *const u8,    // 这里保存指针而非偏移,主要是为了防止 LLVM 的自动向量化失败
    length: usize,
}

通常不建议直接使用 Buffer 结构体,如下是常用的方法。

use arrow::buffer::Buffer;
use arrow::datatypes::ToByteSlice;

fn main() {
    // let buff = Buffer::from(&[0u8, 1, 2, 3]);
    // let buff = Buffer::from_vec(vec![0u8, 1, 2, 3]);
    let buff = Buffer::from([0u8, 1, 2, 3].to_byte_slice());
    unsafe {
        for i in 0..4 {
            println!("{}", *buff.as_ptr().add(i));
        }
    }
    println!("{:?}", buff);
}

其中的 to_byte_slice 会同时申请内存。

ArrayData

通过 ArrayData 保存相关的数据,其底层的数据可以被多个对象共享,另外,还可以通过 DataTypeLayout 查看在内存中的布局,例如,对于 Boolean 类型可以判断实际是采用 Bitmap 还是 Buffer 进行保存。

// arrow-data
pub struct ArrayData {
    data_type: DataType, // 数据类型,几乎满足当前绝大部分诉求
    offset: usize,       // 底层数据的偏移
    len: usize,          // 以及长度
    buffers: Vec<Buffer>,
    child_data: Vec<ArrayData>,
    nulls: Option<NullBuffer>,
}
pub type ArrayDataRef = Arc<ArrayData>;

这里的 ArrayData 是一个通用对象,对于已知的数据类型,通常使用 Int16Array Int32Array 等类似类型,如下是使用示例,当然,可以查看源码有很多更方便的构建技巧。

use arrow::array::{ArrayData, Int32Array};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;

fn main() {
    let buff = Buffer::from(vec![0i32, 1, 2, 3]);
    let data = ArrayData::builder(DataType::Int32)
        .add_buffer(buff.clone())
        .len(4)
        .build()
        .unwrap();
    println!("{:?}", Int32Array::from(data));
}

RecordBatch

在上述的基础之上,同时增加了 RecordBatch 结构,除了包含数据之外,还有 Schema 信息,一般需要依赖 arrow_schema 包,如下是简单示例。

use std::sync::Arc;

use arrow::{
    array::{record_batch, Int32Array, RecordBatch},
    datatypes::{Field, Schema},
};
use arrow_schema::DataType;

fn main() {
    let batch = record_batch!(
        ("a", Int32, [1, 2, 3]),
        ("b", Float64, [Some(4.0), None, Some(5.0)]),
        ("c", Utf8, ["alpha", "beta", "gamma"])
    );
    println!("{:#?}", batch);

    let ids = Int32Array::from(vec![1, 2, 3]);
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids)]).unwrap();
    println!("{:#?}", batch);
}

还可以通过 try_new_with_options() 创建,另外,支持 features=["prettyprint"] 特性,在 use arrow::util::pretty::print_batches; 中通过 print_batches(&[batch]).unwrap(); 打印。

对于 Schema 可以通过如下方式序列化。

let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
println!("{}", serde_json::to_string(&schema).unwrap());
println!("{}", serde_yaml::to_string(&schema).unwrap());

SIMD

与计算相关的实现应该是在 arrow::compute::kernels 模块中,其实际上是将 arrow_aritharrow_castarrow_ord 等模块进行了简单的封装。