Spark Native 加速

2022-09-19 warehouse

简介

向量执行最初可追溯到 MonetDB/X100: Hyper-Pipelining Query Execution 论文,对于传统 Volcano 模型,实现的 next() 接口每次返回一批数据,通过 SIMD 加速执行。

与向量加速相关的项目常见的有如下几个:

  • Photon 由 Databricks 在 2022 年推出商业化产品,一个 Native C++ 的向量化计算引擎,当前闭源。
  • Datafusion Comet 是 2024 年由 Apple 主导的开源 Spark 向量化执行引擎,类似快手的 Blaze 实现。
  • Gluten 由 Intel、Kyligence 主导的中间框架,提供了通用能力,并通过 Substrait 提供序列化 (Protobuf) 方案。

另外,Velox 是 Meta 公司 2022 年开源的 C++ 高性能计算引擎库,不象 Datafusion 框架,其不含 SQL 解析、优化等模块,而且不只是 Spark 在使用,还有基于 Presto 的 Prestissimo 执行加速。

Tungsten

提升 CPU 和内存的使用效率。

  • 内存管理,在栈中实现自己的高效内存管理,不再依赖 Java GC 之类的实现。

Code Generation

代码生成包含两部分,一部分是基本表达式的代码生成,另一部分称为 WholeStageCodegen, WSCG 全阶段代码生成,主要是通过开源的 Janino 实现,是对 Task 的执行效率进行优化。

简介

包含了如下的组件。

  • Spark Extension Spark 插件,实现 Spark 算子到 Native 算子之间的翻译。
  • JNI Bridge 实现 Spark Execution 和 Native Engine 之间的相互调用。

使用时核心的是配置如下两个参数,分别是对 SQL、Shuffle 的扩展。

spark.sql.extensions org.apache.spark.sql.blaze.BlazeSparkSessionExtension
spark.shuffle.manager org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager

实际上在扩展时分为了两类,一个是在初始化或者类加载时候的数据处理,还有就是基于 Spark 的扩展。

SQL

也就是大部分 Spark 扩展的实现,主要是通过 SparkSessionExtensions 进行扩展,同时会在 apply() 函数中会同时设置如下参数,也就是开启 Spark 的 Adaptive Execution Engine 实现。

spark.sql.adaptive.enabled true
spark.sql.adaptive.forceApply true

同时通过 execnsions.injectXXX 注入不同阶段的实现,包括了:

  • Columnar 替换执行器的列式数据处理。
BlazeSparkSessionExtension.apply() 通过 Spark 自身的扩展配置,会调用 injectColumnar() 函数,最终调用如下实现
BlazeColumnarOverrides.apply()
 |-BlazeConvertStrategy.apply() 会设置tag信息
 | |-BlazeConverters.convertSparkPlan() 根据不同的Exec类型转换,如果失败则回退
 |   |-BlazeConverters.convertFileSourceScanExec()
 |   | |-Shims.get.createNativeParquetScanExec() FileSourceScanExec => NativeParquetScanExec/NativeOrcScanExec
 |   | | |-ShimsImpl.createNativeParquetScanExec()
 |   | |   |-NativeParquetScanExec()
 |   |-BlazeConverters.convertProjectExec()
 |   | |-Shims.get.createNativeProjectExec()     ProjectExec => NativeParquetScanExec/NativeOrcScanExec
 |   |   |-NativeProjectExecProvider.provide()
 |   |-BlazeConverters.convertLocalLimitExec()
 |-BlazeConverters.convertSparkPlanRecursively()
 |-Shims.get.postTransform()

SparkPlan
 |-NativeSupports <trait> 这里封装了 doExecute() 接口,会调用 doExecuteNative() 函数,后续的 Native 都需要实现该函数
   |-NativeFileSourceScanBase extends LeafExecNode
   | |-NativeParquetScanBase ==> doExecuteNative() extension
   | | |-NativeParquetScanExec                     shims
   | |-NativeOrcScanBase     ==> doExecuteNative() extension
   |   |-NativeOrcScanExec                         shims
   |-NativeProjectBase       ==> doExecuteNative() extension
     |-NativeProjectExecProvider                   shims
