当前位置: 代码网 > it编程>数据库>Mysql > 【Hbase】Hbase TableInputFormat、TableOutputFormat

【Hbase】Hbase TableInputFormat、TableOutputFormat

2024年07月28日 Mysql 我要评论
TableInputFormat是Apache HBase中的一个重要的类,它允许MapReduce作业直接从HBase表中读取数据作为其输入。这使得HBase可以作为一个数据源,供MapReduce作业处理其存储的大规模数据集,而无需将数据导出到HDFS或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。TableInputFormat的作用TableInputFormat的主要作用是将HBase表中的数据转换成适合MapReduce作业处理的形式。

在这里插入图片描述

1.概述

2.tableinputformat

tableinputformat是apache hbase中的一个重要的类,它允许mapreduce作业直接从hbase表中读取数据作为其输入。这使得hbase可以作为一个数据源,供mapreduce作业处理其存储的大规模数据集,而无需将数据导出到hdfs或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。

tableinputformat的作用

tableinputformat的主要作用是将hbase表中的数据转换成适合mapreduce作业处理的形式。它将hbase中的行映射为mapreduce作业中的键值对(<k, v>),其中k通常是一个immutablebyteswritable对象,代表hbase行的主键(rowkey),而v是一个result对象,包含了该行的所有列族、列和版本的信息。

tableinputformat的配置参数

tableinputformat可以通过一系列的配置参数来定制扫描行为,这些参数可以通过job的configuration对象设置。以下是主要的配置参数及其用途:

  • hbase.mapreduce.inputtable:指定要扫描的hbase表的名称。

  • hbase.mapreduce.scan:可以通过

  • tablemapreduceutil.convertscantostring(scan scan)生成的字符串来指定一个scan对象,从而控制扫描的具体行为。但是由于该方法不公开,一般会通过其他参数间接控制扫描行为。

  • hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop:分别定义扫描的起始rowkey和结束rowkey。

  • hbase.mapreduce.scan.column.family:指定要扫描的列族。

  • hbase.mapreduce.scan.columns:指定要扫描的列,多个列之间用空格分隔。

  • hbase.mapreduce.scan.timestamp:如果设置,将只扫描指定时间戳的数据。

  • hbase.mapreduce.scan.timerange.start 和 hbase.mapreduce.scan.timerange.end:分别定义时间戳范围的开始和结束,以限制扫描的时间范围。

  • hbase.mapreduce.scan.maxversions:定义扫描结果中每列的最大版本数。

  • hbase.mapreduce.scan.cacheblocks:如果设置为true,则在扫描过程中缓存数据块,以提高读取速度。

  • hbase.mapreduce.scan.cachedrows:定义每次读取的最多行数,用于优化读取性能。

  • hbase.mapreduce.scan.batchsize:定义每次读取的最多值的数量,这会影响内存使用和处理速度。

结合map函数的说明

在mapreduce作业中,map()函数是处理tableinputformat输出的核心部分。函数签名如下:

java

public void map(immutablebyteswritable row, result value, context context)

在这个函数中:

  • row:类型为immutablebyteswritable,代表当前处理行的rowkey。

  • value:类型为result,包含了当前行的所有列族、列和版本的数据。

  • context:类型为context,用于向reducer发送键值对或写入日志等。

在map()函数内部,你可以根据value中的数据进行各种处理,如过滤、聚合等,然后通过context.write()将处理后的结果发送给reducer。

3.tableoutputformat

tableoutputformat是apache hbase提供的一个用于将mapreduce作业的输出直接写入hbase表的类。与tableinputformat相对应,tableoutputformat使得mapreduce作业能够将处理后的结果直接存储回hbase,而无需先写入hdfs再导入hbase,从而简化了数据流并提高了效率。

tableoutputformat的作用

tableoutputformat的主要功能是在mapreduce作业完成时,将mapreduce作业的输出数据写回到hbase表中。它接收的输出数据类型是<key, value>对,其中value必须是put或delete对象。put对象用于插入或更新hbase表中的行,而delete对象用于删除hbase表中的行。

tableoutputformat的配置参数

tableoutputformat需要通过job的configuration对象进行配置,主要的配置参数及其用途包括:

  • hbase.mapred.outputtable:指定写入数据的目的hbase表的名称。

  • hbase.mapred.output.quorum:指定目标hbase表所在的hbase集群的zookeeper配置信息,格式为:“zookeeper所在机器名(多个实例以逗号分隔):端口号:hbase根节点名”。例如:“zookeeper1.example.com,zookeeper2.example.com:2181:/hbase”。

  • hbase.mapred.output.quorum.port:zookeeper服务器的端口号,虽然可以通过hbase.mapred.output.quorum中的格式指定,但有时也可以单独配置这个参数。

  • hbase.mapred.output.rs.class 和 hbase.mapred.output.rs.impl:这两个参数用于指定regionserver的实现类和服务实现,但在实际应用中很少被直接配置,因为默认的实现通常足够满足需求。

