A collection of commonly used Spark example snippets.
Examples
The simplest possible example: take the elements of an array, multiply each by 2, and add them up.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.List;
import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local"); // runs locally, no Spark cluster installation required
        JavaSparkContext ctx = new JavaSparkContext(conf);
        ctx.setLogLevel("WARN");

        List<Integer> data = Arrays.asList(1, 2, 3);
        JavaRDD<Integer> rdd = ctx.parallelize(data);
        Integer result = rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v) {
                return v * 2;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });
        System.out.println(result);

        ctx.stop();
    }
}
Every Spark program begins by creating a context, which initializes the core components the program relies on, such as the DAGScheduler and the TaskScheduler; during this step it also requests resources from the cluster and builds the corresponding runtime environment.
Note that in the configuration above, local execution is selected through the setMaster() call; the same thing can be achieved by passing -Dspark.master=local as a JVM option when launching the program. Other deployment modes are covered later.
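As a quick sketch of the second approach (the class name MasterFromProperty is purely illustrative), SparkConf loads spark.* JVM system properties by default, so setMaster() can simply be omitted when the program is launched with -Dspark.master=local:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MasterFromProperty {
    public static void main(String[] args) {
        // Launch with the JVM option: -Dspark.master=local
        // SparkConf picks up spark.* system properties automatically,
        // so no setMaster() call is needed here.
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        System.out.println("master = " + conf.get("spark.master"));
        ctx.stop();
    }
}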
Besides parallelizing an in-memory collection as above, you can also use the textFile() API to read a file from the filesystem, which returns an RDD instance.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local"); // runs locally, no Spark cluster installation required
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<String> lines = ctx.textFile("D://foobar.txt", 1);
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> counts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
    }
}
The following is an equivalent Scala implementation, which is quite a bit simpler.
import org.apache.spark.{SparkConf, SparkContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local")
    val ctx = new SparkContext(conf)
    ctx.textFile("D://foobar.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
    ctx.stop()
  }
}
Streaming
The following Spark Streaming example reads lines from a socket and prints per-word counts for each 5-second batch.
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);

        SparkConf conf = new SparkConf().setAppName("Spark Streaming Demo").setMaster("local[*]");
        JavaStreamingContext ctx = new JavaStreamingContext(conf, Durations.seconds(5));

        JavaReceiverInputDStream<String> lines = ctx.socketTextStream("100.94.12.165", 9090);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Empty placeholder showing how to access each batch's underlying RDD;
        // custom per-batch logic (e.g. writing results to an external store) would go here.
        words.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        wordCounts.print();

        ctx.start();
        ctx.awaitTermination();
    }
}
The equivalent Scala implementation:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("Spark Streaming Demo").setMaster("local[*]")
    val ctx = new StreamingContext(conf, Seconds(5))
    ctx.socketTextStream("100.94.12.165", 9090)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ctx.start()
    ctx.awaitTermination()
  }
}
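To try the streaming examples, something has to be listening on the configured host and port and emitting lines of text; a tool such as netcat works, or a throwaway helper like the sketch below (the class name SocketSource is hypothetical, and the port is assumed to match the examples above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketSource {
    public static void main(String[] args) throws Exception {
        // Listen on port 9090 and forward every line typed on stdin to the
        // first client that connects (i.e. the socketTextStream receiver).
        try (ServerSocket server = new ServerSocket(9090)) {
            System.out.println("Waiting for a connection on port 9090 ...");
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
                String line;
                while ((line = in.readLine()) != null) {
                    out.println(line);
                }
            }
        }
    }
}

Each word typed into the helper's console should then show up in the 5-second word counts printed by the streaming job.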