Spark Example: Word Count in Java

SPARK 2017-01-14

Example Description

  A Spark word count program written in Java.

Test Data

1.txt

hello world
hello hadoop
hadoop hive

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.mn1024.spark</groupId>
    <artifactId>spark_demo_wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.mn1024.spark.WordCount_Java</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
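
With this pom, mvn package produces a shaded (fat) jar that spark-submit can run. A minimal sketch, assuming Maven and Spark are on the PATH; the jar name follows from the artifactId and version above, and since the code hardcodes setMaster("local[2]") no --master flag is needed:

mvn clean package
spark-submit --class cn.mn1024.spark.WordCount_Java target/spark_demo_wordcount-1.0-SNAPSHOT.jar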

Program Code

package cn.mn1024.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Purpose: a Spark word count program written in Java
 * @author GuangLing_Lin
 */
public class WordCount_Java {
    public static void main(String[] args) {
        // 1. Create a SparkConf object and set the appName and master URL
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        // 2. Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // 3. Read the input file
        JavaRDD<String> dataJavaRDD = jsc.textFile("F://wordcount//input//1.txt");

        // 4. Split each line into words and flatten the result
        JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // split each input line into words
                String[] words = line.split(" ");

                return Arrays.asList(words).iterator();
            }
        });

        // 5. Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        // 6. Sum the counts of identical words (Scala's (_+_))
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Sort by word count in descending order.
        // To do so, swap (word, count) to (count, word) so the count becomes the sort key.
        JavaPairRDD<Integer, String> sortJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        }).sortByKey(false);

        // Swap (count, word) back to (word, count)
        JavaPairRDD<String, Integer> finalSortJavaPairRDD = sortJavaPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        // 7. Collect the results to the driver and print them
        List<Tuple2<String, Integer>> finalResult = finalSortJavaPairRDD.collect();

        for (Tuple2<String, Integer> t : finalResult) {
            System.out.println(t);
        }

        jsc.stop();
    }
}
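
Since the pom already targets Java 8, the same pipeline can be written much more compactly with lambdas instead of anonymous inner classes. A minimal equivalent sketch (the class name WordCountLambda is illustrative; input path and output are the same as above):

package cn.mn1024.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCountLambda").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // read, split, and count: the same steps 3-6 as above, as one lambda chain
        JavaPairRDD<String, Integer> counts = jsc.textFile("F://wordcount//input//1.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        // swap to (count, word), sort descending, swap back, then print
        counts.mapToPair(t -> new Tuple2<>(t._2, t._1))
                .sortByKey(false)
                .mapToPair(t -> new Tuple2<>(t._2, t._1))
                .collect()
                .forEach(System.out::println);

        jsc.stop();
    }
}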

Test Output

(hello,2)
(hadoop,2)
(hive,1)
(world,1)

Debug

  • Error: scalac: bad option: '-make:transitive'
  • Fix: a Scala version issue. Scala 2.11 no longer supports the -make option; removing that argument from pom.xml resolves it:

    <configuration>
        <args>
            <!-- <arg>-make:transitive</arg> -->
            <arg>-dependencyfile</arg>
            <arg>${project.build.directory}/.scala_dependencies</arg>
        </args>
    </configuration>
  • Error: java.lang.NoClassDefFoundError: org/apache/log4j/Level (full stack trace below)
  • Fix: a jar conflict. Delete the Log4j artifact from the local Maven repository so Maven downloads a clean copy.

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:111)
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:102)
    at org.apache.spark.SparkContext.initializeLogIfNecessary(SparkContext.scala:74)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.SparkContext.log(SparkContext.scala:74)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.SparkContext.logInfo(SparkContext.scala:74)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:185)
    at cn.mn1024.spark.WordCount$.main(WordCount.scala:14)
    at cn.mn1024.spark.WordCount.main(WordCount.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more
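
    If clearing the local repository does not help, another option (an assumption, not part of the original fix) is to pin Log4j explicitly in the pom so a complete copy is guaranteed on the classpath:

    <!-- hypothetical workaround: declare log4j directly so org.apache.log4j.Level is always present -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>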

