概述
h2的表、索引、undo log都是使用b+树存储的,这些数据存储到b+树上之后,需要将树从内存再刷新到磁盘上。
因为树刷新到磁盘,涉及到磁盘的随机访问,所以一般的数据库实现上,都是有一个后台线程以一定的频率将脏页面刷新到磁盘上。h2也是这么实现的,它是借助于backgroundwriterthread后台线程完成脏页面的刷新。
这里需要注意一点,h2与mysql等数据库不同,mysql数据库当事务提交的时候会将redo log写入磁盘,这样可以保证数据库崩溃的时候借助redo log恢复,h2没有这样的机制,如果b+树没有刷新到磁盘,而此时数据库崩溃了,那么数据就会丢失,h2刷新磁盘的频率默认是default_write_delay
常量指定的,可以通过参数“write_delay
”修改执行频率:
/**
* the default delay in milliseconds before the transaction log is written.
*/
public static final int default_write_delay = 500;
这一点在h2的官方文档里面也得到了印证:
上面的文档中提到了两个命令:
启动线程
该线程是在数据库启动的时候创建的,调用下面的方法启动线程:
public final void setautocommitdelay(int millis) {
if (autocommitdelay != millis) {
autocommitdelay = millis;
//判断数据库是否只读
if (!isreadonly()) {
//停止当前正在运行的后台线程,并且关闭两个线程池serializationexecutor和buffersaveexecutor
stopbackgroundthread(millis >= 0);
// start the background thread if needed
if (millis > 0 && mvstore.isopen()) {
int sleep = math.max(1, millis / 10);
//创建后台线程
backgroundwriterthread t = new backgroundwriterthread(this, sleep, tostring());
if (backgroundwriterthread.compareandset(null, t)) {
t.start();//启动线程
//下面是创建两个线程池,其作用后文介绍
serializationexecutor = utils.createsinglethreadexecutor("h2-serialization");
buffersaveexecutor = utils.createsinglethreadexecutor("h2-save");
}
}
}
}
}
其实set write_delay
命令也是执行上面的方法。
运行线程
下面代码是线程的run()方法,该方法不断的在后台执行,主要是调用filestore.writeinbackground()完成刷新的:
public void run() {
while (store.isbackgroundthread()) {
synchronized (sync) {
try {
sync.wait(sleep);
} catch (interruptedexception ignore) {/**/}
}
if (!store.isbackgroundthread()) {
break;
}
store.writeinbackground();
}
}
下面代码是filestore.writeinbackground():
void writeinbackground() {
try {
if (mvstore.isopen() && !isreadonly()) {
long time = gettimesincecreation();
//如果最后一次磁盘刷新到现在为止超过了autocommitdelay,那么执行一次trycommit()
//autocommitdelay就是后台线程的执行频率,默认为500ms
if (time > lastcommittime + autocommitdelay) {
mvstore.trycommit();
}
dohousekeeping(mvstore);
autocompactlastfileopcount = getwritecount() + getreadcount();
}
} catch (interruptedexception ignore) {
} catch (throwable e) {
if (!mvstore.handleexception(e)) {
throw e;
}
}
}
所有的刷新逻辑都在trycommit()方法中,而在trycommit()中,又会调用mvstore.store()方法,下面看下store()方法都做了什么。
store()
该方法首先检查当前是否有页面发生了变化,h2检测页面变化是通过版本号实现的,在rootreference对象里面有一个version属性,如果发生了变化,便会修改version值,,每修改一次增加1。store()方法会保存最后一次刷新磁盘时的version值,比较两个version值是否一致便可以判断出页面是否发生了变化。因此通过比较version的不同,便可以统计出所有当前打开的表、索引是否发生了变化。之后遍历这些发生变化的表或者索引即可。
在遍历开始前,会用属性lastcommittime
记录当前系统时间,lastcommittime
表示最后一次刷新磁盘的时间,这个属性在filestore.writeinbackground()方法里面使用到。
在上文的setautocommitdelay()方法里面启动了两个线程池,store()方法接下来便是提交一个任务到serializationexecutor线程池里面,代码如下:
//lastcommittime:记录当前系统时间
lastcommittime = gettimesincecreation();
//serializationexecutor:表示线程池
//changed:是一个arraylist<page<?,?>>对象,里面记录了所有发生变化的b+树根页面,也就是记录了发生变化的表或者索引
//version:最新的版本号
serializationexecutorhwm = submitorrun(serializationexecutor,
() -> serializeandstore(syncwrite, changed, lastcommittime, version),
syncwrite, pipe_length, serializationexecutorhwm);
主要的逻辑放在了serializeandstore()方法里面,在介绍serializeandstore()方法之前,先介绍下h2里面的chunk。
chunk
chunk的结构如下图所示,一个chunk是一个写入单位,也就是说h2每次写入的时候如果有多个页面变更,那么这些页面都组织到一个chunk里面,然后一次性写入磁盘。
通过对代码分析,每个page在写入chunk的时候,不是按照每页4096 byte要求写入的,而是有多少写多少,如果每页的内容非常少,可能多个页面写到chunk后还不够4096,那么此时h2就要对chunk进行填充了,代码如下,这要求每个chunk占用空间大小必须是4096的倍数,不够的话使用空白填充。
// add the store header and round to the next block
//filestore.block_size=4096
//chunklength表示已经写入内容的大小,也就是header和page的总大小
int length = mathutils.roundupint(chunklength + chunk.footer_length, filestore.block_size);
buff.limit(length);
下面是chunk的header和footer的格式:
chunk:1,block:2,len:1,map:6,max:1c0,next:3,pages:2,root:4000004f8c,time:1fc,version:1
chunk:1,block:2,len:1,block:2,version:1,fletcher:aed9a4f6
各个字段含义如下(下面内容来自官网,因为我懒没有做翻译,具体可以参加代码chunk.getheaderbytes()和getfooterbytes()):
page
一个page表示一个b+树页面,在写入磁盘时,page的格式如下:
向磁盘写入b+树时,先写入根页面,之后是非叶子节点,最后是叶子节点。
写入数据
根据上面介绍的page和chunk,下面的代码应该会比较好理解。
serializeandstore()方法首先会获取新的chunk id,每新建一个chunk,chunk id会增加1,之后创建一个chunk对象,默认是sfchunk,然后再创建一个写缓冲对象writebuffer,该写缓冲对象底层使用bytebuffer存储数据 ,之后调用serializetobuffer()对page对象进行序列化,也就是按照chunk的格式要求将数据写入到writebuffer中,之后将writebuffer的bytebuffer赋值到chunk对象中。最后调用线程池buffersaveexecutor执行storebuffer()将writebuffer中的数据写入到磁盘上,写入的动作比较简单,因为序列化动作已经完成,那么直接将bytebuffer的数据写入文件即可。
private void serializeandstore(boolean syncrun, arraylist<page<?,?>> changed, long time, long version) {
serializationlock.lock();//加锁,防止并发
try {
c lastchunk = null;
//lastchunkid用于记录当前最新的chunk id,
//该值在数据库启动的时候会从磁盘上加载出来,以后每增加一个chunk,chunk id加1
int chunkid = lastchunkid;
if (chunkid != 0) {
//下面的代码主要是获取最大的时间,防止时间回退
chunkid &= chunk.max_id;
lastchunk = chunks.get(chunkid);
assert lastchunk != null : lastchunkid + " ("+chunkid+") " + chunks;
// never go backward in time
time = math.max(lastchunk.time, time);
}
//创建一个chunk对象,默认情况下,创建的是sfchunk对象
c c = createchunk(time, version);
//writebuffer里面使用bytebuffer存储数据
//writebuffer是一个写缓存,b+树的数据都会写到该对象,而这些数据最终使用bytebuffer保存
writebuffer buff = getwritebuffer();
//将修改的b+树页面序列化到writebuffer中
serializetobuffer(buff, changed, c, lastchunk);
//记录下该最新的chunk
chunks.put(c.id, c);
//使用线程池将writebuffer同步到磁盘上
buffersaveexecutorhwm = submitorrun(buffersaveexecutor, () -> storebuffer(c, buff),
syncrun, 5, buffersaveexecutorhwm);
for (page<?, ?> p : changed) {
//将已经保存到磁盘上的页面从内存中删除,防止占用内存过多
p.releasesavedpages();
}
} catch (mvstoreexception e) {
mvstore.panic(e);
} catch (throwable e) {
mvstore.panic(datautils.newmvstoreexception(datautils.error_internal, "{0}", e.tostring(), e));
} finally {
serializationlock.unlock();
}
}
在文章的开始提到了两个线程池,从上面的代码可以看出,这两个线程池一个负责将数据序列化到bytebuffer,另一个负责将bytebuffer的数据刷新到磁盘上。
发表评论