当前位置: 代码网 > it编程>编程语言>Java > 使用Java实现MapReduce词频统计示例代码

使用Java实现MapReduce词频统计示例代码

2024年11月08日 Java 我要评论
前言在这篇博客中,我们将学习如何使用 java 实现 hadoop 的 mapreduce 框架,并通过一个词频统计(wordcount)的例子,来了解 mapreduce 的工作原理。mapredu

前言

在这篇博客中,我们将学习如何使用 java 实现 hadoop 的 mapreduce 框架,并通过一个词频统计(wordcount)的例子,来了解 mapreduce 的工作原理。mapreduce 是一种流行的大规模数据处理模式,广泛应用于分布式计算环境中。

一、正文

1. 代码结构

我们将在以下三个文件中实现 mapreduce 的核心功能:

  • map.java: 实现 mapper 类,负责将输入的文本数据按单词进行拆分。
  • reduce.java: 实现 reducer 类,负责对单词的出现次数进行汇总。
  • wordcount.java: 设置作业(job)配置,管理 map 和 reduce 的运行。

接下来我们将逐一分析这些代码。

2. map.java——mapper 实现

首先看下 mapper 类的代码实现:

package demo1;

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;

import java.io.ioexception;
import java.util.stringtokenizer;


//public class mapper<keyin, valuein, keyout, valueout>

public class map extends mapper<longwritable, text, text, intwritable> {
    private final static intwritable one = new intwritable(1); // 计数器
    private text word = new text(); // 存储当前处理的单词

    @override
    public void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
        // 将每行的文本数据分割成单词,可使用split()实现相同功能
        stringtokenizer tokenizer = new stringtokenizer(value.tostring());
        while (tokenizer.hasmoretokens()) {
            word.set(tokenizer.nexttoken()); // 获取下一个单词
            context.write(word, one); // 输出单词及其计数1
        }
    }
}

功能解读:

  • mapper 的作用
    mapper 类的任务是将输入的数据按行读取,并对每一行的内容进行处理。对于这个例子来说,我们的任务是将一行文本拆分成单词,并为每个单词标记它的初始计数值为 1

  • 重要方法与变量

    • longwritable key:表示输入数据的偏移量,即每行文本在文件中的位置。
    • text value:表示读取的一行文本。
    • context.write(word, one):将拆分出的单词作为键(text),值为 1intwritable),输出到框架中供下一阶段使用。

注意事项:

   stringtokenizer 用于分割每行文本,将其分割成单词。

   context.write(word, one) 将结果输出到 reducer 处理时会被聚合。每遇到一个相同的单词,后面会将其所有的 1 聚合成总和。

mapper类的泛型定义

典型的 mapper 类定义如下:

public class mapper&lt;keyin, valuein, keyout, valueout&gt;

这表示 mapper 是一个泛型类,带有四个类型参数。每个参数对应 mapper 任务中的不同数据类型。让我们逐个解释这些泛型参数的含义:

  • keyin (输入键的类型)

    • 这是输入数据的键的类型。在 mapreduce 程序中,输入数据通常来自文件或其他形式的数据源,keyin 是表示该输入数据片段的键。
    • 通常是文件中的偏移量(如文件的字节位置),所以经常使用 hadoop 提供的 longwritable 来表示这个偏移量。

    常见类型longwritable,表示输入文件中的行号或偏移量。

  • valuein (输入值的类型)

    • 这是输入数据的值的类型。valuein 是传递给 mapper 的实际数据,通常是一行文本。
    • 通常是文件的内容,比如一行文本,所以常用 text 来表示。

    常见类型text,表示输入文件中的一行文本。

  • keyout (输出键的类型)

    • 这是 mapper 处理后的输出数据的键的类型。mapper 的输出通常是某种键值对,keyout 表示输出键的类型。
    • 比如,在单词计数程序中,输出的键通常是一个单词,所以常用 text

    常见类型text,表示处理后的单词(在单词计数程序中)。

  • valueout (输出值的类型)

    • 这是 mapper 处理后的输出值的类型。valueout 是 mapper 输出键对应的值的类型。
    • 在单词计数程序中,输出的值通常是一个数字,用于表示单词的出现次数,所以常用 intwritable

    常见类型intwritable,表示单词计数时的次数(1)。

3. reduce.java——reducer 实现

接下来我们实现 reducer:

package demo1;

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.io.text;

import java.io.ioexception;

public class reduce extends reducer<text, intwritable, text, intwritable> {
    private intwritable result = new intwritable();

    @override
    public void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception {
        int sum = 0;
        for (intwritable val : values) {
            sum += val.get(); // 累加单词出现的次数
        }
        result.set(sum); // 设置聚合后的结果
        context.write(key, result); // 输出单词及其总次数
    }
}

