实例描述
通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的
文档或者是标识文档的 ID 号,或者是指文档所在位置的 URL。 如下所示:
单词 文档列表
单词1 文档1 ——> 文档4 ——> 文档13 ——> ...
单词2 文档3 ——> 文档5 ——> 文档15 ——> ...
单词3 文档1 ——> 文档8 ——> 文档20 ——> ...
...... ......
从上图可以看出,单词 1 出现在{文档 1,文档 4,文档 13, ……}中,单词 2 出现在{文档 3,文档 5,文档 15, ……}中,而单词 3 出现在{文档 1,文档 8,文档 20, ……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如下所示:
单词 文档列表
单词1 文档1:权 ——> 文档4:权 ——> 文档13:权 ——> ...
单词2 文档3:权 ——> 文档5:权 ——> 文档15:权 ——> ...
单词3 文档1:权 ——> 文档8:权 ——> 文档20:权 ——> ...
...... ......
最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如下所示,索引文件中的“ MapReduce” 一行表示: “ MapReduce” 这个单词在文本 T0 中 出现过 1 次, T1 中出现过 1 次, T2 中出现过 2 次。当搜索条件为“ MapReduce” 、“ is” 、“ Simple” 时,对应的集合为: {T0, T1, T2}∩{T0, T1}∩{T0, T1}={T0, T1},即文档 T0 和 T1 包 含了所要索引的单词,而且只有 T0 是连续的。
被索引文件:
T0 = “MapReduce is simple”
T1 = “MapReduce is powerful is simple”
T2 = “Hello MapReduce bye MapReduce”
索引文件:
“MapReduce” :{(T0,1);(T1,1);(T2,2)}
“is” : {(T0,1);(T1,2)}
“simple” : {(T0,1);(T1,1)}
“powerful” : {(T1,1)}
“Hello” : {(T2,1)}
“bye” : {(T2,1)}
倒排索引
倒排索引是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。 它主要是用来存储某个单词(或词组) 在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。
设计思路
Map过程
首先使用默认的 TextInputFormat 类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然, Map 过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档 URL 和词频,如下所示:
“MapReduce” file1.txt 1 <0, “MapReduce is simple”> ——Map——> “is” file1.txt 1 “simple” file1.txt 1 “MapReduce” file2.txt 1 “is” file2.txt 1 <0, “MapReduce is powerful is simple”> ——Map——> “powerful” file2.txt 1 “is” file2.txt 1 “simple” file2.txt 1 “Hello” file3.txt 1 “MapReduce” file3.txt 1 <0, “Hello MapReduce bye MapReduce”> ——Map——> “bye” file3.txt 1 “MapReduce” file3.txt 1
- 这里存在两个问题:第一, <key,value>对只能有两个值,在不使用 Hadoop 自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为 key 或 value 值;
- 第二,通过一个 Reduce 过程无法同时完成词频统计和生成文档列表,所以必须增加一个 Combine 过程完成词频统计。
- 这里将单词和 URL 组成 key 值(如“ MapReduce: file1.txt” ),将词频作为value,这样做的好处是可以利用 MapReduce 框架自带的 Map 端排序,将同一文档的相同单词的词频组成列表,传递给 Combine 过程,实现类似于 WordCount 的功能。
Combine 过程
经过 map 方法处理后, Combine 过程将 key 值相同 value 值累加,得到一个单词在文档中的词频。 如果直接将图所示的输出作为 Reduce 过程的输入,在 Shuffle 过程时将面临一个问题:所有具有相同单词的记录(由单词、 URL 和词频组成)应该交由同一个Reducer 处理,但当前的 key 值无法保证这一点,所以必须修改 key 值和 value 值。这次将单词作为 key 值, URL 和词频组成 value 值(如“ file1.txt: 1” )。这样做的好处是可以利用 MapReduce 框架默认的 HashPartitioner 类完成 Shuffle 过程,将相同单词的所有记录发送给同一个 Reducer 进行处理。
“MapReduce:file1.txt” list(1) “MapReduce:file1.txt” 1 “is:file1.txt” list(1) ———Combine—> “is:file1.txt” 1 “simple:file1.txt” list(1) “simple:file1.txt” 1 “MapReduce:file2.txt” list(1) “MapReduce:file2.txt” 1 “is:file2.txt” list(1,1) ———Combine—> “is:file2.txt” 2 “powerful:file2.txt” list(1) “powerful:file2.txt” 1 “simple:file2.txt” list(1) “simple:file2.txt” 1 “Hello:file3.txt” list(1) “MapReduce:file3.txt” 1 “MapReduce:file3.txt” list(1,1) ———Combine—> “MapReduce:file3.txt” 2 “bye:file3.txt” list(1) “simple:file3.txt” 1
Reduce 过程
经过上述两个过程后, Reduce 过程只需将相同 key 值的 value 值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给 MapReduce 框架进行处理了。
“MapReduce” “file1.txt:1” “is” “file1.txt:1” “simple” “file1.txt:1” “MapReduce” “file1.txt:1;file2.txt:1;file3.txt:2” “MapReduce” “file2.txt:1” “is” “file1.txt:1;file2.txt:2” “is” “file2.txt:2” “simple” “file1.txt:1;file2.txt:1” “powerful” “file2.txt:1” ——Reduce——> “powerful” “file2.txt:1” “simple” “file2.txt:1” “Hello” “file3.txt:1” “bye” “file3.txt:1” “Hello” “file3.txt:1” “MapReduce” “file3.txt:2” “bye” “file3.txt:1”
测试数据
file1.txt
MapReduce is simple
file2.txt
MapReduce is powerful is simple
file3.txt
Hello MapReduce bye MapReduce
程序代码
package cn.mn1024.mapreduce.dedup;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 功能:倒排索引
* @author GuangLing_Lin
*/
@SuppressWarnings("all")
public class InvertedIndexRunner {
private static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
// 存储单词和URL组合
private static Text keyInfo = new Text();
// 存储词频,初始化为1
private static final Text valueInfo = new Text("1");
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
// 得到字段数组
String[] fields = StringUtils.split(line, " ");
// 得到这行数据所在的文件切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 根据文件切片得到文件名
String fileName = fileSplit.getPath().getName();
for (String field : fields) {
// key值有单词和URL组成,如“MapReduce:file1.txt”
keyInfo.set(field + ":" + fileName);
context.write(keyInfo, valueInfo);
}
}
}
private static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
private static Text info = new Text();
// 输入: <MapReduce:file3 {1,1,...}>
// 输出: <MapReduce file3:2>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 统计词频
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新设置 value 值由 URL 和词频组成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新设置 key 值为单词
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
private static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
private static Text result = new Text();
// 输入: <MapReduce file3:2>
// 输出: <MapReduce file1:1;file2:1;file3:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文档列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
key = new Text("“" + key.toString() + "”");
result = new Text("“" + result.toString() + "”");
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 创建本次mr程序的job实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 指定本次job运行的主类
job.setJarByClass(InvertedIndexRunner.class);
// 指定本次job的具体Mapper、Combiner 、Reducer的实现类
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
// 指定本次job reduce阶段输出数据类型 也就是整个mr任务的最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定本次job待处理数据的目录和程序执行完毕结果存放的目录
FileInputFormat.setInputPaths(job, new Path("F://invertedindex//input"));
FileOutputFormat.setOutputPath(job, new Path("F://invertedindex//output"));
// 提交本次job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
测试结果
“Hello” “file3.txt:1;”
“MapReduce” “file3.txt:2;file1.txt:1;file2.txt:1;”
“bye” “file3.txt:1;”
“is” “file1.txt:1;file2.txt:2;”
“powerful” “file2.txt:1;”
“simple” “file2.txt:1;file1.txt:1;”
总结:
Combiner 绝不能改变最终的计算结果。 Combiner 只应该用于那种 Reduce 的输入 key/value与输出 key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。 Combiner的使用一定得慎重,如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果。
每一个成功的背后都有无数个无人知晓的黑夜。
因为
夜晚,是超越对手的最佳时机。
===================== 码农1024 =====================#蔺光岭#
还不快抢沙发