删除线格式
 hbase2.0加持mob技术,支持小对象实时存取,具有读写强一致,低延迟,高并发等特点,并兼容所有企业级特性如snapshot,replication。
4.3.2mob compaction 压缩
 与hdfs块相比,mob文件可能相对较小,如果我们写入只有少数条目符合mob条件的行,也可能存在被删除的单元格。我们需要删除那些被删除的单元格,并将小文件合并为更大的文件,以提高hdfs的利用率。mob压缩只对小文件进行压缩,而对大文件不进行严格的压缩,避免了对大文件的无限压缩,使写放大更具可预测性。我们需要知道在mob压缩中删除了哪些单元格。因此,在每次hbase主压缩中,删除标记都会在删除之前写入del文件。
 压缩mob文件的步骤如下所示。
 
原文链接:https://blog.csdn.net/weixin_43785656/article/details/107063547
 1 减少del文件的数量,将del文件合并成更大的文件。
 2 找到所有的小的mob文件并按名称前缀将它们分组到分区中(md5(startkey)和日期的编码名称)。
 3 对于每个分区,打开扫描器到小的mob文件和所有del文件,将有效单元格写入新的更大的mob文件,同时将新的引用单元格写入新的存储文件。
 4 提交新的mob文件,将新的存储文件批量加载到hbase,并删除旧的mob文件。
 5 如果mob压缩是一个主要的压缩,那么在所有分区中完成压缩之后删除所有del文件。
原文链接:https://blog.csdn.net/weixin_43785656/article/details/107063547
hbase 2.x mob compaction流程代码详细解析
 created by 伍增田 wutommy
重大改进如下:
 移动到regionserver上通过线程池执行,提高并发性能
 hbase shell获取表的mob compaction的状态
执行hbase shell命令
 hbase(main):007:0> major_compact ‘static_file’, nil, ‘mob’
 masterrpcservices.java:
 private compactregionresponse compactmob(final compactregionrequest request,
 tablename tablename)
 1、收集哪些columnfamily设置了mob
 2、hmaster.requestmobcompaction(tablename, compactedcolumns, allfiles);
 mastermobcompactionthread.java
 mastermobcompactionthread .requestmobcompaction(conf, fs, tablename, columns, allfiles);
 mastermobcompactionthread类有2个线程池:
 private final executorservice mobcompactorpool; 负责对partition的具体的compact的执行操作
 private final executorservice mastermobpool; 收集partition,选择文件,compact “del文件”,major后删除掉del文件。
compactionrunner.run ,调用 mobutils.domobcompaction(conf, fs, tablename, hcd, pool, allfiles, lock, master);
 主要工作是这2个class: partitionedmobcompactor.java和partitionedmobcompactionrequest.java
partitionedmobcompactionrequest包含的信息有:
 protected list delpartitions;
 protected collection compactionpartitions;
 selectiontime
 protected compactiontype type = compactiontype.part_files;
 partitionedmobcompactor.java
 获取当前表的当前列簇下的所有mob文件
 compact(arrays.aslist(fs.liststatus(mobfamilydir)), allfiles);
// find the files to compact.
 partitionedmobcompactionrequest request = select(files, allfiles);
 // compact the files.
 return performcompaction(request);
 protected list 
 
 throws ioexception 
  
 // merge the del files, it is per del partition 
  
 list 
  
 // compact the mob files by partitions. 
   
 paths = compactmobfiles(request); 
   
 // archive the del files if all the mob files are selected. compactiontype.all_files 
   
 mobutils.removemobfiles(conf, fs, tablename, mobtabledir, column.getname(), 
   
 delpartition.getstorefiles()); 
  
protected list 
 
 // compact the mob files by partitions in parallel. 线程池pool中执行 
 
