1.概述
2.tableinputformat
tableinputformat是apache hbase中的一个重要的类,它允许mapreduce作业直接从hbase表中读取数据作为其输入。这使得hbase可以作为一个数据源,供mapreduce作业处理其存储的大规模数据集,而无需将数据导出到hdfs或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。
tableinputformat的作用
tableinputformat的主要作用是将hbase表中的数据转换成适合mapreduce作业处理的形式。它将hbase中的行映射为mapreduce作业中的键值对(<k, v>),其中k通常是一个immutablebyteswritable对象,代表hbase行的主键(rowkey),而v是一个result对象,包含了该行的所有列族、列和版本的信息。
tableinputformat的配置参数
tableinputformat可以通过一系列的配置参数来定制扫描行为,这些参数可以通过job的configuration对象设置。以下是主要的配置参数及其用途:
-
hbase.mapreduce.inputtable:指定要扫描的hbase表的名称。
-
hbase.mapreduce.scan:可以通过
-
tablemapreduceutil.convertscantostring(scan scan)生成的字符串来指定一个scan对象,从而控制扫描的具体行为。但是由于该方法不公开,一般会通过其他参数间接控制扫描行为。
-
hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop:分别定义扫描的起始rowkey和结束rowkey。
-
hbase.mapreduce.scan.column.family:指定要扫描的列族。
-
hbase.mapreduce.scan.columns:指定要扫描的列,多个列之间用空格分隔。
-
hbase.mapreduce.scan.timestamp:如果设置,将只扫描指定时间戳的数据。
-
hbase.mapreduce.scan.timerange.start 和 hbase.mapreduce.scan.timerange.end:分别定义时间戳范围的开始和结束,以限制扫描的时间范围。
-
hbase.mapreduce.scan.maxversions:定义扫描结果中每列的最大版本数。
-
hbase.mapreduce.scan.cacheblocks:如果设置为true,则在扫描过程中缓存数据块,以提高读取速度。
-
hbase.mapreduce.scan.cachedrows:定义每次读取的最多行数,用于优化读取性能。
-
hbase.mapreduce.scan.batchsize:定义每次读取的最多值的数量,这会影响内存使用和处理速度。
结合map函数的说明
在mapreduce作业中,map()函数是处理tableinputformat输出的核心部分。函数签名如下:
java
public void map(immutablebyteswritable row, result value, context context)
在这个函数中:
-
row:类型为immutablebyteswritable,代表当前处理行的rowkey。
-
value:类型为result,包含了当前行的所有列族、列和版本的数据。
-
context:类型为context,用于向reducer发送键值对或写入日志等。
在map()函数内部,你可以根据value中的数据进行各种处理,如过滤、聚合等,然后通过context.write()将处理后的结果发送给reducer。
3.tableoutputformat
tableoutputformat是apache hbase提供的一个用于将mapreduce作业的输出直接写入hbase表的类。与tableinputformat相对应,tableoutputformat使得mapreduce作业能够将处理后的结果直接存储回hbase,而无需先写入hdfs再导入hbase,从而简化了数据流并提高了效率。
tableoutputformat的作用
tableoutputformat的主要功能是在mapreduce作业完成时,将mapreduce作业的输出数据写回到hbase表中。它接收的输出数据类型是<key, value>对,其中value必须是put或delete对象。put对象用于插入或更新hbase表中的行,而delete对象用于删除hbase表中的行。
tableoutputformat的配置参数
tableoutputformat需要通过job的configuration对象进行配置,主要的配置参数及其用途包括:
-
hbase.mapred.outputtable:指定写入数据的目的hbase表的名称。
-
hbase.mapred.output.quorum:指定目标hbase表所在的hbase集群的zookeeper配置信息,格式为:“zookeeper所在机器名(多个实例以逗号分隔):端口号:hbase根节点名”。例如:“zookeeper1.example.com,zookeeper2.example.com:2181:/hbase”。
-
hbase.mapred.output.quorum.port:zookeeper服务器的端口号,虽然可以通过hbase.mapred.output.quorum中的格式指定,但有时也可以单独配置这个参数。
-
hbase.mapred.output.rs.class 和 hbase.mapred.output.rs.impl:这两个参数用于指定regionserver的实现类和服务实现,但在实际应用中很少被直接配置,因为默认的实现通常足够满足需求。
使用场景
tableoutputformat适用于以下场景:
-
数据更新:当mapreduce作业的结果是对现有hbase表的更新时,可以直接使用tableoutputformat将更新写回表中。
-
数据加载:当需要将大量数据从hdfs或其他数据源批量导入hbase时,可以使用tableoutputformat将数据直接写入hbase,避免了先导入临时表再进行数据迁移的复杂过程。
-
数据分析:在进行数据分析或数据清洗后,可以直接将处理后的结果写回hbase表,以供后续分析或应用使用。
总之,tableoutputformat提供了将mapreduce作业的输出直接写入hbase的能力,极大地简化了数据处理流程,并提高了数据处理的效率和灵活性。
3.案例
tableinputformat和tableoutputformat的案例
import java.io.ioexception;
import java.util.list;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.cell;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.client.scan;
import org.apache.hadoop.hbase.io.immutablebyteswritable;
import org.apache.hadoop.hbase.mapreduce.tableinputformat;
import org.apache.hadoop.hbase.mapreduce.tablemapreduceutil;
import org.apache.hadoop.hbase.mapreduce.tableoutputformat;
import org.apache.hadoop.hbase.util.bytes;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
public class hbasetableioexample {
public static class testtablemapper extends mapper<immutablebyteswritable, result, immutablebyteswritable, immutablebyteswritable> {
@override
protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception {
list<cell> cells = value.listcells();
for (cell cell : cells) {
// assuming the family is 'cf' and qualifier is 'qual'
if (bytes.equals(cell.getfamilyarray(), cell.getfamilyoffset(), cell.getfamilylength(), "cf".getbytes())) {
if (bytes.equals(cell.getqualifierarray(), cell.getqualifieroffset(), cell.getqualifierlength(), "qual".getbytes())) {
context.write(key, new immutablebyteswritable(cell.getvaluearray(), cell.getvalueoffset(), cell.getvaluelength()));
}
}
}
}
}
public static class testtablereducer extends reducer<immutablebyteswritable, immutablebyteswritable, immutablebyteswritable, nullwritable> {
@override
protected void reduce(immutablebyteswritable key, iterable<immutablebyteswritable> values, context context) throws ioexception, interruptedexception {
byte[] rowkey = key.get();
put put = new put(rowkey);
for (immutablebyteswritable value : values) {
// assuming we want to write back to column 'cf:qual'
put.addcolumn(bytes.tobytes("cf"), bytes.tobytes("qual"), value.get());
}
context.write(new immutablebyteswritable(rowkey), nullwritable.get());
context.getcounter("custom", "processedrows").increment(1);
}
}
public static void main(string[] args) throws exception {
configuration conf = hbaseconfiguration.create();
job job = job.getinstance(conf, "hbasetableioexample");
job.setjarbyclass(hbasetableioexample.class);
// set up input format
job.setinputformatclass(tableinputformat.class);
scan scan = new scan();
scan.addcolumn(bytes.tobytes("cf"), bytes.tobytes("qual"));
tablemapreduceutil.inittablemapperjob("test_input", scan, testtablemapper.class, immutablebyteswritable.class, immutablebyteswritable.class, job);
// set up output format
job.setoutputformatclass(tableoutputformat.class);
tablemapreduceutil.inittablereducerjob("test_output", testtablereducer.class, job);
job.setmapperclass(testtablemapper.class);
job.setreducerclass(testtablereducer.class);
boolean success = job.waitforcompletion(true);
system.exit(success ? 0 : 1);
}
}
重要点:
-
mapper: testtablemapper从输入表test_input读取数据,提取特定的列族和列的值,并将其传递给reducer。
-
reducer: testtablereducer接收来自mapper的输出,处理数据(本例中为简单传递),并将其写入输出表test_output。
-
job setup: 主函数main设置了mapreduce作业的输入和输出格式,以及mapper和reducer类,并使用tableinputformat和tableoutputformat初始化作业。
确保在运行此示例之前,你已经在hbase中创建了test_input和test_output表,并且test_input表中包含了适当的数据。此外,你可能需要根据你的环境调整hbase和hadoop的配置。
发表评论