HBase 2.x MOB compaction: a detailed walkthrough of the code

August 6, 2024 · Java
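HBase 2.0 adds MOB support for real-time reads and writes of small objects. It offers strongly consistent reads and writes, low latency, and high concurrency, and it stays compatible with enterprise features such as snapshot and replication.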

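4.3.2 MOB compaction
MOB files can be small relative to an HDFS block, for example when the rows written contain only a few entries that qualify as MOB, and they may also hold cells that have since been deleted. Those deleted cells need to be dropped and the small files merged into larger ones to improve HDFS utilization. MOB compaction compacts only the small files and does not strictly compact the large ones, which avoids recompacting large files without end and makes write amplification more predictable. MOB compaction has to know which cells were deleted, so in every HBase major compaction the delete markers are written to a del file before they are dropped.
The steps for compacting MOB files are as follows.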
Original article: https://blog.csdn.net/weixin_43785656/article/details/107063547
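1 Reduce the number of del files by merging them into larger files.
2 Find all the small MOB files and group them into partitions by file-name prefix (the encoded name built from md5(startKey) and the date).
3 For each partition, open scanners over the small MOB files and all the del files, write the valid cells into a new, larger MOB file, and write the new reference cells into a new store file.
4 Commit the new MOB file, bulk load the new store file into HBase, and delete the old MOB files.
5 If the MOB compaction is a major compaction, delete all the del files once compaction has finished in every partition.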
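Step 2 above can be sketched in plain Java. This is only an illustration, not HBase's own selection code: the class and method names are hypothetical, files are modeled as a name-to-size map, and the name layout (32 hex characters of md5(startKey) followed by an 8-character date, then a unique suffix) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: group small MOB files into partitions by file-name prefix. */
class MobPartitionSketch {
    // Assumed layout: 32 hex chars of md5(startKey) + 8-char date, then a unique suffix.
    static final int PREFIX_LENGTH = 32 + 8;

    /** Returns partitionId -> file names, keeping only files below the mergeable threshold. */
    static Map<String, List<String>> groupSmallFiles(Map<String, Long> fileSizes,
                                                     long mergeableThreshold) {
        Map<String, List<String>> partitions = new TreeMap<>();
        for (Map.Entry<String, Long> e : fileSizes.entrySet()) {
            String name = e.getKey();
            if (name.length() < PREFIX_LENGTH || e.getValue() >= mergeableThreshold) {
                continue; // malformed name, or file already large enough: leave it alone
            }
            String partitionId = name.substring(0, PREFIX_LENGTH);
            partitions.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(name);
        }
        return partitions;
    }

    public static void main(String[] args) {
        String day1 = "0123456789abcdef0123456789abcdef20240806";
        String day2 = "0123456789abcdef0123456789abcdef20240807";
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put(day1 + "aaaa", 10L);
        sizes.put(day1 + "bbbb", 20L);
        sizes.put(day2 + "cccc", 30L);
        sizes.put(day2 + "dddd", 500L); // above the threshold, so it is skipped
        System.out.println(groupSmallFiles(sizes, 100L));
    }
}
```

Grouping by prefix keeps files from the same start key and day together, so each partition can later be compacted independently of the others.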

Created by 伍增田 (wutommy)

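The major improvements are:
MOB compaction is moved onto the RegionServer and executed through a thread pool, which improves concurrency.
The HBase shell can query the MOB compaction state of a table.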

Run the HBase shell command:
hbase(main):007:0> major_compact 'static_file', nil, 'MOB'
MasterRpcServices.java:
private CompactRegionResponse compactMob(final CompactRegionRequest request,
    TableName tableName)
1. Collect the column families that have MOB enabled.
2. HMaster.requestMobCompaction(tableName, compactedColumns, allFiles);
MasterMobCompactionThread.java
MasterMobCompactionThread.requestMobCompaction(conf, fs, tableName, columns, allFiles);
The MasterMobCompactionThread class has two thread pools:
private final ExecutorService mobCompactorPool; performs the actual compaction of each partition.
private final ExecutorService masterMobPool; collects the partitions, selects the files, compacts the del files, and deletes the del files after a major compaction.

CompactionRunner.run calls MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock, master);
Most of the work happens in two classes: PartitionedMobCompactor.java and PartitionedMobCompactionRequest.java.

PartitionedMobCompactionRequest carries the following information:
protected List<CompactionDelPartition> delPartitions;
protected Collection<CompactionPartition> compactionPartitions;
selectionTime
protected CompactionType type = CompactionType.PART_FILES;
PartitionedMobCompactor.java
List all the MOB files under the current column family of the current table:
compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles);

// find the files to compact.
PartitionedMobCompactionRequest request = select(files, allFiles);
// compact the files.
return performCompaction(request);
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
    throws IOException
  // merge the del files, it is per del partition
  List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles());
  // compact the mob files by partitions.
  paths = compactMobFiles(request);
  // archive the del files if all the mob files are selected (CompactionType.ALL_FILES)
  MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
      delPartition.getStoreFiles());

protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request)
  // compact the mob files by partitions in parallel; each task runs in the thread pool `pool`

  results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
    @Override
    public List<Path> call() throws Exception {
      LOG.info("Compacting mob files for partition " + partition.getPartitionId());
      // newly added: optionally hand the partition over to a RegionServer
      if (conf.getBoolean("hbase.mob.compaction.onregionserver", false)
          /* && conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false) */) {
        return callCompactMobFilePartitionRS(request, partition, delFiles, c, table);
      } else {
        return compactMobFilePartition(request, partition, delFiles, c, table);
      }
    }
  }));
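The fan-out pattern in compactMobFiles can be tried in isolation with a minimal sketch. Everything here is a stand-in: partitions are plain strings, the two `Function` arguments play the roles of the local and the RegionServer code paths, and the boolean parameter mimics the `hbase.mob.compaction.onregionserver` switch shown above. It makes no claim about how the author's RegionServer call is implemented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch: submit one task per partition, then collect the results. */
class ParallelPartitionSketch {
    static Map<String, List<String>> compactAll(List<String> partitionIds, ExecutorService pool,
            boolean onRegionServer,
            Function<String, List<String>> local, Function<String, List<String>> remote) {
        // Choose the code path once, the way the config flag does in the excerpt above.
        final Function<String, List<String>> chosen = onRegionServer ? remote : local;
        Map<String, Future<List<String>>> results = new LinkedHashMap<>();
        for (final String id : partitionIds) {
            Callable<List<String>> task = () -> chosen.apply(id);
            results.put(id, pool.submit(task)); // runs on a pool thread
        }
        Map<String, List<String>> newFiles = new LinkedHashMap<>();
        for (Map.Entry<String, Future<List<String>>> e : results.entrySet()) {
            try {
                newFiles.put(e.getKey(), e.getValue().get()); // blocks until the task finishes
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted waiting for " + e.getKey(), ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException("partition " + e.getKey() + " failed", ex.getCause());
            }
        }
        return newFiles;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(compactAll(Arrays.asList("p1", "p2"), pool, false,
                id -> Collections.singletonList(id + "-local"),
                id -> Collections.singletonList(id + "-remote")));
        } finally {
            pool.shutdown();
        }
    }
}
```

Submitting every task before calling `get()` on any of them is what lets the partitions run concurrently; waiting inside the submit loop would serialize them.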

private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
    CompactionPartition partition,
    List<HStoreFile> delFiles,
    Connection connection,
    Table table)
  // compact the mob files in a batch.
  compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
      bulkloadPathOfPartition, bulkloadColumnPath, newFiles);

private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
    CompactionPartition partition,
    Connection connection, Table table,
    List<HStoreFile> filesToCompact, int batch,
    Path bulkloadPathOfPartition, Path bulkloadColumnPath,
    List<Path> newFiles)
  // create a temp file and open a writer for it in the bulkloadPath
  refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
      fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);

  // write the mob cell to the mob file.
  writer.append(cell);
  // write the new reference cell to the store file.
  Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
  refFileWriter.append(reference);
  // commit mob file
  MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
  // bulkload the ref file
  bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
  // archive the old mob files, do not archive the del files.
  MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
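The `batch` argument suggests that a partition's files are processed in bounded groups. A minimal sketch of that windowing follows, assuming the caller simply walks the file list in steps of at most the configured batch size; the helper name is hypothetical and files are modeled as strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: split a partition's files into batches of a bounded size. */
class BatchSketch {
    static <T> List<List<T>> splitIntoBatches(List<T> files, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        int offset = 0;
        while (offset < files.size()) {
            // The last batch may be smaller than batchSize.
            int batch = Math.min(batchSize, files.size() - offset);
            batches.add(new ArrayList<>(files.subList(offset, offset + batch)));
            offset += batch;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("f1", "f2", "f3", "f4", "f5");
        for (List<String> batch : splitIntoBatches(files, 2)) {
            System.out.println("compacting batch " + batch);
        }
    }
}
```

Bounding the batch limits how many files are open at once in a single compaction pass, at the cost of producing more than one output file for a large partition.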
Query the MOB compaction state:
hbase(main):001:0> compaction_state 'static_file', 'MOB'
Took 0.4384 seconds
=> "NONE"
hbase-2.0.1\hbase-shell\src\main\ruby\hbase\admin.rb:

 def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
   family_bytes = nil
   family_bytes = family.to_java_bytes unless family.nil?
   compact_type = nil
   if type == 'NORMAL'
     compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
   elsif type == 'MOB'
     # the MOB branch is the one this article is about
     compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
   else
     raise ArgumentError, 'only NORMAL or MOB accepted for type!'
   end

   begin
     # first treat the name as a region name
     @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
   rescue java.lang.IllegalArgumentException => e
     # not a region name: fall back to a table-level compaction of the requested type
     @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
   end
 end

hbase-2.0.1\hbase-shell\src\main\ruby\shell\commands\compaction_state.rb:
 def command(table_name, type = 'NORMAL')
   admin.getCompactionState(table_name, type)
 end

MasterRpcServices.java:
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request)
  master.getMobCompactionState(tableName) / mobCompactionStates.get(tableName);
public void reportMobCompactionStart(TableName tableName)
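One way to picture the bookkeeping behind getMobCompactionState and reportMobCompactionStart is a per-table counter of running compactions. The sketch below is an assumption-laden illustration and not the master's code: table names are strings, the two-value `State` enum is made up for the example, and the `reportEnd` counterpart is assumed because the excerpt above only shows the start call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: track how many MOB compactions are running per table. */
class MobCompactionStateSketch {
    enum State { NONE, IN_PROGRESS } // illustrative values only

    private final ConcurrentMap<String, Integer> running = new ConcurrentHashMap<>();

    /** Called when a MOB compaction starts on the table. */
    void reportStart(String table) {
        running.merge(table, 1, Integer::sum); // atomic increment
    }

    /** Assumed counterpart: called when a MOB compaction on the table finishes. */
    void reportEnd(String table) {
        // Returning null removes the entry once the last compaction has ended.
        running.computeIfPresent(table, (t, count) -> count > 1 ? count - 1 : null);
    }

    State getState(String table) {
        return running.containsKey(table) ? State.IN_PROGRESS : State.NONE;
    }

    public static void main(String[] args) {
        MobCompactionStateSketch states = new MobCompactionStateSketch();
        System.out.println(states.getState("static_file"));
        states.reportStart("static_file");
        System.out.println(states.getState("static_file"));
        states.reportEnd("static_file");
        System.out.println(states.getState("static_file"));
    }
}
```

A counter rather than a boolean matters when several column families of the same table are compacted at once: the table only returns to the idle state after the last one ends.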

Related configuration keys:
public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD =
    "hbase.mob.compaction.mergeable.threshold";
public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
public static final String MOB_COMPACTION_BATCH_SIZE =
    "hbase.mob.compaction.batch.size";
public static final String MOB_COMPACTION_CHORE_PERIOD =
    "hbase.mob.compaction.chore.period";
public static final String MOB_COMPACTION_THREADS_MAX =
    "hbase.mob.compaction.threads.max";