results.put(partition.getpartitionid(), pool.submit(new callable<list 
 
 @override 
  
 public list 
  
 log.info("compacting mob files for partition " + partition.getpartitionid()); 
   
 // new add 
   
 if(conf.getboolean(“hbase.mob.compaction.onregionserver”, false) 
   
 /* && conf.getboolean(hconstants.cluster_distributed, false) */){ 
   
 return callcompactmobfilepartitionrs(request, partition, delfiles, c, table); 
   
 } 
   
 else { 
   
 return 
   compactmobfilepartition(request, partition, delfiles, c, table); 
   
 } 
   
 } 
   
 })) 
  
private list 
 
 compactionpartition partition, 
  
 list delfiles, 
  
 connection connection, 
  
 table table) 
  
 // compact the mob files in a batch. 
  
 compactmobfilesinbatch(request, partition, connection, table, filestocompact, batch, 
  
 
  bulkloadpathofpartition, bulkloadcolumnpath, newfiles); 
 
private void compactmobfilesinbatch(partitionedmobcompactionrequest request,
 compactionpartition partition,
 connection connection, table table,
 list filestocompact, int batch,
 path bulkloadpathofpartition, path bulkloadcolumnpath,
 list 
 
 // create a temp file and open a writer for it in the bulkloadpath 
  
 reffilewriter = mobutils.createreffilewriter(conf, fs, column, bulkloadcolumnpath, 
  
 fileinfo.getsecond().longvalue(), compactioncacheconfig, cryptocontext, true); 
 
// write the mob cell to the mob file.
 writer.append(cell);
 // write the new reference cell to the store file.
 cell reference = mobutils.createmobrefcell(cell, filename, this.refcelltags);
 reffilewriter.append(reference);
 // commit mob file
 mobutils.commitfile(conf, fs, filepath, mobfamilydir, compactioncacheconfig);
 // bulkload the ref file
 bulkloadreffile(connection, table, bulkloadpathofpartition, filepath.getname());
 // archive the old mob files, do not archive the del files.
 mobutils.removemobfiles(conf, fs, tablename, mobtabledir, column.getname(), mobfilestocompact);
 查询mob compaction状态:
 hbase(main):001:0> compaction_state ‘static_file’, ‘mob’
 took 0.4384 seconds
 => “none”
 hbase-2.0.1\hbase-shell\src\main\ruby\hbase\admin.rb :
 def major_compact(table_or_region_name, family = nil, type = 'normal')
  family_bytes = nil
  family_bytes = family.to_java_bytes unless family.nil?
  compact_type = nil
  if type == 'normal'
    compact_type = org.apache.hadoop.hbase.client.compacttype::normal
  ***elsif type == 'mob'
    compact_type = org.apache.hadoop.hbase.client.compacttype::mob***
  else
    raise argumenterror, 'only normal or mob accepted for type!'
  end
  begin
    @admin.majorcompactregion(table_or_region_name.to_java_bytes, family_bytes)
  rescue java.lang.illegalargumentexception => e
    @admin.majorcompact(tablename.valueof(table_or_region_name), family_bytes, compact_type)
  end
end
 
hbase-2.0.1\hbase-shell\src\main\ruby\shell\commands\compaction_state.rb:
 def command(table_name, type = ‘normal’)
 admin.getcompactionstate(table_name, type)
 end
masterrpcservices.java:
 public getregioninforesponse getregioninfo(final rpccontroller controller,
 final getregioninforequest request)
 master.getmobcompactionstate(tablename) /mobcompactionstates.get(tablename);
 public void reportmobcompactionstart(tablename tablename)
相关的配置项:
 public static final string mob_cleaner_period = “hbase.master.mob.ttl.cleaner.period”;
 public static final string mob_compaction_mergeable_threshold =
 “hbase.mob.compaction.mergeable.threshold”;
 public static final string mob_delfile_max_count = “hbase.mob.delfile.max.count”;
 public static final string mob_compaction_batch_size =
 “hbase.mob.compaction.batch.size”;
 public static final string mob_compaction_chore_period =
 “hbase.mob.compaction.chore.period”;
 public static final string mob_compaction_threads_max =
 “hbase.mob.compaction.threads.max”;
            
                                            
                                            
                                            
发表评论