MapReduce 案例:倒排索引

MapReduce 2017-01-05

实例描述

  通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的
文档或者是标识文档的 ID 号,或者是指文档所在位置的 URL。 如下所示:

单词                文档列表
单词1      文档1 ——> 文档4 ——> 文档13 ——> ...
单词2      文档3 ——> 文档5 ——> 文档15 ——> ...
单词3      文档1 ——> 文档8 ——> 文档20 ——> ...
......     ......

  从上图可以看出,单词 1 出现在{文档 1,文档 4,文档 13, ……}中,单词 2 出现在{文档 3,文档 5,文档 15, ……}中,而单词 3 出现在{文档 1,文档 8,文档 20, ……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如下所示:

单词                    文档列表
单词1      文档1:权 ——> 文档4:权 ——> 文档13:权 ——> ...
单词2      文档3:权 ——> 文档5:权 ——> 文档15:权 ——> ...
单词3      文档1:权 ——> 文档8:权 ——> 文档20:权 ——> ...
......     ......

  最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如下所示,索引文件中的“ MapReduce” 一行表示: “ MapReduce” 这个单词在文本 T0 中 出现过 1 次, T1 中出现过 1 次, T2 中出现过 2 次。当搜索条件为“ MapReduce” 、“ is” 、“ Simple” 时,对应的集合为: {T0, T1, T2}∩{T0, T1}∩{T0, T1}={T0, T1},即文档 T0 和 T1 包 含了所要索引的单词,而且只有 T0 是连续的。
被索引文件:

T0 = “MapReduce is simple”
T1 = “MapReduce is powerful is simple”
T2 = “Hello MapReduce bye MapReduce”

索引文件:

“MapReduce” :{(T0,1);(T1,1);(T2,2)} 
“is” :       {(T0,1);(T1,2)}
“simple” :   {(T0,1);(T1,1)}
“powerful” : {(T1,1)}
“Hello” :    {(T2,1)}
“bye” :      {(T2,1)}

倒排索引

  倒排索引是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。 它主要是用来存储某个单词(或词组) 在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。

设计思路

  • Map过程

    • 首先使用默认的 TextInputFormat 类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然, Map 过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档 URL 和词频,如下所示:

                                                         “MapReduce”    file1.txt 1
      <0, “MapReduce is simple”>              ——Map——>   “is”           file1.txt 1
                                                         “simple”       file1.txt 1
      
                                                         “MapReduce”    file2.txt 1
                                                         “is”           file2.txt 1
      <0, “MapReduce is powerful is simple”>  ——Map——>   “powerful”     file2.txt 1
                                                         “is”           file2.txt 1
                                                         “simple”       file2.txt 1
      
                                                         “Hello”        file3.txt 1
                                                         “MapReduce”    file3.txt 1
      <0, “Hello MapReduce bye MapReduce”>    ——Map——>   “bye”          file3.txt 1
                                                         “MapReduce”    file3.txt 1
    • 这里存在两个问题:第一, <key,value>对只能有两个值,在不使用 Hadoop 自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为 key 或 value 值;
    • 第二,通过一个 Reduce 过程无法同时完成词频统计和生成文档列表,所以必须增加一个 Combine 过程完成词频统计。
    • 这里将单词和 URL 组成 key 值(如“ MapReduce: file1.txt” ),将词频作为value,这样做的好处是可以利用 MapReduce 框架自带的 Map 端排序,将同一文档的相同单词的词频组成列表,传递给 Combine 过程,实现类似于 WordCount 的功能。
  • Combine 过程

    • 经过 map 方法处理后, Combine 过程将 key 值相同 value 值累加,得到一个单词在文档中的词频。 如果直接将图所示的输出作为 Reduce 过程的输入,在 Shuffle 过程时将面临一个问题:所有具有相同单词的记录(由单词、 URL 和词频组成)应该交由同一个Reducer 处理,但当前的 key 值无法保证这一点,所以必须修改 key 值和 value 值。这次将单词作为 key 值, URL 和词频组成 value 值(如“ file1.txt: 1” )。这样做的好处是可以利用 MapReduce 框架默认的 HashPartitioner 类完成 Shuffle 过程,将相同单词的所有记录发送给同一个 Reducer 进行处理。

      “MapReduce:file1.txt” list(1)                  “MapReduce:file1.txt”   1
      “is:file1.txt”        list(1)    ———Combine—>  “is:file1.txt”          1
      “simple:file1.txt”    list(1)                  “simple:file1.txt”      1
      
      “MapReduce:file2.txt” list(1)                  “MapReduce:file2.txt”   1
      “is:file2.txt”        list(1,1)  ———Combine—>  “is:file2.txt”          2
      “powerful:file2.txt”  list(1)                  “powerful:file2.txt”    1
      “simple:file2.txt”    list(1)                  “simple:file2.txt”      1
      
      “Hello:file3.txt”     list(1)                  “MapReduce:file3.txt”   1
      “MapReduce:file3.txt” list(1,1)  ———Combine—>  “MapReduce:file3.txt”   2
      “bye:file3.txt”       list(1)                  “simple:file3.txt”      1
  • Reduce 过程

    • 经过上述两个过程后, Reduce 过程只需将相同 key 值的 value 值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给 MapReduce 框架进行处理了。

      “MapReduce”   “file1.txt:1”
      “is”          “file1.txt:1”
      “simple”      “file1.txt:1”
      
                                                 “MapReduce” “file1.txt:1;file2.txt:1;file3.txt:2”  
      “MapReduce”   “file2.txt:1”                “is”        “file1.txt:1;file2.txt:2”
      “is”          “file2.txt:2”                “simple”    “file1.txt:1;file2.txt:1”
      “powerful”    “file2.txt:1”  ——Reduce——>   “powerful”  “file2.txt:1”
      “simple”      “file2.txt:1”                “Hello”     “file3.txt:1”
                                                 “bye”       “file3.txt:1”
      
      “Hello”       “file3.txt:1”
      “MapReduce”   “file3.txt:2”
      “bye”         “file3.txt:1”

测试数据

file1.txt

MapReduce is simple

file2.txt

MapReduce is powerful is simple

file3.txt

Hello MapReduce bye MapReduce

程序代码

package cn.mn1024.mapreduce.dedup;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 功能:倒排索引
 * @author GuangLing_Lin
 */
@SuppressWarnings("all")
public class InvertedIndexRunner {

    private static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        // 存储单词和URL组合
        private static Text keyInfo = new Text();
        // 存储词频,初始化为1
        private static final Text valueInfo = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // 得到字段数组
            String[] fields = StringUtils.split(line, " ");

            // 得到这行数据所在的文件切片
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            // 根据文件切片得到文件名
            String fileName = fileSplit.getPath().getName();

            for (String field : fields) {
                // key值有单词和URL组成,如“MapReduce:file1.txt”
                keyInfo.set(field + ":" + fileName);
                context.write(keyInfo, valueInfo);
            }

        }
    }

    private static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private static Text info = new Text();
        // 输入: <MapReduce:file3 {1,1,...}>
        // 输出: <MapReduce file3:2>

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)    throws IOException, InterruptedException {
            // 统计词频
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }

            int splitIndex = key.toString().indexOf(":");
            // 重新设置 value 值由 URL 和词频组成
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // 重新设置 key 值为单词
            key.set(key.toString().substring(0, splitIndex));

            context.write(key, info);
        }
    }

    private static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private static Text result = new Text();

        // 输入: <MapReduce file3:2>
        // 输出: <MapReduce file1:1;file2:1;file3:2;>
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 生成文档列表
            String fileList = new String();

            for (Text value : values) {
                fileList += value.toString() + ";";
            }

            result.set(fileList);
            key = new Text("“" + key.toString() + "”");
            result = new Text("“" + result.toString() + "”");
            context.write(key, result);
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 创建本次mr程序的job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 指定本次job运行的主类
        job.setJarByClass(InvertedIndexRunner.class);

        // 指定本次job的具体Mapper、Combiner 、Reducer的实现类
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        // 指定本次job reduce阶段输出数据类型 也就是整个mr任务的最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 指定本次job待处理数据的目录和程序执行完毕结果存放的目录
        FileInputFormat.setInputPaths(job, new Path("F://invertedindex//input"));
        FileOutputFormat.setOutputPath(job, new Path("F://invertedindex//output"));

        // 提交本次job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

测试结果

“Hello”        “file3.txt:1;”
“MapReduce”    “file3.txt:2;file1.txt:1;file2.txt:1;”
“bye”          “file3.txt:1;”
“is”           “file1.txt:1;file2.txt:2;”
“powerful”     “file2.txt:1;”
“simple”       “file2.txt:1;file1.txt:1;”

总结:

  Combiner 绝不能改变最终的计算结果。 Combiner 只应该用于那种 Reduce 的输入 key/value与输出 key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。 Combiner的使用一定得慎重,如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果。


每一个成功的背后都有无数个无人知晓的黑夜。

因为

夜晚,是超越对手的最佳时机。

===================== 码农1024 =====================#蔺光岭#


本文由 蔺光岭 创作,采用 知识共享署名 4.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论