功能解读:

  • reducer 的作用
    reducer 类用于将 mapper 输出的单词和它们的计数进行汇总。它会聚合每个单词的所有 1,得到该单词在整个输入中的总计数。

  • 重要方法与变量

    • text key:表示单词。
    • iterable<intwritable> values:表示所有与该单词相关联的计数(1的集合)。
    • sum:用于累加该单词出现的次数。
    • context.write(key, result):输出单词及其出现的总次数。

注意事项:

  for (intwritable val : values) 遍历所有的计数值,并累加得到单词的总次数。

      结果会输出为 <单词, 出现次数>,存储到最终的输出文件中。

reducer类的泛型定义

public class reducer<keyin, valuein, keyout, valueout>

reducer 类带有四个泛型参数,每个参数对应 reducer 任务中的不同数据类型。

  • keyin (输入键的类型):

    • keyin 是 reducer 接收的键的类型,它是由 mapper 的输出键传递过来的类型。
    • 例如,在单词计数程序中,mapper 输出的键是一个单词,所以 keyin 通常是 text 类型。

    常见类型text,表示单词(在单词计数程序中)。

  • valuein (输入值的类型):

    • valuein 是 reducer 接收的值的类型,它是 mapper 输出值的类型的集合。对于每个 keyinreducer 会接收一个与该键相关的值列表。
    • 例如,在单词计数程序中,mapper 输出的值是每个单词出现的次数(通常是 intwritable 值为 1),所以 valuein 的类型通常是 intwritable

    常见类型intwritable,表示单词出现的次数。

  • keyout (输出键的类型):

    • keyout 是 reducer 输出的键的类型。
    • 在单词计数程序中,reducer 输出的键还是单词,所以 keyout 通常也是 text 类型。

    常见类型text,表示单词。

  • valueout (输出值的类型):

    • valueout 是 reducer 输出的值的类型。这个值是 reducer 处理后的结果。
    • 在单词计数程序中,reducer 输出的值是每个单词出现的总次数,所以 valueout 通常是 intwritable

    常见类型intwritable,表示单词的总次数。

4. wordcount.java——作业配置与执行

最后,我们编写主程序,用于配置和启动 mapreduce 作业:

package demo1;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import org.apache.hadoop.util.genericoptionsparser;

import java.io.ioexception;

public class wordcount {
    public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception {
        configuration conf = new configuration(); // 配置项
        job job = job.getinstance(conf, "word count"); // 创建一个新作业

        string[] otherargs = new genericoptionsparser(conf, args).getremainingargs();
        if (otherargs.length != 2) {
            system.err.println("usage: wordcount <in> <out>");
            system.exit(1); // 输入输出路径检查
        }

        job.setjarbyclass(wordcount.class); // 设置主类
        job.setmapperclass(map.class); // 设置mapper类
        job.setreducerclass(reduce.class); // 设置reducer类

        // 设置map和reduce输出的键值对类型
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(intwritable.class);

        // 输入输出路径
        fileinputformat.addinputpath(job, new path(otherargs[0])); // 输入路径
        fileoutputformat.setoutputpath(job, new path(otherargs[1])); // 输出路径

        // 提交作业,直到作业完成
        system.exit(job.waitforcompletion(true) ? 0 : 1);
    }
}

功能解读:

  • 配置与作业初始化
    • configuration conf = new configuration():创建 hadoop 的配置对象,存储作业的相关配置。
    • job job = job.getinstance(conf, "word count"):创建 mapreduce 作业,并为作业命名为 "word count"
  • 作业设置
    • job.setjarbyclass(wordcount.class):设置运行时的主类。
    • job.setmapperclass(map.class) 和 job.setreducerclass(reduce.class):分别设置 mapper 和 reducer 类。
  • 输入输出路径
    • fileinputformat.addinputpath():指定输入数据的路径。
    • fileoutputformat.setoutputpath():指定输出结果的路径。
  • 作业提交与运行
    • system.exit(job.waitforcompletion(true) ? 0 : 1):提交作业并等待完成,如果成功返回 0,失败返回 1。

注意事项:

   genericoptionsparser 用于解析命令行输入,获取输入和输出路径。

         提交作业后,hadoop 框架会根据配置自动运行 mapper 和 reducer,并将结果输出到指定的路径。

二、知识回顾与补充

数据的偏移量?

数据的偏移量,即 longwritable key,是 mapreduce 程序中 mapper 输入的键,它表示输入数据文件中每行文本的起始字节位置

例:假设我们有一个文本文件:

