Datafusion 使用简介

2024-10-16 warehouse rust

基本上分成了 OLAP Batch Streaming

算子落盘

存在 spill_count 指标, spill_record_batches read_spill_as_stream

SQL

SQL 解析依赖 SQLParser 实现,在 datafusion::sql 中将 sqlparser 重新导出,有也就意味着如下两种使用方式相同。

use datafusion::sql::sqlparser::parser::ParserError;
use sqlparser::parser::ParserError;

而 DataFusion 是在 SQLParser 基础上的定制化开发,可以根据场景配置不同的方言 Dialect,常见的如 MySQL、PostgreSQL 等。

Plan

LogicalPlan

逻辑执行计划。

ExecutionPlan

物理执行计划节点,支持流式、并行读取数据,包含了 Projection Filter Limit 算子,当执行 execute() 方法时,只是将物理执行计划生成 RecordBatchStream 算子,形成数据流算子树,当执行 collect() 操作时才开始真正的数据流动。

会通过 DefaultPhysicalPlanner 生成执行计划,也可以通过实现 PhysicalPlanner 特征扩展。最终会通过 create_physical_plan 方法将逻辑计划转换为物理计划,每个节点是 ExecutionPlan 类型。

执行计划树时,会从根节点开始执行 execute 方法,这里不会开始处理数据,而是将物理算子转换为 RecordBatchStream 类型,只有当执行类似 collect 时才会真正执行。

TableProvider

用于自定义表或者数据源,其中核心的是 scan 函数

JanKaul/iceberg-rust

#5520 --> MemTable 支持 insert into

pub trait TableProvider: Sync + Send {
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}
可以返回自定义的 ExecutionPlan ,或者借助类似 ParquetExecBuilder 这种方式


示例参考
datafusion-example/examples/custom_datasource.rs
datafusion/core/src/datasourc/physical_plan/parquet.rs

datafusion-example/examples/dataframe_output.rs    通过建表语句输出

datafusion/core/src/datasource/file_format/csv.rs  数据读取

docs/source/library-user-guide/custom-table-providers.md


Table::new()
 |-Table::new_with_options()
   |-storage::parse_uri()
   |-Table::load_configs() 加载配置
     |-Storage::new()
     |-Storage::get_file_data() 读取并解析 .hoodie/hoodie.properties 文件中的内容
     |-HudiConfigs::new() 简单对 HashMap 的封装
     |-Table::validate_configs()

ObjectStore 最初由 Influx 编写,后来贡献给了 Arrow ,示例可以参考 alamb/rust_object_store_demo 中的实现。

RecordBatch

这是 Arrow 内存计算用的结构体,方便通过 SIMD 进行加速,实现在 arrow-array 包中的 src/record_batch.rs 文件中。

pub type SchemaRef = Arc<Schema>;
pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<dyn Array>>,
    row_count: usize, // 单独保存,同时可以应对不存在列的场景
}

pub type ArrayRef = Arc<dyn Array>;

常见操作如下。

let a: Arc<dyn Array> = batch.column(0);   // 获取第一列数据
ensure!(a.data_type() == DataType::Int32); // 确保数据类型正确
// 获取真正的底层数据
let a: &Int32Array = a.as_any().downcast_ref().context("type mismatch")?;

Arrow Columnar Format

nvim-dap lldb

SIMD优化是在Arrow库中实现的,不过当前依赖 nightly 才可以,关于 Rust 中的 SIMD 实现可以参考 Rust语言中SIMD计算加速指令的使用 https://rustmagazine.github.io/rust_magazine_2021/chapter_8/hw-rust-simd.html

针对类型不错的介绍 https://github.com/skyzh/type-exercise-in-rust

https://github.com/jankaul/iceberg-rust

cargo fmt --all -- --check
cargo tarpaulin --engine llvm --no-dead-code --no-fail-fast --all-features --workspace -o html

其它

在源码中有很多不错的压测、示例代码可供参考:

  • benchmarks 保存了很多不错的测试用例,尤其支持不同版本的性能压测对比。
  • datafusion-examples 很多入门级别的示例,可以根据不同的使用场景参考。
  • datafusion/core/tests/data 测试用的数据,同时通过 test-utils 包可以很容易加载。

其中测试数据可以通过类似如下方式加载,注意,需要通过 export DATAFUSION_TEST_DATA=/your/path/datafusion/core/tests/data 环境变量指定路径。

use datafusion::common::test_util::datafusion_test_data;

#[tokio::main]
async fn main() -> Result<()> {
    // ... ...
    let testdata = datafusion_test_data();
    ctx.register_csv(
        "ordered_table",
        &format!("{testdata}/window_1.csv"),
        CsvReadOptions::new().file_sort_order(vec![vec![col("ts").sort(true, true)]]),
    )
    .await?;
    // ... ...
}

DataSource

存在 FileFormat 定义了当前支持的文件格式,包括了 Parquet CSV JSON AVRO 等,底层通过 ObjectStore 实现。

各种格式会实现 infer_schema 函数,用来推断各种数据格式,默认会读取前 1k 行数据进行推断。

执行器

DataFrame 类似于 SaprkPandas 的实现,是带有表结构 Schema 的二维数据,后面可以跟很多类似 filter select aggregate limit 的算子,注意,其是 Lazy 执行的,这样使用时可以作额外的优化,常见的执行方式有如下几种:

  • collect 执行查询同时将结果缓存到 Vec<RecordBatch> 中。
  • execute_stream 流式执行,会返回 SendableRecordBatchStream 并在后续调用 next() 时执行。
  • cache 执行并将结果缓存到内存中的 DataFrame 对象。

基于 Rust 中 Async 的实现也天然具有了 Stream 流式的特性,当然对于物化算子 (也被称为 Pipeline Breaker,例如 Sort、Hash 等) 需要阻塞等待所有的输入数据,然后才会有具体的输出,大部分算子都是流式读取 RecordBatch 并输出 RecordBatch 变量。

SessionContext.sql
 |-SessionContext.sql_with_option()

DataFrame.collect() == 都是在 PhysicalPlan 中执行
 |-DataFrame.task_ctx()
 |-DataFrame.create_physical_plan()
 |-collect() == ExecutionPlan 的实现,入参是上述的返回值
   |-execute_stream()
   |-common::collect()
     |-SendableRecordBatchStream.try_collect()
       |-TryCollect::new() 这里会将上述的流进行收集

通过 SessionContext 维护用户与真正引擎之间的接口,支持从数据源创建 DataFrame、注册表 (后续可以通过 SQL 使用)、执行 SQL 等。另外,除了可以通过 SQL 执行之外,还可以使用类似 col lit alias 的接口生成逻辑执行计划。

执行时还会创建 TaskContext 对象,维护了 SessionID TaskID 相关信息。

CPU IO Bound

DataFrame 的运行态使用的是 Tokio 包,而在官方文档 CPU-bound tasks and blocking code 中有如下的介绍。

Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks 文章中有详细的介绍,其实现也比较简单,基本原理就是通过 DedicatedExecutor 将 IO 和 CPU 任务分开执行,同时保留了对 Async 语义的支持。

参考

  • Ballista 分布式查询引擎,设计灵感来自 SparkSQL 但是基于技术栈不同。
  • Arroyo 流处理引擎,类似于 Flink 实现。