Spark Example Code

2023-09-20 warehouse

This page collects some commonly used Spark example code.

Examples

The simplest possible example is shown below: it multiplies each element of an array by 2 and then sums the results.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.List;
import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local"); // 本地运行无需安装任何Spark集群
        JavaSparkContext ctx = new JavaSparkContext(conf);
        ctx.setLogLevel("WARN");

        List<Integer> data = Arrays.asList(1, 2, 3);
        JavaRDD<Integer> rdd = ctx.parallelize(data);

        Integer result = rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v) {
                return v * 2;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });

        System.out.println(result); // 12
        ctx.stop();
    }
}

Every Spark program must first create a context, which initializes the core components the program depends on, such as the DAGScheduler and TaskScheduler; during this process, resources are requested from the cluster and the corresponding runtime environment is built.

Also note that the configuration above enables local execution via the setMaster() call; the same thing can be done by passing -Dspark.master=local as a runtime (JVM) argument. Other deployment modes are introduced later.
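
As a minimal sketch of the system-property approach (the class name MasterFromProperty is just for illustration): a SparkConf created with defaults also loads spark.* JVM system properties, so when the program is launched with -Dspark.master=local the setMaster() call can be dropped.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MasterFromProperty {
    public static void main(String[] args) {
        // No setMaster() here: a SparkConf created with defaults also loads
        // spark.* JVM system properties, so launching the program with
        // -Dspark.master=local behaves like calling setMaster("local").
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        System.out.println("spark.master = " + conf.get("spark.master"));

        JavaSparkContext ctx = new JavaSparkContext(conf);
        ctx.setLogLevel("WARN");
        // ... build RDDs and run jobs as in the example above ...
        ctx.stop(); // release the resources acquired when the context was created
    }
}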

Besides the in-memory array used above, you can also call the textFile() API to read a file from the filesystem, which returns an RDD instance.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local"); // 本地运行无需安装任何Spark集群
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<String> lines = ctx.textFile("D://foobar.txt", 1);
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> ones = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        List<Tuple2<String, Integer>> output = ones.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
    }
}

Below is an equivalent Scala implementation, which is considerably more concise.

import org.apache.spark.{SparkConf, SparkContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local")
    val ctx = new SparkContext(conf)

    ctx.textFile("D://foobar.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .collect()
      .foreach(println)
    ctx.stop()
  }
}

Streaming

The following Spark Streaming example counts words received from a TCP socket, processing the input in 5-second micro-batches. The Java version first:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) throws Exception{
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkConf conf = new SparkConf().setAppName("Spark Streaming Demo").setMaster("local[*]");
        JavaStreamingContext ctx = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = ctx.socketTextStream("100.94.12.165", 9090);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
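        // Note: this foreachRDD is effectively a no-op; it only illustrates how to
        // register an output operation on the DStream and does nothing with each RDD.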
        words.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        wordCounts.print();
        ctx.start();
        ctx.awaitTermination();
    }
}

The equivalent Scala implementation:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("Spark Streaming Demo").setMaster("local[*]")
    val ctx = new StreamingContext(conf, Seconds(5))

    ctx.socketTextStream("100.94.12.165", 9090)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .print()
    ctx.start()
    ctx.awaitTermination()
  }
}
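
To try the streaming versions locally, point socketTextStream() at a reachable host and port (for local testing, localhost with a simple TCP server such as nc -lk 9090 works), type lines into that terminal, and watch the word counts printed every 5 seconds.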