开源的集群运算框架,与 Hadoop 在执行 MapReduce 需要落盘不同,Spark 的数据会尽量在内存中进行计算。
简介
安装部署
与 Spark 相关的包可以从官网或者 Apache 的归档仓库 archive.apache.org 中下载相关的包,注意其中的 hadoop 版本。 官方有点慢,可以从国内镜像 mirrors.aliyun.com 中下载。
这里以 Without Hadoop 的为例,如果只使用 Spark 那么建议使用 With Hadoop 版本,否则 SparkSQL 可能会无法使用,需要单独下载缺失的 JAR 包或者指定 Hadoop 环境才可以。
----- 增加如下全局环境变量
# vim /etc/profile
export SPARK_HOME=/opt/warehouse/spark
export PATH=$PATH:$SPARK_HOME/bin
----- 增加Spark环境变量,这里使用的是WithoutHadoop版本
# vim conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(/opt/module/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077
单节点运行
有多种方式可以运行 Spark 环境,先介绍最简单的。
----- 本地运行,用于测试以及基本功能验证,如下会启动两个线程。
spark-sql --master local[2]
spark-shell --master local[2]
也可以本地启动。
----- 启动/停止服务,默认会本地启动一个Worker以及Master,可以通过jps命令查看
# sbin/start-all.sh
# sbin/stop-all.sh
----- 启动shell交互,可以显示指定,如下是默认值
spark-sql --master spark://127.0.0.1:7077
spark-shell --master spark://127.0.0.1:7077
----- 执行一个简单示例,计算Pi值,
# bin/run-example SparkPi
# bin/spark-submit examples/src/main/python/pi.py
# bin/spark-submit --master spark://127.0.0.1:7077 \
--class org.apache.spark.examples.SparkPi \
examples/jars/spark-examples_2.12-3.4.1.jar 10
----- 也可以使用Yarn做调度
# spark-shell --master yarn --deploy-mode client
注意,在启动 Worker 节点时,会通过 ssh
远程执行 (即使是本地也一样),所以,需要确保 ssh
是可以连接 Worker 节点的,默认是 localhost
本地,可以通过 ssh localhost true
进行验证,主要不报错就说明正常。
另外,当执行 SQL 时,包括在 spark-sql
中或者通过 spark.sql(...)
执行,如果没有配置如下的 Hive 集群,默认会在当前目录下创建 metastore_db
目录保存元数据。
Hive 集成
也就是将 Hive 作为元数据管理,可以用来维护相关的数据组织信息,可以通过如下方式配置。
---- 复制Hive的配置文件以及元数据驱动JAR包,也可以启动时通过 --jars 参数指定
# cp hive/conf/hive-site.xml spark/conf/
# cp hive/lib/mysql-connector-java-8.0.28.jar spark/jars/
---- 然后通过如下方式执行
# spark-shell --master local[2]
scala> spark.sql("show databases").show()
scala> spark.sql("select * from user_info").show()
# spark-sql --master local[2]
spark-sql (default)> show databases;
spark-sql (default)> select * from user_info;
简介
val lines = sc.textFile("hdfs://...") // HadoopRDD ==map==> MapPartitionsRDD
lines.flatMap(_.split(" ")) // MapPartitionsRDD
.filter(_.length >= 2) // MapPartitionsRDD
.map((_, 1)) // MapPartitionsRDD
.reduceByKey(_ + _) // ShuffleRDD
任务拆分
Spark 是由一堆的转换算子组成,会划分为 Job Stage Task 来执行。
- 根据算子是否为 Action 算子来划分 Job 。
- 根据算子运算是否需要 Shuffle 来划分 Stage 。
- 根据 RDD 分区数来划分 Task 。
例如如下的代码。
rdd = sc.parallelize([1, 2, 3], 3) // 初始化三个分区RDD
rdd.distinct().collect() // 转换算子+Action算子
rdd.filter(lambda x:x % 2 != 0).collect() // 转换算子+Action算子
上述包含了两个 Action 算子,也就是总计两个 Job:
distinct
含 Shuffle 操作,算一个 Stage,而 Collect 也算,总计两个 Stage,而且 RDD 含三个分区,总计有2x3=6
个 Task 。filter
不含 Shuffle 操作,不算一个 Stage,总计一个 Stage,总计有1x3=3
个 Task 。
总计有 2 个 Job,3 个 Stage,9 个 Task 。
Shuffle
其中 Stage 会根据是否有 Shuffle 进行划分。
RDD
Resilient Distributed Datasets, RDD 是 Spark 的核心数据结构,所有数据计算操作均基于该结构实现,
这里最关键的就是 compute()
接口实现,定义了如何从特定分区中获取数据,用来实现具体的计算逻辑,是每个 Task 执行的起点。
常见问题
winutils.exe
全部的报错内容为 java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
,主要是 Windows 环境上的依赖问题。
hadoop 本身是基于 Linux 环境开发,不能直接部署运行到 Windows 上,如果需要在本地运行类似 Spark 的任务,那么需要通过 winutils.exe
hadoop.dll
来模拟,可以直接从 Github 下载,需要注意对应的版本。
然后在环境变量中设置 HADOOP_HOME
,并将 %HADOOP_HOME%\bin
添加到环境变量 PATH
中。
注意,使用 IDEA 时,直接修改系统的环境变量是无法立即生效的,需要在运行配置中的 Environment variables
内添加,然后在代码中就可以通过 System.out.println(System.getenv("HADOOP_HOME"));
确定是否生效。