0: hello world
12: hadoop mapreduce
32: data processing

        每行的前面的数字(0, 12, 32)就是对应行在文件中的偏移量,表示从文件开头到该行起始字节的距离。longwritable 类型的 key 就表示这个偏移量。

        在 mapper 中,输入是以 <偏移量, 文本行> 这样的键值对形式提供的。虽然偏移量在词频统计任务中不重要,但在某些应用中,如文件处理、日志解析时,偏移量可以帮助追踪数据的位置。

context

context 是 mapreduce 框架中 mapper 和 reducer 中非常重要的一个类,它提供了与框架进行交互的方法。

context 的主要作用

  • 写入输出:在 mapper 和 reducer 中,context.write(key, value) 用于将结果输出到框架。框架会自动处理这些输出结果,并将 mapper 的输出作为 reducer 的输入,或者将最终的 reducer 输出保存到 hdfs。

    • 在 mapper 中,context.write(word, one) 将每个单词及其初始计数 1 传递给框架。
    • 在 reducer 中,context.write(key, result) 将每个单词及其总出现次数输出到最终结果。
  • 配置访问context 可以访问作业的配置参数(如 configuration),帮助程序获取环境变量或作业参数。

  • 计数器context 提供计数器(counter)的支持,用于统计作业中的某些事件(如错误次数、特定条件的满足次数等)。

  • 记录状态context 可以报告作业的执行状态,帮助开发者追踪作业的进度或调试

iterable<intwritable> values是什么类型?

在 reducer 阶段,iterable<intwritable> values 表示与同一个键(即单词)相关联的所有 intwritable 值的集合。

  • 类型解读

    • iterable 表示一个可以迭代的集合,意味着它可以被遍历。
    • intwritable 是 hadoop 定义的一个包装类,用于封装 int 类型的值。
  • 在词频统计的例子中
    对于每个单词,mapper 会输出多个 <单词, 1>,因此在 reducer 中,对于每个键(即单词),会有多个 1 作为值的集合,即 valuesreducer 的任务就是对这些 1 进行累加,计算单词的总出现次数。

其他遍历方法

除了 for (intwritable val : values) 这种增强型 for 循环,我们还可以使用 iterable遍历

iterator<intwritable> iterator = values.iterator();
while (iterator.hasnext()) {
    intwritable val = iterator.next();
    sum += val.get(); // 处理每个值
}

iterator 提供 hasnext() 方法检查是否有更多元素,next() 方法返回当前元素并指向下一个。

configuration conf = new configuration() 的作用是什么?

configuration 类用于加载和存储 hadoop 应用程序运行时的配置信息,它是一个 hadoop 配置系统的核心组件,能够让你定义和访问一些运行时参数。每个 mapreduce 作业都依赖 configuration 来初始化作业配置。

configuration 的具体作用:

  • 读取配置文件

    • 它默认会加载系统的 hadoop 配置文件,如 core-site.xmlhdfs-site.xmlmapred-site.xml 等,这些文件包含了 hadoop 集群的信息(如 hdfs 的地址、作业调度器等)。
    • 如果需要,可以通过代码手动添加或覆盖这些参数。
  • 自定义参数传递

    • 你可以在运行 mapreduce 作业时通过 configuration 传递一些自定义参数。例如,你可以将某些控制逻辑写入配置文件或直接在代码中设置特定参数,并在 mapper 或 reducer 中通过 context.getconfiguration() 来访问这些参数。
    • 示例:
      configuration conf = new configuration();
      conf.set("my.custom.param", "some value");
      
  • 作业设置的依赖

    • configuration 是 hadoop 作业运行的基础,它为 job 提供上下文,包括输入输出格式、作业名称、运行时依赖库等等。

为什么需要 configuration

在 mapreduce 应用中,集群的规模较大,许多配置参数(如文件系统路径、任务调度器配置等)都存储在外部的配置文件中,configuration 类可以动态加载这些配置,避免硬编码。

用 split() 方法实现默认分隔符的分割

如果想实现类似于 stringtokenizer 的默认行为(用空白字符分割),可以使用正则表达式 \\s+,它表示匹配一个或多个空白字符(与 stringtokenizer 的默认行为一样)。

示例代码:

    @override
    public void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
        // 使用 split() 方法来分割字符串,使用空白字符作为分隔符
        string[] tokens = value.tostring().split("\\s+");

        // 遍历分割后的标记
        for (string token : tokens) {
            word.set(token);
            context.write(word, one);
        }
    }

intwritable

intwritable 是 hadoop 提供的一个类,属于 org.apache.hadoop.io 包。它是 hadoop 框架中用来封装 int 类型的数据的一个可序列化(writable)的包装类。

