Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 862|回复: 0

[默认分类] Java,Python,Scala三种语言开发并部署Spark的WordCount程序

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-3-20 13:59:16 | 显示全部楼层 |阅读模式
    # javapython,Scala三种语言开发并部署Spark的WordCount程序

    一、Java开发并部署Spark的wordcount

    Java实现WordCount程序:

    ```java
    package com.spark.wordcount;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;


    public class WordCountApp {

        public static void main(String[] args) {
            /**
             * 1、创建SparkConf对象,设置Spark应用程序的配置信息
             */
            SparkConf conf =  new SparkConf();
            //设置spark应用程序的名称
            conf.setAppName(WordCountApp.class.getSimpleName());
            conf.setMaster("local");
            if(args.length>2) {
                    conf.setMaster(args[0]);
            }
            /**
             * 2、创建sparkContext对象--Java开发使用JavaSparkContext,scala开发使用SparkContext
             *    在saprk中SparkContext负责连接spark集群,创建RDD、累计量、广播量等
             *    Master参数是为了创建TaskSchedule(较低级的调度器,高层次的调度器为DAGSchedule),如下:
             *    如果setMaster("local")则创建LocalSchedule;
             *    如果setMaster("spark")则创建SparkDeploySchedulerBackend。在SparkDeploySchedulerBackend的start函数,会启动一个Client对象,连接到Spark集群。
             */
            JavaSparkContext sc = new JavaSparkContext(conf);

            /**
             * 3、sc中提供了textFile方法是SparkContext中定义的,如下:
             *      def textFile(path: String): JavaRDD[String] = sc.textFile(path)
             *    用来读取HDFS上的文本文件、集群中节点的本地文本文件或任何支持hadoop的文件系统上的文本文件,它的返回值是JavaRDD[String],是文本文件每一行
             */
            
            String filePath = "\\new_workspace\\SparkTest\\src\\com\\spark\\wordcount\\wordCount.txt";
            if(args.length>1) {
                    filePath = args[1];
            }
            
            JavaRDD lines = sc.textFile(filePath);
            System.out.println(conf);
            
            /**
             * 4、将行文本内容拆分为多个单词
             * lines调用flatMap这个transformation算子(参数类型是FlatMapFunction接口实现类)返回每一行的每个单词
             */
            JavaRDD words = lines.flatMap(new FlatMapFunction(){
                private static final long serialVersionUID = -3243665984299496473L;
                @Override
                public Iterator call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
                
            });
            
            /**
             * 5、将每个单词的初始数量都标记为1个
             * words调用mapToPair这个transformation算子(参数类型是PairFunction接口实现类,
             * PairFunction的三个参数是),返回一个新的RDD,即JavaPairRDD
             */
            JavaPairRDD pairs = words.mapToPair(new PairFunction() {
                private static final long serialVersionUID = -7879847028195817507L;
                @Override
                public Tuple2 call(String word) throws Exception {
                    return new Tuple2(word, 1);
                }
            });

            /**
             * 6、计算每个相同单词出现的次数
             * pairs调用reduceByKey这个transformation算子(参数是Function2接口实现类)
             * 对每个key的value进行reduce操作,返回一个JavaPairRDD,这个JavaPairRDD中的每一个Tuple的key是单词、value则是相同单词次数的和
             */
            JavaPairRDD wordCount = pairs.reduceByKey(new Function2() {
                private static final long serialVersionUID = -4171349401750495688L;
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });

            /**
             * 7、使用foreach这个action算子提交Spark应用程序
             * 在Spark中,每个应用程序都需要transformation算子计算,最终由action算子触发作业提交
             */
            wordCount.foreach(new VoidFunction>() {
                private static final long serialVersionUID = -5926812153234798612L;
                @Override
                public void call(Tuple2 wordCount) throws Exception {
                    System.out.println(wordCount._1+":"+wordCount._2);
                }
            });

            /**
             * 8、将计算结果文件输出到文件系统
             *  HDFS:使用新版API(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;)
             *  wordCount.saveAsNewAPIHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, TextOutputFormat.class, new Configuration());
             *             使用旧版API(org.apache.hadoop.mapred.JobConf;org.apache.hadoop.mapred.OutputFormat;)
             *                 wordCount.saveAsHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, OutputFormat.class, new JobConf(new Configuration()));
             *             使用默认TextOutputFile写入到HDFS(注意写入HDFS权限,如无权限则执行:hdfs dfs -chmod -R 777 /spark)
             *                 wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
             */
            Map map = wordCount.collectAsMap();
            for(String key : map.keySet()) {
                    System.out.println(key+":"+map.get(key));
            }

            /**
             * 9、关闭SparkContext容器,结束本次作业
             */
            sc.close();

        }
    }
    ```

    运算结果:

    ```
    JSDLF:1
    HELLOWORLD:22
    SJF:1
    LDSDJEWUROWJ:1
    FDSLKFJS:1
    FDSK:1
    COUNT:2
    :1
    ```

    部署:

    ```shell
    spark-submit wordcount.jar local file:/data0/wordcount/wordcount.txt
    spark-submit wordcount.jar spark hdfs:/data/logs/wordcount/wordcount.txt
    ```

    二、使用Python开发Spark的wordcount

    Python实现wordcount程序

    ```python
    from operator import add
    from pyspark import SparkContext

    INPUT_FILE = "hdfs://dmp/data/logs/wordcount/wordcount.txt"
    MASTER = "spark://n1:7077"

    sc = SparkContext(MASTER,"WordCountApp")

    text_file = sc.textFile(INPUT_FILE)

    counts = text_file.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b);

    results = counts.collectAsMap();

    for key in results.iterkeys():
        print key + ":" + str(results[key])
    ```

    运行结果:

    ```
    COUNT:2
    :1
    FDSLKFJS:1
    FDSK:1
    JSDLF:1
    HELLOWORLD:22
    LDSDJEWUROWJ:1
    SJF:1
    ```

    部署:

    ```shell
    spark-submit wordcount.py
    ```

    三、使用scala开始Spark的wordcount

    scala实现wordcount程序

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCountApp {
      
      def main(args: Array[String]) {
        val conf = new SparkConf();
        conf.setMaster("spark://n1:7077");
        conf.setAppName("WordCountApp");   
        val sc = new SparkContext(conf);
        val lines = sc.textFile("hdfs://dmp/data/logs/wordcount/wordcount.txt");
        lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println);
        sc.stop();
      }
      
    }
    ```

    运行结果:

    ```scala
    (FDSLKFJS,1)
    (SJF,1)
    (,1)
    (JSDLF,1)
    (COUNT,2)
    (FDSK,1)
    (HELLOWORLD,22)
    (LDSDJEWUROWJ,1)
    ```

    部署:

    ```shell
    spark-submit --class com.spark.wordcount.WordCountApp wordcount.jar
    ```

    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-20 17:01 , Processed in 0.422262 second(s), 54 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表