定义了内存布局格式,从而允许在不同的系统之间高效地共享数据,通过减少不必要的序列化和反序列化成本,从而提高不同系统间传递数据效率。
简介
采用了列式内存存储格式,详细可以参考 Arrow Columnar Format 文档中的定义,不只是支持常规的格式,同时支持嵌套格式的定义,其设计格式时有如下的特性:
- 对于 Scan 擦作,数据可以邻近访问。
O(1)
级别的随机访问。- 对于 SIMD 以及向量操作友好。
- 共享内存中支持 Zero-Copy 方式。
为此,采用列式数据保存,元数据通过 FlatBuffers 保存,这也是一个无需反序列化的高效数据存储格式。
除了 Arrow 之外,还包括了 ArrowStream 格式,后者支持流方式处理数据。
Rust
严格来说,该项目除了包含基础的 Arrow 实现之外,还有 Parquet、ObjectStore、Flight 等相关的功能实现:
arrow-buffer
底层 Buffer 对象的实现,保存了基础的内存相关数据抽象,用来实现 ZeroCopy 。
Buffer
其中的 Bytes
与 tokio
的 bytes
库很像,提供了连续、固定大小、不可修改的内存存储空间。
pub struct Bytes {
ptr: NonNull<u8>, // 原始内存指针
len: usize, // 当前大小
deallocation: Deallocation, // 允许使用自定义的内存分配
}
pub struct Buffer {
data: Arc<Bytes>, // 底层依赖内存
ptr: *const u8, // 这里保存指针而非偏移,主要是为了防止 LLVM 的自动向量化失败
length: usize,
}
通常不建议直接使用 Buffer
结构体,如下是常用的方法。
use arrow::buffer::Buffer;
use arrow::datatypes::ToByteSlice;
fn main() {
// let buff = Buffer::from(&[0u8, 1, 2, 3]);
// let buff = Buffer::from_vec(vec![0u8, 1, 2, 3]);
let buff = Buffer::from([0u8, 1, 2, 3].to_byte_slice());
unsafe {
for i in 0..4 {
println!("{}", *buff.as_ptr().add(i));
}
}
println!("{:?}", buff);
}
其中的 to_byte_slice
会同时申请内存。
ArrayData
通过 ArrayData
保存相关的数据,其底层的数据可以被多个对象共享,另外,还可以通过 DataTypeLayout
查看在内存中的布局,例如,对于 Boolean
类型可以判断实际是采用 Bitmap
还是 Buffer
进行保存。
// arrow-data
pub struct ArrayData {
data_type: DataType, // 数据类型,几乎满足当前绝大部分诉求
offset: usize, // 底层数据的偏移
len: usize, // 以及长度
buffers: Vec<Buffer>,
child_data: Vec<ArrayData>,
nulls: Option<NullBuffer>,
}
pub type ArrayDataRef = Arc<ArrayData>;
这里的 ArrayData
是一个通用对象,对于已知的数据类型,通常使用 Int16Array
Int32Array
等类似类型,如下是使用示例,当然,可以查看源码有很多更方便的构建技巧。
use arrow::array::{ArrayData, Int32Array};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
fn main() {
let buff = Buffer::from(vec![0i32, 1, 2, 3]);
let data = ArrayData::builder(DataType::Int32)
.add_buffer(buff.clone())
.len(4)
.build()
.unwrap();
println!("{:?}", Int32Array::from(data));
}
RecordBatch
在上述的基础之上,同时增加了 RecordBatch
结构,除了包含数据之外,还有 Schema 信息,一般需要依赖 arrow_schema
包,如下是简单示例。
use std::sync::Arc;
use arrow::{
array::{record_batch, Int32Array, RecordBatch},
datatypes::{Field, Schema},
};
use arrow_schema::DataType;
fn main() {
let batch = record_batch!(
("a", Int32, [1, 2, 3]),
("b", Float64, [Some(4.0), None, Some(5.0)]),
("c", Utf8, ["alpha", "beta", "gamma"])
);
println!("{:#?}", batch);
let ids = Int32Array::from(vec![1, 2, 3]);
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids)]).unwrap();
println!("{:#?}", batch);
}
还可以通过 try_new_with_options()
创建,另外,支持 features=["prettyprint"]
特性,在 use arrow::util::pretty::print_batches;
中通过 print_batches(&[batch]).unwrap();
打印。
对于 Schema
可以通过如下方式序列化。
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
println!("{}", serde_json::to_string(&schema).unwrap());
println!("{}", serde_yaml::to_string(&schema).unwrap());
SIMD
与计算相关的实现应该是在 arrow::compute::kernels
模块中,其实际上是将 arrow_arith
、arrow_cast
、arrow_ord
等模块进行了简单的封装。