在 hadoop mapreduce 中,所有数据类型都需要实现 writable 和 comparable 接口,以便能够通过网络在节点之间传输。intwritable 作为 hadoop 中的基本数据类型之一,提供了一些便利方法来存储和处理 int 数据。

intwritable 类的作用:

在 mapreduce 中,hadoop的数据类型都需要实现 writable 接口,这样它们就可以在分布式系统中通过网络传输。intwritable 封装了一个 java 的 int 类型,用于 hadoop 的输入输出键值对。

主要的用途

  • mapreduce 中作为值类型intwritable 常用于表示 mapper 和 reducer 的输出值。
  • 支持序列化和反序列化:它实现了 writable 接口,可以在分布式环境下高效地进行序列化和反序列化。

如何使用 intwritable

intwritable 提供了构造方法和一些方法来设置和获取 int 值。

1. 创建 intwritable 对象

可以通过构造方法直接创建对象:

  • 默认构造函数:创建一个值为 0 的 intwritable
  • 参数构造函数:可以直接设置初始值。
// 创建一个默认值为 0 的 intwritable 对象
intwritable writable1 = new intwritable();

// 创建一个值为 10 的 intwritable 对象
intwritable writable2 = new intwritable(10);

2. 设置值和获取值

可以通过 set() 方法来设置值,通过 get() 方法来获取 intwritable 封装的 int 值。

intwritable writable = new intwritable();

// 设置值为 42
writable.set(42);

// 获取值
int value = writable.get(); // value == 42

3. 在 mapreduce 中使用

在 mapreduce 任务中,intwritable 通常被用于输出的值。例如,在计数器的 mapreduce 程序中,常将 intwritable 的值设置为 1,表示一个单词的出现次数。

示例:mapreduce 中的 intwritable 使用

public class map extends mapper<longwritable, text, text, intwritable> {
    private final static intwritable one = new intwritable(1); // 值为 1 的 intwritable
    private text word = new text();

    @override
    public void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
        string[] tokens = value.tostring().split("\\s+");

        // 遍历每个单词并写入 context
        for (string token : tokens) {
            word.set(token);
            context.write(word, one);  // 输出 <单词, 1>
        }
    }
}

在这个 mapreduce 例子中:

  • 每个单词对应的值都是 1,使用 intwritable one = new intwritable(1) 来封装这个整数值。
  • context.write() 将 text 和 intwritable 对象作为键值对输出,键是单词,值是 1

总结:

intwritable 之所以在 hadoop 中使用,而不是原生的 int 类型,是因为:

        hadoop 需要能通过网络传输的类型,intwritable 实现了 writable 接口,可以序列化和反序列化。

  intwritable 实现了 comparable 接口,因此可以在 hadoop 的排序操作中使用。

job job = job.getinstance(conf, "word count"); 这是什么意思?

job job = job.getinstance(conf, "word count");

这行代码创建并配置一个新的 mapreduce 作业实例。

  • conf:这是一个 hadoop 的 configuration 对象,包含了作业的配置信息。
  • "word count":这是作业的名称,可以是任何字符串。它主要用于标识和记录作业。

在driver中,为什么只设置输出的键值对类型?不设置输入呢?

job.setoutputkeyclass(text.class);
job.setoutputvalueclass(intwritable.class);

        1. 输入数据的键值对类型

        是由 inputformat(如 textinputformat)决定的,默认读取每行数据的偏移量和内容作为键值对传递给 mapper。 

        hadoop mapreduce 使用 inputformat 类来读取输入数据文件。默认的输入格式是 textinputformat,它会自动将输入文件解析成键值对形式,而你不需要在 driver 中显式指定输入的类型。

  • textinputformat 的输出(即传递给 mapper 的输入)是:
    • :每一行文本在文件中的字节偏移量,类型为 longwritable
    • :每一行的内容,类型为 text

所以,mapper 的输入键值对类型已经由 inputformat 控制,不需要你在 driver 中手动指定。

        2. 最终输出的键值对类型

        需要你在 driver 中显式设置,因为这是写入到 hdfs 中的数据类型。

 setoutputkeyclass 和 setoutputvalueclass 的作用

        在 driver 中,你需要明确指定的是 最终输出结果的键值对类型,即 reducer 输出的键值对类型,因为这是写入到 hdfs 中的数据类型。

  • job.setoutputkeyclass(text.class):指定 最终输出的键 类型为 text
  • job.setoutputvalueclass(intwritable.class):指定 最终输出的值 类型为 intwritable

这两项设置明确告诉 hadoop,最后存储在 hdfs 中的结果文件中,键和值分别是什么类型。

总结

到此这篇关于使用java实现mapreduce词频统计的文章就介绍到这了,更多相关java mapreduce词频统计内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com