基本上分成了 OLAP Batch Streaming
算子落盘
存在 spill_count 指标, spill_record_batches read_spill_as_stream
SQL
SQL 解析依赖 SQLParser 实现,在 datafusion::sql
中将 sqlparser
重新导出,有也就意味着如下两种使用方式相同。
use datafusion::sql::sqlparser::parser::ParserError;
use sqlparser::parser::ParserError;
而 DataFusion 是在 SQLParser 基础上的定制化开发,可以根据场景配置不同的方言 Dialect
,常见的如 MySQL、PostgreSQL 等。
Plan
LogicalPlan
逻辑执行计划。
ExecutionPlan
物理执行计划节点,支持流式、并行读取数据,包含了 Projection Filter Limit 算子,当执行 execute()
方法时,只是将物理执行计划生成 RecordBatchStream
算子,形成数据流算子树,当执行 collect()
操作时才开始真正的数据流动。
会通过 DefaultPhysicalPlanner
生成执行计划,也可以通过实现 PhysicalPlanner
特征扩展。最终会通过 create_physical_plan
方法将逻辑计划转换为物理计划,每个节点是 ExecutionPlan
类型。
执行计划树时,会从根节点开始执行 execute
方法,这里不会开始处理数据,而是将物理算子转换为 RecordBatchStream
类型,只有当执行类似 collect
时才会真正执行。
TableProvider
用于自定义表或者数据源,其中核心的是 scan
函数
JanKaul/iceberg-rust
#5520 --> MemTable 支持 insert into
pub trait TableProvider: Sync + Send {
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
}
可以返回自定义的 ExecutionPlan ,或者借助类似 ParquetExecBuilder 这种方式
示例参考
datafusion-example/examples/custom_datasource.rs
datafusion/core/src/datasourc/physical_plan/parquet.rs
datafusion-example/examples/dataframe_output.rs 通过建表语句输出
datafusion/core/src/datasource/file_format/csv.rs 数据读取
docs/source/library-user-guide/custom-table-providers.md
Table::new()
|-Table::new_with_options()
|-storage::parse_uri()
|-Table::load_configs() 加载配置
|-Storage::new()
|-Storage::get_file_data() 读取并解析 .hoodie/hoodie.properties 文件中的内容
|-HudiConfigs::new() 简单对 HashMap 的封装
|-Table::validate_configs()
ObjectStore 最初由 Influx 编写,后来贡献给了 Arrow ,示例可以参考 alamb/rust_object_store_demo 中的实现。
RecordBatch
这是 Arrow 内存计算用的结构体,方便通过 SIMD 进行加速,实现在 arrow-array
包中的 src/record_batch.rs
文件中。
pub type SchemaRef = Arc<Schema>;
pub struct RecordBatch {
schema: SchemaRef,
columns: Vec<Arc<dyn Array>>,
row_count: usize, // 单独保存,同时可以应对不存在列的场景
}
pub type ArrayRef = Arc<dyn Array>;
常见操作如下。
let a: Arc<dyn Array> = batch.column(0); // 获取第一列数据
ensure!(a.data_type() == DataType::Int32); // 确保数据类型正确
// 获取真正的底层数据
let a: &Int32Array = a.as_any().downcast_ref().context("type mismatch")?;
Arrow Columnar Format
nvim-dap lldb
SIMD优化是在Arrow库中实现的,不过当前依赖 nightly 才可以,关于 Rust 中的 SIMD 实现可以参考 Rust语言中SIMD计算加速指令的使用 https://rustmagazine.github.io/rust_magazine_2021/chapter_8/hw-rust-simd.html
针对类型不错的介绍 https://github.com/skyzh/type-exercise-in-rust
https://github.com/jankaul/iceberg-rust
cargo fmt --all -- --check
cargo tarpaulin --engine llvm --no-dead-code --no-fail-fast --all-features --workspace -o html
其它
在源码中有很多不错的压测、示例代码可供参考:
benchmarks
保存了很多不错的测试用例,尤其支持不同版本的性能压测对比。datafusion-examples
很多入门级别的示例,可以根据不同的使用场景参考。datafusion/core/tests/data
测试用的数据,同时通过test-utils
包可以很容易加载。
其中测试数据可以通过类似如下方式加载,注意,需要通过 export DATAFUSION_TEST_DATA=/your/path/datafusion/core/tests/data
环境变量指定路径。
use datafusion::common::test_util::datafusion_test_data;
#[tokio::main]
async fn main() -> Result<()> {
// ... ...
let testdata = datafusion_test_data();
ctx.register_csv(
"ordered_table",
&format!("{testdata}/window_1.csv"),
CsvReadOptions::new().file_sort_order(vec![vec![col("ts").sort(true, true)]]),
)
.await?;
// ... ...
}
DataSource
存在 FileFormat
定义了当前支持的文件格式,包括了 Parquet
CSV
JSON
AVRO
等,底层通过 ObjectStore
实现。
各种格式会实现 infer_schema
函数,用来推断各种数据格式,默认会读取前 1k 行数据进行推断。
执行器
DataFrame
类似于 Saprk
和 Pandas
的实现,是带有表结构 Schema
的二维数据,后面可以跟很多类似 filter
select
aggregate
limit
的算子,注意,其是 Lazy
执行的,这样使用时可以作额外的优化,常见的执行方式有如下几种:
collect
执行查询同时将结果缓存到Vec<RecordBatch>
中。execute_stream
流式执行,会返回SendableRecordBatchStream
并在后续调用next()
时执行。cache
执行并将结果缓存到内存中的DataFrame
对象。
基于 Rust 中 Async 的实现也天然具有了 Stream
流式的特性,当然对于物化算子 (也被称为 Pipeline Breaker,例如 Sort、Hash 等) 需要阻塞等待所有的输入数据,然后才会有具体的输出,大部分算子都是流式读取 RecordBatch
并输出 RecordBatch
变量。
SessionContext.sql
|-SessionContext.sql_with_option()
DataFrame.collect() == 都是在 PhysicalPlan 中执行
|-DataFrame.task_ctx()
|-DataFrame.create_physical_plan()
|-collect() == ExecutionPlan 的实现,入参是上述的返回值
|-execute_stream()
|-common::collect()
|-SendableRecordBatchStream.try_collect()
|-TryCollect::new() 这里会将上述的流进行收集
通过 SessionContext 维护用户与真正引擎之间的接口,支持从数据源创建 DataFrame、注册表 (后续可以通过 SQL 使用)、执行 SQL 等。另外,除了可以通过 SQL 执行之外,还可以使用类似 col
lit
alias
的接口生成逻辑执行计划。
执行时还会创建 TaskContext 对象,维护了 SessionID TaskID 相关信息。
CPU IO Bound
DataFrame
的运行态使用的是 Tokio
包,而在官方文档 CPU-bound tasks and blocking code 中有如下的介绍。
在 Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks 文章中有详细的介绍,其实现也比较简单,基本原理就是通过 DedicatedExecutor
将 IO 和 CPU 任务分开执行,同时保留了对 Async 语义的支持。