使用场景

tableoutputformat适用于以下场景:

  • 数据更新:当mapreduce作业的结果是对现有hbase表的更新时,可以直接使用tableoutputformat将更新写回表中。

  • 数据加载:当需要将大量数据从hdfs或其他数据源批量导入hbase时,可以使用tableoutputformat将数据直接写入hbase,避免了先导入临时表再进行数据迁移的复杂过程。

  • 数据分析:在进行数据分析或数据清洗后,可以直接将处理后的结果写回hbase表,以供后续分析或应用使用。

总之,tableoutputformat提供了将mapreduce作业的输出直接写入hbase的能力,极大地简化了数据处理流程,并提高了数据处理的效率和灵活性。

3.案例

tableinputformat和tableoutputformat的案例

import java.io.ioexception;
import java.util.list;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.cell;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.client.scan;
import org.apache.hadoop.hbase.io.immutablebyteswritable;
import org.apache.hadoop.hbase.mapreduce.tableinputformat;
import org.apache.hadoop.hbase.mapreduce.tablemapreduceutil;
import org.apache.hadoop.hbase.mapreduce.tableoutputformat;
import org.apache.hadoop.hbase.util.bytes;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;

public class hbasetableioexample {

    public static class testtablemapper extends mapper<immutablebyteswritable, result, immutablebyteswritable, immutablebyteswritable> {

        @override
        protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception {
            list<cell> cells = value.listcells();
            for (cell cell : cells) {
                // assuming the family is 'cf' and qualifier is 'qual'
                if (bytes.equals(cell.getfamilyarray(), cell.getfamilyoffset(), cell.getfamilylength(), "cf".getbytes())) {
                    if (bytes.equals(cell.getqualifierarray(), cell.getqualifieroffset(), cell.getqualifierlength(), "qual".getbytes())) {
                        context.write(key, new immutablebyteswritable(cell.getvaluearray(), cell.getvalueoffset(), cell.getvaluelength()));
                    }
                }
            }
        }
    }

    public static class testtablereducer extends reducer<immutablebyteswritable, immutablebyteswritable, immutablebyteswritable, nullwritable> {

        @override
        protected void reduce(immutablebyteswritable key, iterable<immutablebyteswritable> values, context context) throws ioexception, interruptedexception {
            byte[] rowkey = key.get();
            put put = new put(rowkey);
            for (immutablebyteswritable value : values) {
                // assuming we want to write back to column 'cf:qual'
                put.addcolumn(bytes.tobytes("cf"), bytes.tobytes("qual"), value.get());
            }
            context.write(new immutablebyteswritable(rowkey), nullwritable.get());
            context.getcounter("custom", "processedrows").increment(1);
        }
    }

    public static void main(string[] args) throws exception {
        configuration conf = hbaseconfiguration.create();
        job job = job.getinstance(conf, "hbasetableioexample");
        job.setjarbyclass(hbasetableioexample.class);

        // set up input format
        job.setinputformatclass(tableinputformat.class);
        scan scan = new scan();
        scan.addcolumn(bytes.tobytes("cf"), bytes.tobytes("qual"));
        tablemapreduceutil.inittablemapperjob("test_input", scan, testtablemapper.class, immutablebyteswritable.class, immutablebyteswritable.class, job);

        // set up output format
        job.setoutputformatclass(tableoutputformat.class);
        tablemapreduceutil.inittablereducerjob("test_output", testtablereducer.class, job);

        job.setmapperclass(testtablemapper.class);
        job.setreducerclass(testtablereducer.class);

        boolean success = job.waitforcompletion(true);
        system.exit(success ? 0 : 1);
    }
}

重要点:

  • mapper: testtablemapper从输入表test_input读取数据,提取特定的列族和列的值,并将其传递给reducer。

  • reducer: testtablereducer接收来自mapper的输出,处理数据(本例中为简单传递),并将其写入输出表test_output。

  • job setup: 主函数main设置了mapreduce作业的输入和输出格式,以及mapper和reducer类,并使用tableinputformat和tableoutputformat初始化作业。

确保在运行此示例之前,你已经在hbase中创建了test_input和test_output表,并且test_input表中包含了适当的数据。此外,你可能需要根据你的环境调整hbase和hadoop的配置。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com