BlazeSparkSessionExtension() 构造函数实现
 |-Shims.get.initExtension() 会调用 ShimsImpl 实现
   |-ValidateSparkPlanInjector.inject() 通过 bytebuddy 注入代码,会调用如下代码
   |-ForceApplyShuffledHashJoinInjector.inject() 根据参数设置
会将 org.apache.spark.sql.execution.adaptive.ValidateSparkPlan 替换为如下实现
ValidateSparkPlanApplyInterceptor.intercept()
 |-InterceptedValidateSparkPlan.validate()

NativeParquetScanBase.doExecuteNative() 执行入口,会返回 NativeRDD 封装

NativeRDD.compute()
 |-nativePlan() 不同算子传入的函数,主要是构建 Protobuf 序列化内容
 |-NativeHelper.executeNativePlan()
   |-BlazeCallNativeWrapper()  Native 的封装,还是 Scala 的实现
   | |-BlazeCallNativeWrapper.initNative()
   | | |-BlazeCallNativeWrapper.lazyInitNative()
   | |   |-BlazeCallNativeWrapper.loadLibBlaze() 加载Rust实现
   | |-JniBridge.callNative() 这里就是调用的 Native 中的实现了
   |   |-NativeExecutionRuntime::start() 真正的 Rust 运行态实现
   |     |-BlazeCallNativeWrapper.getRawTaskDefinition() 调用 Java 实现,获取任务信息
   |-BlazeCallNativeWrapper.getRowIterator() 返回 Iterator
     |-CompletionIterator() 调用 Spark 相关实现,对应 rowIterator 中包含 hasNext next 等函数实现


详见 jni_bridge.rs 中的实现
上述调用对应了 Rust 中如下函数的实现,也就是 Rust/Java 相互调用的实现,详见 Rust 中的 jni 包。
Java_org_apache_spark_sql_blaze_JniBridge_callNative()


SparkPlan.execute() ==> 物理执行计划
 |-RDD.doExecute()


===================>>>>>>>>>>> Rust
ParquetExec
会通过 from_proto.rs 转换,在 blaze-serde 库中





NativeSupport.doExecute()  ==> 应该是真正的执行


toArrowSchema()

NativeBroadcastExchangeBase.doExecuteBroadcast()
ConvertToNativeBase.doExecuteNative()

ArrowFFIExporter


WholeStageCodegenExec 用于代码生成

Protobuf

尝试将 protobuf::PhysicalPlanNode 转换为 ExecutionPlan 实现,后者是 Datafusion 的结构体。
try_parse_physical_expr()
感觉这玩意应该是用于 Driver 和 Executor 之间的数据传递。

其它

spark-extention 实现中,会出现大量的 Shims.get.XXX() 相关调用,其实现方式介绍如下。在 Shims.scala 中有个相关的实现,该对象 lazy 会加载 ShimsImpl.scala 实现,在 spark-extension-shims 包中。

Byte Buddy

Byte Buddy 是一个字节码生成和操作库,可以在运行时创建和修改 Java 类,无需编译器支持。

Executor.launchTask()
 |-Executor.createTaskRunner() 生成 TaskRunner 对象
TaskRunner.run()
 |-TaskRunner.updateDependencies() 下载依赖的文件和 Jar 包
 |-deserialize() 反序列化任务
 |-Task.run() 开始执行,分为了 ShuffleMapTask ResultTask 两种
   |-TaskContext.runTaskWithListeners()
     |>>>Task.runTask()
     |===ResultTask.runTask()
     |===ShuffleMapTask.runTask()
       |-ShuffleWriteProcess.write()
       |-RDD.iterator()
         |-RDD.getOrCompute() 尝试从 Cache 中读取
         |-RDD.computeOrReadCheckpoint() 需要进行计算
           |-RDD.compute() 这里就开始调用各种 RDD 实现的 compute 实现了