Spark 使用介绍

2022-09-19 warehouse

开源的集群运算框架,与 Hadoop 在执行 MapReduce 需要落盘不同,Spark 的数据会尽量在内存中进行计算。

简介

安装部署

与 Spark 相关的包可以从官网或者 Apache 的归档仓库 archive.apache.org 中下载相关的包,注意其中的 hadoop 版本。 官方有点慢,可以从国内镜像 mirrors.aliyun.com 中下载。

这里以 Without Hadoop 的为例,如果只使用 Spark 那么建议使用 With Hadoop 版本,否则 SparkSQL 可能会无法使用,需要单独下载缺失的 JAR 包或者指定 Hadoop 环境才可以。

----- 增加如下全局环境变量
# vim /etc/profile
export SPARK_HOME=/opt/warehouse/spark
export PATH=$PATH:$SPARK_HOME/bin
----- 增加Spark环境变量,这里使用的是WithoutHadoop版本
# vim conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(/opt/module/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077

单节点运行

有多种方式可以运行 Spark 环境,先介绍最简单的。

----- 本地运行,用于测试以及基本功能验证,如下会启动两个线程。
spark-sql --master local[2]
spark-shell --master local[2]

也可以本地启动。

----- 启动/停止服务,默认会本地启动一个Worker以及Master,可以通过jps命令查看
# sbin/start-all.sh
# sbin/stop-all.sh

----- 启动shell交互,可以显示指定,如下是默认值
spark-sql --master spark://127.0.0.1:7077
spark-shell --master spark://127.0.0.1:7077

----- 执行一个简单示例,计算Pi值,
# bin/run-example SparkPi
# bin/spark-submit examples/src/main/python/pi.py
# bin/spark-submit --master spark://127.0.0.1:7077 \
    --class org.apache.spark.examples.SparkPi      \
    examples/jars/spark-examples_2.12-3.4.1.jar 10
----- 也可以使用Yarn做调度
# spark-shell --master yarn --deploy-mode client

注意,在启动 Worker 节点时,会通过 ssh 远程执行 (即使是本地也一样),所以,需要确保 ssh 是可以连接 Worker 节点的,默认是 localhost 本地,可以通过 ssh localhost true 进行验证,主要不报错就说明正常。

另外,当执行 SQL 时,包括在 spark-sql 中或者通过 spark.sql(...) 执行,如果没有配置如下的 Hive 集群,默认会在当前目录下创建 metastore_db 目录保存元数据。

Hive 集成

也就是将 Hive 作为元数据管理,可以用来维护相关的数据组织信息,可以通过如下方式配置。

---- 复制Hive的配置文件以及元数据驱动JAR包,也可以启动时通过 --jars 参数指定
# cp hive/conf/hive-site.xml spark/conf/
# cp hive/lib/mysql-connector-java-8.0.28.jar spark/jars/

---- 然后通过如下方式执行
# spark-shell --master local[2]
scala> spark.sql("show databases").show()
scala> spark.sql("select * from user_info").show()
# spark-sql --master local[2]
spark-sql (default)> show databases;
spark-sql (default)> select * from user_info;

简介

val lines = sc.textFile("hdfs://...")   // HadoopRDD ==map==> MapPartitionsRDD
lines.flatMap(_.split(" "))   // MapPartitionsRDD
     .filter(_.length >= 2)   // MapPartitionsRDD
     .map((_, 1))             // MapPartitionsRDD
     .reduceByKey(_ + _)      // ShuffleRDD

任务拆分

Spark 是由一堆的转换算子组成,会划分为 Job Stage Task 来执行。

  • 根据算子是否为 Action 算子来划分 Job 。
  • 根据算子运算是否需要 Shuffle 来划分 Stage 。
  • 根据 RDD 分区数来划分 Task 。

例如如下的代码。

rdd = sc.parallelize([1, 2, 3], 3)         // 初始化三个分区RDD
rdd.distinct().collect()                   // 转换算子+Action算子
rdd.filter(lambda x:x % 2 != 0).collect()  // 转换算子+Action算子

上述包含了两个 Action 算子,也就是总计两个 Job:

  1. distinct 含 Shuffle 操作,算一个 Stage,而 Collect 也算,总计两个 Stage,而且 RDD 含三个分区,总计有 2x3=6 个 Task 。
  2. filter 不含 Shuffle 操作,不算一个 Stage,总计一个 Stage,总计有 1x3=3 个 Task 。

总计有 2 个 Job,3 个 Stage,9 个 Task 。

Shuffle

其中 Stage 会根据是否有 Shuffle 进行划分。

RDD

Resilient Distributed Datasets, RDD 是 Spark 的核心数据结构,所有数据计算操作均基于该结构实现,

这里最关键的就是 compute() 接口实现,定义了如何从特定分区中获取数据,用来实现具体的计算逻辑,是每个 Task 执行的起点。

常见问题

winutils.exe

全部的报错内容为 java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. ,主要是 Windows 环境上的依赖问题。

hadoop 本身是基于 Linux 环境开发,不能直接部署运行到 Windows 上,如果需要在本地运行类似 Spark 的任务,那么需要通过 winutils.exe hadoop.dll 来模拟,可以直接从 Github 下载,需要注意对应的版本。

然后在环境变量中设置 HADOOP_HOME ,并将 %HADOOP_HOME%\bin 添加到环境变量 PATH 中。

注意,使用 IDEA 时,直接修改系统的环境变量是无法立即生效的,需要在运行配置中的 Environment variables 内添加,然后在代码中就可以通过 System.out.println(System.getenv("HADOOP_HOME")); 确定是否生效。