HBase 2.0 ships with MOB (Medium Object Storage) support, providing real-time reads and writes of moderately sized objects with strong read/write consistency, low latency, and high concurrency, while remaining compatible with enterprise features such as snapshots and replication.
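MOB is enabled per column family. Below is a minimal sketch using the HBase 2.0 Java client API; the family name and the 100 KB threshold are illustrative choices, while the table name follows the shell examples later in this post.

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateMobTable {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Admin admin = conn.getAdmin()) {
      // Cells larger than the MOB threshold (here 100 KB) are written to MOB files.
      admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("static_file"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f"))
              .setMobEnabled(true)
              .setMobThreshold(100 * 1024L)
              .build())
          .build());
    }
  }
}
```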
4.3.2 MOB Compaction
Compared with an HDFS block, a MOB file may be relatively small. Small files can also appear when we write rows in which only a few entries qualify as MOBs, and some cells may have been deleted. We need to purge the deleted cells and merge the small files into larger ones to improve HDFS utilization. MOB compaction compacts only the small files and leaves the large files alone; avoiding endless re-compaction of the large files makes write amplification more predictable. We also need to know which cells were deleted during MOB compaction, so in every HBase major compaction the delete markers are written to a del file before they are dropped.
The steps to compact MOB files are as follows.
1. Reduce the number of del files by merging them into larger ones.
2. Find all the small MOB files and group them into partitions by file-name prefix, which encodes md5(startKey) and the date (a small illustrative sketch follows this list).
3. For each partition, open a scanner over the small MOB files and all the del files, write the valid cells into a new, larger MOB file, and write the corresponding new reference cells into a new store file.
4. Commit the new MOB files, bulk-load the new store files into HBase, and remove the old MOB files.
5. If the MOB compaction is a major one, remove all the del files after the compaction of every partition has finished.
Original link: https://blog.csdn.net/weixin_43785656/article/details/107063547
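Step 2 groups small MOB files by a name prefix that encodes md5(startKey) plus a date. The snippet below is only a sketch of that grouping idea; the prefix lengths and helper names are assumptions for illustration, not the real MobFileName/partition code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSketch {
  // A MOB file name is assumed to look like <32-hex md5(startKey)><8-digit date><uuid>.
  // Files sharing the md5(startKey) + date prefix fall into the same compaction partition.
  static String partitionId(String mobFileName) {
    String startKeyMd5 = mobFileName.substring(0, 32); // md5 of the region start key
    String date = mobFileName.substring(32, 40);       // yyyyMMdd the cells were written
    return startKeyMd5 + "-" + date;
  }

  static Map<String, List<String>> groupByPartition(List<String> mobFileNames) {
    Map<String, List<String>> partitions = new HashMap<>();
    for (String name : mobFileNames) {
      partitions.computeIfAbsent(partitionId(name), k -> new ArrayList<>()).add(name);
    }
    return partitions;
  }
}
```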
Detailed walkthrough of the HBase 2.x MOB compaction code path
Created by 伍增田 (wutommy)
The major improvements are:
1. The compaction work is moved onto the RegionServers and executed through a thread pool, improving concurrency.
2. The MOB compaction state of a table can be queried from the HBase shell.
Run the HBase shell command:
hbase(main):007:0> major_compact 'static_file', nil, 'MOB'
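The same MOB major compaction can also be requested through the Java Admin API; a minimal sketch reusing the table name above:

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TriggerMobCompaction {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Admin admin = conn.getAdmin()) {
      // Equivalent to the shell command: major_compact 'static_file', nil, 'MOB'
      admin.majorCompact(TableName.valueOf("static_file"), CompactType.MOB);
    }
  }
}
```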
MasterRpcServices.java:
private CompactRegionResponse compactMob(final CompactRegionRequest request,
    TableName tableName)
1. Collect which column families have MOB enabled.
2. Call HMaster.requestMobCompaction(tableName, compactedColumns, allFiles);
MasterMobCompactionThread.java
MasterMobCompactionThread.requestMobCompaction(conf, fs, tableName, columns, allFiles);
The MasterMobCompactionThread class holds two thread pools:
private final ExecutorService mobCompactorPool;  // performs the actual compaction of each partition
private final ExecutorService masterMobPool;     // collects the partitions, selects files, compacts the del files, and removes the del files after a major compaction
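The division of labor between the two pools can be sketched roughly as follows; the class and method bodies here are simplified stand-ins, not the real MasterMobCompactionThread code.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoPoolSketch {
  // Outer pool: one task per table/column family whose MOB compaction was requested.
  private final ExecutorService masterMobPool = Executors.newFixedThreadPool(2);
  // Inner pool: one task per compaction partition.
  private final ExecutorService mobCompactorPool = Executors.newFixedThreadPool(8);

  void requestMobCompaction(String table, List<String> partitions) {
    masterMobPool.submit(() -> {
      // Select files and compact del files, then fan the partitions out to the inner pool.
      for (String partition : partitions) {
        mobCompactorPool.submit(() ->
            System.out.println("compacting partition " + partition + " of " + table));
      }
      // A major compaction would remove the del files once all partitions have finished.
    });
  }
}
```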
CompactionRunner.run calls MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock, master);
The main work is done by two classes: PartitionedMobCompactor.java and PartitionedMobCompactionRequest.java.
PartitionedMobCompactionRequest carries the following information:
protected List<CompactionDelPartition> delPartitions;
protected Collection<CompactionPartition> compactionPartitions;
protected long selectionTime;
protected CompactionType type = CompactionType.PART_FILES;
PartitionedMobCompactor.java
Get all the MOB files under the current column family of the table:
compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles);
// find the files to compact.
PartitionedMobCompactionRequest request = select(files, allFiles);
// compact the files.
return performCompaction(request);
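select() roughly splits the candidates into small MOB files (grouped into partitions as sketched earlier), del files, and large files that are skipped. The snippet below only illustrates that selection rule; the "_del" suffix check and the threshold handling are assumptions, not the real select() implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SelectSketch {
  static class FileInfo {
    final String name;
    final long length;
    FileInfo(String name, long length) { this.name = name; this.length = length; }
  }

  // Files below the mergeable threshold (hbase.mob.compaction.mergeable.threshold) are
  // candidates; del files are tracked separately; large files are skipped unless allFiles is set.
  static List<FileInfo> selectCandidates(List<FileInfo> files, long mergeableThreshold,
                                         boolean allFiles, List<FileInfo> delFiles) {
    List<FileInfo> candidates = new ArrayList<>();
    for (FileInfo f : files) {
      if (f.name.endsWith("_del")) {
        delFiles.add(f);                            // feeds the del partitions
      } else if (allFiles || f.length < mergeableThreshold) {
        candidates.add(f);                          // small (or, for ALL_FILES, every) MOB file
      }
    }
    return candidates;
  }
}
```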
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
    throws IOException
// merge the del files, it is per del partition
List<Path> paths;
// compact the mob files by partitions.
paths = compactMobFiles(request);
// archive the del files if all the mob files are selected (CompactionType.ALL_FILES)
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
    delPartition.getStoreFiles());
protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request)
// compact the mob files by partitions in parallel, executed in the thread pool `pool`
results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
  @Override
  public List<Path> call() throws Exception {
    LOG.info("Compacting mob files for partition " + partition.getPartitionId());
    // new add: offload the partition compaction to a RegionServer when enabled
    if (conf.getBoolean("hbase.mob.compaction.onregionserver", false)
        /* && conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false) */) {
      return callCompactMobFilePartitionRS(request, partition, delFiles, c, table);
    } else {
      return compactMobFilePartition(request, partition, delFiles, c, table);
    }
  }
}))
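Once the per-partition Callables are submitted, their Futures are drained to collect the paths of the newly written MOB files. A minimal, self-contained sketch of that gather step (the key and path types are simplified to String here):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class GatherSketch {
  // Drain each partition's Future and collect the new MOB file paths;
  // a failed partition surfaces here as an ExecutionException.
  static List<String> gather(Map<String, Future<List<String>>> results) {
    List<String> paths = new ArrayList<>();
    for (Map.Entry<String, Future<List<String>>> entry : results.entrySet()) {
      try {
        paths.addAll(entry.getValue().get());
      } catch (InterruptedException | ExecutionException e) {
        System.err.println("Failed to compact partition " + entry.getKey() + ": " + e);
      }
    }
    return paths;
  }
}
```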
private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
    CompactionPartition partition,
    List<HStoreFile> delFiles,
    Connection connection,
    Table table)
// compact the mob files in a batch.
compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
    bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
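compactMobFilePartition() works through the partition's files in slices of hbase.mob.compaction.batch.size files per call to compactMobFilesInBatch(). A sketch of that batching loop (helper and variable names are illustrative):

```java
import java.util.List;

public class BatchSketch {
  // Process the partition's files in slices of `batchSize`
  // (configured via hbase.mob.compaction.batch.size).
  static void compactInBatches(List<String> filesToCompact, int batchSize) {
    for (int offset = 0; offset < filesToCompact.size(); offset += batchSize) {
      int end = Math.min(offset + batchSize, filesToCompact.size());
      List<String> batch = filesToCompact.subList(offset, end);
      System.out.println("compacting a batch of " + batch.size() + " files");
    }
  }
}
```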
private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
    CompactionPartition partition,
    Connection connection, Table table,
    List<HStoreFile> filesToCompact, int batch,
    Path bulkloadPathOfPartition, Path bulkloadColumnPath,
    List<Path> newFiles)
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
    fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);
// write the mob cell to the mob file.
writer.append(cell);
// write the new reference cell to the store file.
Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
refFileWriter.append(reference);
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
// bulkload the ref file
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
// archive the old mob files, do not archive the del files.
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
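The bulkloadRefFile() step loads the newly written reference HFiles into the table. In the 2.0 code base this can be done with LoadIncrementalHFiles; a sketch under that assumption (the bulk-load directory below is an example path, not one taken from the text):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadRefFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("static_file");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName);
         Admin admin = conn.getAdmin()) {
      // Load the reference HFiles written under the partition's bulk-load directory.
      new LoadIncrementalHFiles(conf)
          .doBulkLoad(new Path("/tmp/mob-bulkload/partition-0"), admin, table, locator);
    }
  }
}
```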
Query the MOB compaction state:
hbase(main):001:0> compaction_state 'static_file', 'MOB'
Took 0.4384 seconds
=> "NONE"
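The Java counterpart, assuming the Admin#getCompactionState overload that takes a CompactType (present in the 2.0 client API):

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MobCompactionStateQuery {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Admin admin = conn.getAdmin()) {
      CompactionState state =
          admin.getCompactionState(TableName.valueOf("static_file"), CompactType.MOB);
      System.out.println("MOB compaction state: " + state); // e.g. NONE or MAJOR_AND_MINOR
    }
  }
}
```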
hbase-2.0.1\hbase-shell\src\main\ruby\hbase\admin.rb:
def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
  family_bytes = nil
  family_bytes = family.to_java_bytes unless family.nil?
  compact_type = nil
  if type == 'NORMAL'
    compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
  elsif type == 'MOB'
    compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
  else
    raise ArgumentError, 'only NORMAL or MOB accepted for type!'
  end
  begin
    @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
  rescue java.lang.IllegalArgumentException => e
    @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
  end
end
hbase-2.0.1\hbase-shell\src\main\ruby\shell\commands\compaction_state.rb:
def command(table_name, type = 'NORMAL')
  admin.getCompactionState(table_name, type)
end
MasterRpcServices.java:
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request)
master.getMobCompactionState(tableName); // backed by mobCompactionStates.get(tableName)
public void reportMobCompactionStart(TableName tableName)
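The master tracks the per-table MOB compaction state in memory with a simple counter map: reportMobCompactionStart increments the table's counter, a matching end call decrements it, and getMobCompactionState reports a compacting state while the counter is above zero. A simplified sketch of that bookkeeping idea (not the exact HMaster code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

public class MobCompactionStateTracker {
  enum State { NONE, MAJOR_AND_MINOR }

  // One counter per table; a value above zero means a MOB compaction is running.
  private final ConcurrentMap<String, LongAdder> mobCompactionStates = new ConcurrentHashMap<>();

  void reportMobCompactionStart(String table) {
    mobCompactionStates.computeIfAbsent(table, t -> new LongAdder()).increment();
  }

  void reportMobCompactionEnd(String table) {
    LongAdder counter = mobCompactionStates.get(table);
    if (counter != null) {
      counter.decrement();
    }
  }

  State getMobCompactionState(String table) {
    LongAdder counter = mobCompactionStates.get(table);
    return (counter != null && counter.sum() > 0) ? State.MAJOR_AND_MINOR : State.NONE;
  }
}
```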
Related configuration settings (defined in MobConstants.java):
public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD =
    "hbase.mob.compaction.mergeable.threshold";
public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
public static final String MOB_COMPACTION_BATCH_SIZE =
    "hbase.mob.compaction.batch.size";
public static final String MOB_COMPACTION_CHORE_PERIOD =
    "hbase.mob.compaction.chore.period";
public static final String MOB_COMPACTION_THREADS_MAX =
    "hbase.mob.compaction.threads.max";
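These keys can be set in hbase-site.xml or on a Configuration object. The snippet below only illustrates how they are wired up; the values are arbitrary examples, not recommended defaults.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MobCompactionTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // MOB files smaller than this size are considered small enough to merge (example: 192 MB).
    conf.setLong("hbase.mob.compaction.mergeable.threshold", 192L * 1024 * 1024);
    // Merge del files once more than this many have accumulated.
    conf.setInt("hbase.mob.delfile.max.count", 3);
    // Number of MOB files compacted per batch within a partition.
    conf.setInt("hbase.mob.compaction.batch.size", 100);
    // Maximum number of threads in the MOB compaction pool.
    conf.setInt("hbase.mob.compaction.threads.max", 1);
    // Period (in seconds) of the MOB compaction chore on the master.
    conf.setInt("hbase.mob.compaction.chore.period", 7 * 24 * 60 * 60);
  }
}
```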