Datafusion 使用简介

2024-10-16 warehouse rust

基本上分成了 OLAP Batch Streaming

SQL

Plan

LogicalPlan

逻辑执行计划。

ExecutionPlan

物理执行计划节点,支持流式、并行读取数据,包含了 Projection Filter Limit 算子,当执行 execute() 方法时,只是将物理执行计划生成 RecordBatchStream 算子,形成数据流算子树,当执行 collect() 操作时才开始真正的数据流动。

TableProvider

用于自定义表或者数据源,其中核心的是 scan 函数

JanKaul/iceberg-rust

#5520 --> MemTable 支持 insert into

pub trait TableProvider: Sync + Send {
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}
可以返回自定义的 ExecutionPlan ,或者借助类似 ParquetExecBuilder 这种方式


示例参考
datafusion-example/examples/custom_datasource.rs
datafusion/core/src/datasourc/physical_plan/parquet.rs

datafusion-example/examples/dataframe_output.rs    通过建表语句输出

datafusion/core/src/datasource/file_format/csv.rs  数据读取

docs/source/library-user-guide/custom-table-providers.md


Table::new()
 |-Table::new_with_options()
   |-storage::parse_uri()
   |-Table::load_configs() 加载配置
     |-Storage::new()
     |-Storage::get_file_data() 读取并解析 .hoodie/hoodie.properties 文件中的内容
     |-HudiConfigs::new() 简单对 HashMap 的封装
     |-Table::validate_configs()

ObjectStore 最初由 Influx 编写,后来贡献给了 Arrow ,示例可以参考 alamb/rust_object_store_demo 中的实现。

RecordBatch

这是 Arrow 内存计算用的结构体,方便通过 SIMD 进行加速,实现在 arrow-array 包中的 src/record_batch.rs 文件中。

pub type SchemaRef = Arc<Schema>;
pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<dyn Array>>,
    row_count: usize, // 单独保存,同时可以应对不存在列的场景
}

pub type ArrayRef = Arc<dyn Array>;

常见操作如下。

let a: Arc<dyn Array> = batch.column(0);   // 获取第一列数据
ensure!(a.data_type() == DataType::Int32); // 确保数据类型正确
// 获取真正的底层数据
let a: &Int32Array = a.as_any().downcast_ref().context("type mismatch")?;

Arrow Columnar Format

nvim-dap lldb

SIMD优化是在Arrow库中实现的,不过当前依赖 nightly 才可以,关于 Rust 中的 SIMD 实现可以参考 Rust语言中SIMD计算加速指令的使用 https://rustmagazine.github.io/rust_magazine_2021/chapter_8/hw-rust-simd.html

针对类型不错的介绍 https://github.com/skyzh/type-exercise-in-rust

https://github.com/jankaul/iceberg-rust

cargo fmt --all -- --check
cargo tarpaulin --engine llvm --no-dead-code --no-fail-fast --all-features --workspace -o html

参考

  • Datafusion Ballista 分布式查询引擎,设计灵感来自 SparkSQL 但是基于技术栈不同。