前言
在这篇博客中,我们将学习如何使用 java 实现 hadoop 的 mapreduce 框架,并通过一个词频统计(wordcount)的例子,来了解 mapreduce 的工作原理。mapreduce 是一种流行的大规模数据处理模式,广泛应用于分布式计算环境中。
一、正文
1. 代码结构
我们将在以下三个文件中实现 mapreduce 的核心功能:
- map.java: 实现
mapper
类,负责将输入的文本数据按单词进行拆分。 - reduce.java: 实现
reducer
类,负责对单词的出现次数进行汇总。 - wordcount.java: 设置作业(job)配置,管理 map 和 reduce 的运行。
接下来我们将逐一分析这些代码。
2. map.java——mapper 实现
首先看下 mapper 类的代码实现:
package demo1; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.mapper; import java.io.ioexception; import java.util.stringtokenizer; //public class mapper<keyin, valuein, keyout, valueout> public class map extends mapper<longwritable, text, text, intwritable> { private final static intwritable one = new intwritable(1); // 计数器 private text word = new text(); // 存储当前处理的单词 @override public void map(longwritable key, text value, context context) throws ioexception, interruptedexception { // 将每行的文本数据分割成单词,可使用split()实现相同功能 stringtokenizer tokenizer = new stringtokenizer(value.tostring()); while (tokenizer.hasmoretokens()) { word.set(tokenizer.nexttoken()); // 获取下一个单词 context.write(word, one); // 输出单词及其计数1 } } }
功能解读:
mapper 的作用:
mapper
类的任务是将输入的数据按行读取,并对每一行的内容进行处理。对于这个例子来说,我们的任务是将一行文本拆分成单词,并为每个单词标记它的初始计数值为1
。重要方法与变量:
longwritable key
:表示输入数据的偏移量,即每行文本在文件中的位置。text value
:表示读取的一行文本。context.write(word, one)
:将拆分出的单词作为键(text
),值为1
(intwritable
),输出到框架中供下一阶段使用。
注意事项:
stringtokenizer
用于分割每行文本,将其分割成单词。
context.write(word, one)
将结果输出到 reducer
处理时会被聚合。每遇到一个相同的单词,后面会将其所有的 1
聚合成总和。
mapper类的泛型定义
典型的 mapper
类定义如下:
public class mapper<keyin, valuein, keyout, valueout>
这表示 mapper
是一个泛型类,带有四个类型参数。每个参数对应 mapper 任务中的不同数据类型。让我们逐个解释这些泛型参数的含义:
keyin (输入键的类型):
- 这是输入数据的键的类型。在 mapreduce 程序中,输入数据通常来自文件或其他形式的数据源,
keyin
是表示该输入数据片段的键。 - 通常是文件中的偏移量(如文件的字节位置),所以经常使用 hadoop 提供的
longwritable
来表示这个偏移量。
常见类型:
longwritable
,表示输入文件中的行号或偏移量。- 这是输入数据的键的类型。在 mapreduce 程序中,输入数据通常来自文件或其他形式的数据源,
valuein (输入值的类型):
- 这是输入数据的值的类型。
valuein
是传递给mapper
的实际数据,通常是一行文本。 - 通常是文件的内容,比如一行文本,所以常用
text
来表示。
常见类型:
text
,表示输入文件中的一行文本。- 这是输入数据的值的类型。
keyout (输出键的类型):
- 这是
mapper
处理后的输出数据的键的类型。mapper 的输出通常是某种键值对,keyout
表示输出键的类型。 - 比如,在单词计数程序中,输出的键通常是一个单词,所以常用
text
。
常见类型:
text
,表示处理后的单词(在单词计数程序中)。- 这是
valueout (输出值的类型):
- 这是
mapper
处理后的输出值的类型。valueout
是 mapper 输出键对应的值的类型。 - 在单词计数程序中,输出的值通常是一个数字,用于表示单词的出现次数,所以常用
intwritable
。
常见类型:
intwritable
,表示单词计数时的次数(1
)。- 这是
3. reduce.java——reducer 实现
接下来我们实现 reducer:
package demo1; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.io.text; import java.io.ioexception; public class reduce extends reducer<text, intwritable, text, intwritable> { private intwritable result = new intwritable(); @override public void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception { int sum = 0; for (intwritable val : values) { sum += val.get(); // 累加单词出现的次数 } result.set(sum); // 设置聚合后的结果 context.write(key, result); // 输出单词及其总次数 } }
功能解读:
reducer 的作用:
reducer
类用于将mapper
输出的单词和它们的计数进行汇总。它会聚合每个单词的所有1
,得到该单词在整个输入中的总计数。重要方法与变量:
text key
:表示单词。iterable<intwritable> values
:表示所有与该单词相关联的计数(1的集合)。sum
:用于累加该单词出现的次数。context.write(key, result)
:输出单词及其出现的总次数。
注意事项:
for (intwritable val : values)
遍历所有的计数值,并累加得到单词的总次数。
结果会输出为 <单词, 出现次数>
,存储到最终的输出文件中。
reducer类的泛型定义
public class reducer<keyin, valuein, keyout, valueout>
reducer
类带有四个泛型参数,每个参数对应 reducer
任务中的不同数据类型。
keyin
(输入键的类型):keyin
是 reducer 接收的键的类型,它是由mapper
的输出键传递过来的类型。- 例如,在单词计数程序中,
mapper
输出的键是一个单词,所以keyin
通常是text
类型。
常见类型:
text
,表示单词(在单词计数程序中)。valuein
(输入值的类型):valuein
是 reducer 接收的值的类型,它是mapper
输出值的类型的集合。对于每个keyin
,reducer
会接收一个与该键相关的值列表。- 例如,在单词计数程序中,
mapper
输出的值是每个单词出现的次数(通常是intwritable
值为 1),所以valuein
的类型通常是intwritable
。
常见类型:
intwritable
,表示单词出现的次数。keyout
(输出键的类型):keyout
是reducer
输出的键的类型。- 在单词计数程序中,
reducer
输出的键还是单词,所以keyout
通常也是text
类型。
常见类型:
text
,表示单词。valueout
(输出值的类型):valueout
是reducer
输出的值的类型。这个值是 reducer 处理后的结果。- 在单词计数程序中,
reducer
输出的值是每个单词出现的总次数,所以valueout
通常是intwritable
。
常见类型:
intwritable
,表示单词的总次数。
4. wordcount.java——作业配置与执行
最后,我们编写主程序,用于配置和启动 mapreduce 作业:
package demo1; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.genericoptionsparser; import java.io.ioexception; public class wordcount { public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception { configuration conf = new configuration(); // 配置项 job job = job.getinstance(conf, "word count"); // 创建一个新作业 string[] otherargs = new genericoptionsparser(conf, args).getremainingargs(); if (otherargs.length != 2) { system.err.println("usage: wordcount <in> <out>"); system.exit(1); // 输入输出路径检查 } job.setjarbyclass(wordcount.class); // 设置主类 job.setmapperclass(map.class); // 设置mapper类 job.setreducerclass(reduce.class); // 设置reducer类 // 设置map和reduce输出的键值对类型 job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); // 输入输出路径 fileinputformat.addinputpath(job, new path(otherargs[0])); // 输入路径 fileoutputformat.setoutputpath(job, new path(otherargs[1])); // 输出路径 // 提交作业,直到作业完成 system.exit(job.waitforcompletion(true) ? 0 : 1); } }
功能解读:
- 配置与作业初始化:
configuration conf = new configuration()
:创建 hadoop 的配置对象,存储作业的相关配置。job job = job.getinstance(conf, "word count")
:创建 mapreduce 作业,并为作业命名为"word count"
。
- 作业设置:
job.setjarbyclass(wordcount.class)
:设置运行时的主类。job.setmapperclass(map.class)
和job.setreducerclass(reduce.class)
:分别设置mapper
和reducer
类。
- 输入输出路径:
fileinputformat.addinputpath()
:指定输入数据的路径。fileoutputformat.setoutputpath()
:指定输出结果的路径。
- 作业提交与运行:
system.exit(job.waitforcompletion(true) ? 0 : 1)
:提交作业并等待完成,如果成功返回 0,失败返回 1。
注意事项:
genericoptionsparser
用于解析命令行输入,获取输入和输出路径。
提交作业后,hadoop 框架会根据配置自动运行 mapper 和 reducer,并将结果输出到指定的路径。
二、知识回顾与补充
数据的偏移量?
数据的偏移量,即 longwritable key
,是 mapreduce 程序中 mapper
输入的键,它表示输入数据文件中每行文本的起始字节位置。
例:假设我们有一个文本文件:
0: hello world 12: hadoop mapreduce 32: data processing
每行的前面的数字(0, 12, 32)就是对应行在文件中的偏移量,表示从文件开头到该行起始字节的距离。longwritable
类型的 key
就表示这个偏移量。
在 mapper
中,输入是以 <偏移量, 文本行>
这样的键值对形式提供的。虽然偏移量在词频统计任务中不重要,但在某些应用中,如文件处理、日志解析时,偏移量可以帮助追踪数据的位置。
context
context
是 mapreduce 框架中 mapper
和 reducer
中非常重要的一个类,它提供了与框架进行交互的方法。
context 的主要作用:
写入输出:在
mapper
和reducer
中,context.write(key, value)
用于将结果输出到框架。框架会自动处理这些输出结果,并将mapper
的输出作为reducer
的输入,或者将最终的reducer
输出保存到 hdfs。- 在
mapper
中,context.write(word, one)
将每个单词及其初始计数1
传递给框架。 - 在
reducer
中,context.write(key, result)
将每个单词及其总出现次数输出到最终结果。
- 在
配置访问:
context
可以访问作业的配置参数(如configuration
),帮助程序获取环境变量或作业参数。计数器:
context
提供计数器(counter)的支持,用于统计作业中的某些事件(如错误次数、特定条件的满足次数等)。记录状态:
context
可以报告作业的执行状态,帮助开发者追踪作业的进度或调试
iterable<intwritable> values是什么类型?
在 reducer
阶段,iterable<intwritable> values
表示与同一个键(即单词)相关联的所有 intwritable
值的集合。
类型解读:
iterable
表示一个可以迭代的集合,意味着它可以被遍历。intwritable
是 hadoop 定义的一个包装类,用于封装int
类型的值。
在词频统计的例子中:
对于每个单词,mapper
会输出多个<单词, 1>
,因此在reducer
中,对于每个键(即单词),会有多个1
作为值的集合,即values
。reducer
的任务就是对这些1
进行累加,计算单词的总出现次数。
其他遍历方法:
除了 for (intwritable val : values)
这种增强型 for 循环,我们还可以使用 iterable遍历
iterator<intwritable> iterator = values.iterator(); while (iterator.hasnext()) { intwritable val = iterator.next(); sum += val.get(); // 处理每个值 }
iterator
提供 hasnext()
方法检查是否有更多元素,next()
方法返回当前元素并指向下一个。
configuration conf = new configuration() 的作用是什么?
configuration
类用于加载和存储 hadoop 应用程序运行时的配置信息,它是一个 hadoop 配置系统的核心组件,能够让你定义和访问一些运行时参数。每个 mapreduce 作业都依赖 configuration
来初始化作业配置。
configuration
的具体作用:
读取配置文件:
- 它默认会加载系统的 hadoop 配置文件,如
core-site.xml
、hdfs-site.xml
、mapred-site.xml
等,这些文件包含了 hadoop 集群的信息(如 hdfs 的地址、作业调度器等)。 - 如果需要,可以通过代码手动添加或覆盖这些参数。
- 它默认会加载系统的 hadoop 配置文件,如
自定义参数传递:
- 你可以在运行 mapreduce 作业时通过
configuration
传递一些自定义参数。例如,你可以将某些控制逻辑写入配置文件或直接在代码中设置特定参数,并在mapper
或reducer
中通过context.getconfiguration()
来访问这些参数。 - 示例:
configuration conf = new configuration(); conf.set("my.custom.param", "some value");
- 你可以在运行 mapreduce 作业时通过
作业设置的依赖:
configuration
是 hadoop 作业运行的基础,它为job
提供上下文,包括输入输出格式、作业名称、运行时依赖库等等。
为什么需要 configuration
?
在 mapreduce 应用中,集群的规模较大,许多配置参数(如文件系统路径、任务调度器配置等)都存储在外部的配置文件中,configuration
类可以动态加载这些配置,避免硬编码。
用 split() 方法实现默认分隔符的分割
如果想实现类似于 stringtokenizer
的默认行为(用空白字符分割),可以使用正则表达式 \\s+
,它表示匹配一个或多个空白字符(与 stringtokenizer
的默认行为一样)。
示例代码:
@override public void map(longwritable key, text value, context context) throws ioexception, interruptedexception { // 使用 split() 方法来分割字符串,使用空白字符作为分隔符 string[] tokens = value.tostring().split("\\s+"); // 遍历分割后的标记 for (string token : tokens) { word.set(token); context.write(word, one); } }
intwritable
intwritable
是 hadoop 提供的一个类,属于 org.apache.hadoop.io
包。它是 hadoop 框架中用来封装 int
类型的数据的一个可序列化(writable)的包装类。
在 hadoop mapreduce 中,所有数据类型都需要实现 writable
和 comparable
接口,以便能够通过网络在节点之间传输。intwritable
作为 hadoop 中的基本数据类型之一,提供了一些便利方法来存储和处理 int
数据。
intwritable
类的作用:
在 mapreduce 中,hadoop的数据类型都需要实现 writable
接口,这样它们就可以在分布式系统中通过网络传输。intwritable
封装了一个 java 的 int
类型,用于 hadoop 的输入输出键值对。
主要的用途:
- mapreduce 中作为值类型:
intwritable
常用于表示 mapper 和 reducer 的输出值。 - 支持序列化和反序列化:它实现了
writable
接口,可以在分布式环境下高效地进行序列化和反序列化。
如何使用 intwritable
intwritable
提供了构造方法和一些方法来设置和获取 int
值。
1. 创建 intwritable 对象
可以通过构造方法直接创建对象:
- 默认构造函数:创建一个值为 0 的
intwritable
。 - 参数构造函数:可以直接设置初始值。
// 创建一个默认值为 0 的 intwritable 对象 intwritable writable1 = new intwritable(); // 创建一个值为 10 的 intwritable 对象 intwritable writable2 = new intwritable(10);
2. 设置值和获取值
可以通过 set()
方法来设置值,通过 get()
方法来获取 intwritable
封装的 int
值。
intwritable writable = new intwritable(); // 设置值为 42 writable.set(42); // 获取值 int value = writable.get(); // value == 42
3. 在 mapreduce 中使用
在 mapreduce 任务中,intwritable
通常被用于输出的值。例如,在计数器的 mapreduce 程序中,常将 intwritable
的值设置为 1,表示一个单词的出现次数。
示例:mapreduce 中的 intwritable 使用
public class map extends mapper<longwritable, text, text, intwritable> { private final static intwritable one = new intwritable(1); // 值为 1 的 intwritable private text word = new text(); @override public void map(longwritable key, text value, context context) throws ioexception, interruptedexception { string[] tokens = value.tostring().split("\\s+"); // 遍历每个单词并写入 context for (string token : tokens) { word.set(token); context.write(word, one); // 输出 <单词, 1> } } }
在这个 mapreduce 例子中:
- 每个单词对应的值都是
1
,使用intwritable one = new intwritable(1)
来封装这个整数值。 context.write()
将text
和intwritable
对象作为键值对输出,键是单词,值是1
。
总结:
intwritable
之所以在 hadoop 中使用,而不是原生的 int
类型,是因为:
hadoop 需要能通过网络传输的类型,intwritable
实现了 writable
接口,可以序列化和反序列化。
intwritable
实现了 comparable
接口,因此可以在 hadoop 的排序操作中使用。
job job = job.getinstance(conf, "word count"); 这是什么意思?
job job = job.getinstance(conf, "word count");
这行代码创建并配置一个新的 mapreduce 作业实例。
- conf:这是一个 hadoop 的
configuration
对象,包含了作业的配置信息。 - "word count":这是作业的名称,可以是任何字符串。它主要用于标识和记录作业。
在driver中,为什么只设置输出的键值对类型?不设置输入呢?
job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class);
1. 输入数据的键值对类型
是由 inputformat(如 textinputformat
)决定的,默认读取每行数据的偏移量和内容作为键值对传递给 mapper
。
hadoop mapreduce 使用 inputformat 类来读取输入数据文件。默认的输入格式是 textinputformat,它会自动将输入文件解析成键值对形式,而你不需要在 driver 中显式指定输入的类型。
- textinputformat 的输出(即传递给
mapper
的输入)是:- 键:每一行文本在文件中的字节偏移量,类型为 longwritable。
- 值:每一行的内容,类型为 text。
所以,mapper
的输入键值对类型已经由 inputformat 控制,不需要你在 driver 中手动指定。
2. 最终输出的键值对类型
需要你在 driver 中显式设置,因为这是写入到 hdfs 中的数据类型。
setoutputkeyclass 和 setoutputvalueclass 的作用
在 driver 中,你需要明确指定的是 最终输出结果的键值对类型,即 reducer
输出的键值对类型,因为这是写入到 hdfs 中的数据类型。
job.setoutputkeyclass(text.class)
:指定 最终输出的键 类型为text
。job.setoutputvalueclass(intwritable.class)
:指定 最终输出的值 类型为intwritable
。
这两项设置明确告诉 hadoop,最后存储在 hdfs 中的结果文件中,键和值分别是什么类型。
总结
到此这篇关于使用java实现mapreduce词频统计的文章就介绍到这了,更多相关java mapreduce词频统计内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论