简介
向量执行最初可追溯到 MonetDB/X100: Hyper-Pipelining Query Execution 论文,对于传统 Volcano
模型,实现的 next()
接口每次返回一批数据,通过 SIMD 加速执行。
与向量加速相关的项目常见的有如下几个:
- Photon 由 Databricks 在 2022 年推出商业化产品,一个 Native C++ 的向量化计算引擎,当前闭源。
- Datafusion Comet 是 2024 年由 Apple 主导的开源 Spark 向量化执行引擎,类似快手的 Blaze 实现。
- Gluten 由 Intel、Kyligence 主导的中间框架,提供了通用能力,并通过 Substrait 提供序列化 (Protobuf) 方案。
另外,Velox 是 Meta 公司 2022 年开源的 C++ 高性能计算引擎库,不象 Datafusion 框架,其不含 SQL 解析、优化等模块,而且不只是 Spark 在使用,还有基于 Presto 的 Prestissimo 执行加速。
Tungsten
提升 CPU 和内存的使用效率。
- 内存管理,在栈中实现自己的高效内存管理,不再依赖 Java GC 之类的实现。
Code Generation
代码生成包含两部分,一部分是基本表达式的代码生成,另一部分称为 WholeStageCodegen, WSCG 全阶段代码生成,主要是通过开源的 Janino 实现,是对 Task 的执行效率进行优化。
简介
包含了如下的组件。
Spark Extension
Spark 插件,实现 Spark 算子到 Native 算子之间的翻译。JNI Bridge
实现 Spark Execution 和 Native Engine 之间的相互调用。
使用时核心的是配置如下两个参数,分别是对 SQL、Shuffle 的扩展。
spark.sql.extensions org.apache.spark.sql.blaze.BlazeSparkSessionExtension
spark.shuffle.manager org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager
实际上在扩展时分为了两类,一个是在初始化或者类加载时候的数据处理,还有就是基于 Spark 的扩展。
SQL
也就是大部分 Spark 扩展的实现,主要是通过 SparkSessionExtensions
进行扩展,同时会在 apply()
函数中会同时设置如下参数,也就是开启 Spark 的 Adaptive Execution Engine
实现。
spark.sql.adaptive.enabled true
spark.sql.adaptive.forceApply true
同时通过 execnsions.injectXXX
注入不同阶段的实现,包括了:
Columnar
替换执行器的列式数据处理。
BlazeSparkSessionExtension.apply() 通过 Spark 自身的扩展配置,会调用 injectColumnar() 函数,最终调用如下实现
BlazeColumnarOverrides.apply()
|-BlazeConvertStrategy.apply() 会设置tag信息
| |-BlazeConverters.convertSparkPlan() 根据不同的Exec类型转换,如果失败则回退
| |-BlazeConverters.convertFileSourceScanExec()
| | |-Shims.get.createNativeParquetScanExec() FileSourceScanExec => NativeParquetScanExec/NativeOrcScanExec
| | | |-ShimsImpl.createNativeParquetScanExec()
| | | |-NativeParquetScanExec()
| |-BlazeConverters.convertProjectExec()
| | |-Shims.get.createNativeProjectExec() ProjectExec => NativeParquetScanExec/NativeOrcScanExec
| | |-NativeProjectExecProvider.provide()
| |-BlazeConverters.convertLocalLimitExec()
|-BlazeConverters.convertSparkPlanRecursively()
|-Shims.get.postTransform()
SparkPlan
|-NativeSupports <trait> 这里封装了 doExecute() 接口,会调用 doExecuteNative() 函数,后续的 Native 都需要实现该函数
|-NativeFileSourceScanBase extends LeafExecNode
| |-NativeParquetScanBase ==> doExecuteNative() extension
| | |-NativeParquetScanExec shims
| |-NativeOrcScanBase ==> doExecuteNative() extension
| |-NativeOrcScanExec shims
|-NativeProjectBase ==> doExecuteNative() extension
|-NativeProjectExecProvider shims
BlazeSparkSessionExtension() 构造函数实现
|-Shims.get.initExtension() 会调用 ShimsImpl 实现
|-ValidateSparkPlanInjector.inject() 通过 bytebuddy 注入代码,会调用如下代码
|-ForceApplyShuffledHashJoinInjector.inject() 根据参数设置
会将 org.apache.spark.sql.execution.adaptive.ValidateSparkPlan 替换为如下实现
ValidateSparkPlanApplyInterceptor.intercept()
|-InterceptedValidateSparkPlan.validate()
NativeParquetScanBase.doExecuteNative() 执行入口,会返回 NativeRDD 封装
NativeRDD.compute()
|-nativePlan() 不同算子传入的函数,主要是构建 Protobuf 序列化内容
|-NativeHelper.executeNativePlan()
|-BlazeCallNativeWrapper() 对 Native 的封装,还是 Scala 的实现
| |-BlazeCallNativeWrapper.initNative()
| | |-BlazeCallNativeWrapper.lazyInitNative()
| | |-BlazeCallNativeWrapper.loadLibBlaze() 加载Rust实现
| |-JniBridge.callNative() 这里就是调用的 Native 中的实现了
| |-NativeExecutionRuntime::start() 真正的 Rust 运行态实现
| |-BlazeCallNativeWrapper.getRawTaskDefinition() 调用 Java 实现,获取任务信息
|-BlazeCallNativeWrapper.getRowIterator() 返回 Iterator
|-CompletionIterator() 调用 Spark 相关实现,对应 rowIterator 中包含 hasNext next 等函数实现
详见 jni_bridge.rs 中的实现
上述调用对应了 Rust 中如下函数的实现,也就是 Rust/Java 相互调用的实现,详见 Rust 中的 jni 包。
Java_org_apache_spark_sql_blaze_JniBridge_callNative()
SparkPlan.execute() ==> 物理执行计划
|-RDD.doExecute()
===================>>>>>>>>>>> Rust
ParquetExec
会通过 from_proto.rs 转换,在 blaze-serde 库中
NativeSupport.doExecute() ==> 应该是真正的执行
toArrowSchema()
NativeBroadcastExchangeBase.doExecuteBroadcast()
ConvertToNativeBase.doExecuteNative()
ArrowFFIExporter
WholeStageCodegenExec 用于代码生成
Protobuf
尝试将 protobuf::PhysicalPlanNode 转换为 ExecutionPlan 实现,后者是 Datafusion 的结构体。
try_parse_physical_expr()
感觉这玩意应该是用于 Driver 和 Executor 之间的数据传递。
其它
在 spark-extention
实现中,会出现大量的 Shims.get.XXX()
相关调用,其实现方式介绍如下。在 Shims.scala
中有个相关的实现,该对象 lazy
会加载 ShimsImpl.scala
实现,在 spark-extension-shims
包中。
Byte Buddy
Byte Buddy 是一个字节码生成和操作库,可以在运行时创建和修改 Java 类,无需编译器支持。
Executor.launchTask()
|-Executor.createTaskRunner() 生成 TaskRunner 对象
TaskRunner.run()
|-TaskRunner.updateDependencies() 下载依赖的文件和 Jar 包
|-deserialize() 反序列化任务
|-Task.run() 开始执行,分为了 ShuffleMapTask ResultTask 两种
|-TaskContext.runTaskWithListeners()
|>>>Task.runTask()
|===ResultTask.runTask()
|===ShuffleMapTask.runTask()
|-ShuffleWriteProcess.write()
|-RDD.iterator()
|-RDD.getOrCompute() 尝试从 Cache 中读取
|-RDD.computeOrReadCheckpoint() 需要进行计算
|-RDD.compute() 这里就开始调用各种 RDD 实现的 compute 实现了