当前位置: 代码网 > it编程>软件设计>算法 > h2database源码解析-将B+树刷新到磁盘

h2database源码解析-将B+树刷新到磁盘

2024年08月06日 算法 我要评论
h2的表、索引、undo log都是使用B+树存储的,这些数据存储到B+树上之后,需要将树从内存再刷新到磁盘上。因为树刷新到磁盘,涉及到磁盘的随机访问,所以一般的数据库实现上,都是有一个后台线程以一定的频率将脏页面刷新到磁盘上。h2也是这么实现的,它是借助于BackgroundWriterThread后台线程完成脏页面的刷新。

概述

h2的表、索引、undo log都是使用b+树存储的,这些数据存储到b+树上之后,需要将树从内存再刷新到磁盘上。
因为树刷新到磁盘,涉及到磁盘的随机访问,所以一般的数据库实现上,都是有一个后台线程以一定的频率将脏页面刷新到磁盘上。h2也是这么实现的,它是借助于backgroundwriterthread后台线程完成脏页面的刷新。
这里需要注意一点,h2与mysql等数据库不同,mysql数据库当事务提交的时候会将redo log写入磁盘,这样可以保证数据库崩溃的时候借助redo log恢复,h2没有这样的机制,如果b+树没有刷新到磁盘,而此时数据库崩溃了,那么数据就会丢失,h2刷新磁盘的频率默认是default_write_delay常量指定的,可以通过参数“write_delay”修改执行频率:

    /**
     * the default delay in milliseconds before the transaction log is written.
     */
    public static final int default_write_delay = 500;

这一点在h2的官方文档里面也得到了印证:
请添加图片描述

上面的文档中提到了两个命令:

启动线程

该线程是在数据库启动的时候创建的,调用下面的方法启动线程:

    public final void setautocommitdelay(int millis) {
        if (autocommitdelay != millis) {
            autocommitdelay = millis;
            //判断数据库是否只读
            if (!isreadonly()) {
            	//停止当前正在运行的后台线程,并且关闭两个线程池serializationexecutor和buffersaveexecutor
                stopbackgroundthread(millis >= 0);
                // start the background thread if needed
                if (millis > 0 && mvstore.isopen()) {
                    int sleep = math.max(1, millis / 10);
                    //创建后台线程
                    backgroundwriterthread t = new backgroundwriterthread(this, sleep, tostring());
                    if (backgroundwriterthread.compareandset(null, t)) {
                        t.start();//启动线程
                        //下面是创建两个线程池,其作用后文介绍
                        serializationexecutor = utils.createsinglethreadexecutor("h2-serialization");
                        buffersaveexecutor = utils.createsinglethreadexecutor("h2-save");
                    }
                }
            }
        }
    }

其实set write_delay命令也是执行上面的方法。

运行线程

下面代码是线程的run()方法,该方法不断的在后台执行,主要是调用filestore.writeinbackground()完成刷新的:

        public void run() {
            while (store.isbackgroundthread()) {
                synchronized (sync) {
                    try {
                        sync.wait(sleep);
                    } catch (interruptedexception ignore) {/**/}
                }
                if (!store.isbackgroundthread()) {
                    break;
                }
                store.writeinbackground();
            }
        }

下面代码是filestore.writeinbackground():

    void writeinbackground() {
        try {
            if (mvstore.isopen() && !isreadonly()) {
                
                long time = gettimesincecreation();
                //如果最后一次磁盘刷新到现在为止超过了autocommitdelay,那么执行一次trycommit()
                //autocommitdelay就是后台线程的执行频率,默认为500ms
                if (time > lastcommittime + autocommitdelay) {
                    mvstore.trycommit();
                }
                dohousekeeping(mvstore);
                autocompactlastfileopcount = getwritecount() + getreadcount();
            }
        } catch (interruptedexception ignore) {
        } catch (throwable e) {
            if (!mvstore.handleexception(e)) {
                throw e;
            }
        }
    }

所有的刷新逻辑都在trycommit()方法中,而在trycommit()中,又会调用mvstore.store()方法,下面看下store()方法都做了什么。

store()

该方法首先检查当前是否有页面发生了变化,h2检测页面变化是通过版本号实现的,在rootreference对象里面有一个version属性,如果发生了变化,便会修改version值,,每修改一次增加1。store()方法会保存最后一次刷新磁盘时的version值,比较两个version值是否一致便可以判断出页面是否发生了变化。因此通过比较version的不同,便可以统计出所有当前打开的表、索引是否发生了变化。之后遍历这些发生变化的表或者索引即可。
在遍历开始前,会用属性lastcommittime记录当前系统时间,lastcommittime表示最后一次刷新磁盘的时间,这个属性在filestore.writeinbackground()方法里面使用到。
在上文的setautocommitdelay()方法里面启动了两个线程池,store()方法接下来便是提交一个任务到serializationexecutor线程池里面,代码如下:

		//lastcommittime:记录当前系统时间
		lastcommittime = gettimesincecreation();
		//serializationexecutor:表示线程池
		//changed:是一个arraylist<page<?,?>>对象,里面记录了所有发生变化的b+树根页面,也就是记录了发生变化的表或者索引
		//version:最新的版本号
        serializationexecutorhwm = submitorrun(serializationexecutor,
                () -> serializeandstore(syncwrite, changed, lastcommittime, version),
                syncwrite, pipe_length, serializationexecutorhwm);

主要的逻辑放在了serializeandstore()方法里面,在介绍serializeandstore()方法之前,先介绍下h2里面的chunk。

chunk

chunk的结构如下图所示,一个chunk是一个写入单位,也就是说h2每次写入的时候如果有多个页面变更,那么这些页面都组织到一个chunk里面,然后一次性写入磁盘。
请添加图片描述
通过对代码分析,每个page在写入chunk的时候,不是按照每页4096 byte要求写入的,而是有多少写多少,如果每页的内容非常少,可能多个页面写到chunk后还不够4096,那么此时h2就要对chunk进行填充了,代码如下,这要求每个chunk占用空间大小必须是4096的倍数,不够的话使用空白填充。

        // add the store header and round to the next block
        //filestore.block_size=4096
        //chunklength表示已经写入内容的大小,也就是header和page的总大小
        int length = mathutils.roundupint(chunklength + chunk.footer_length, filestore.block_size);
        buff.limit(length);

下面是chunk的header和footer的格式:

chunk:1,block:2,len:1,map:6,max:1c0,next:3,pages:2,root:4000004f8c,time:1fc,version:1
chunk:1,block:2,len:1,block:2,version:1,fletcher:aed9a4f6

各个字段含义如下(下面内容来自官网,因为我懒没有做翻译,具体可以参加代码chunk.getheaderbytes()和getfooterbytes()):

page

一个page表示一个b+树页面,在写入磁盘时,page的格式如下:

向磁盘写入b+树时,先写入根页面,之后是非叶子节点,最后是叶子节点。

写入数据

根据上面介绍的page和chunk,下面的代码应该会比较好理解。
serializeandstore()方法首先会获取新的chunk id,每新建一个chunk,chunk id会增加1,之后创建一个chunk对象,默认是sfchunk,然后再创建一个写缓冲对象writebuffer,该写缓冲对象底层使用bytebuffer存储数据 ,之后调用serializetobuffer()对page对象进行序列化,也就是按照chunk的格式要求将数据写入到writebuffer中,之后将writebuffer的bytebuffer赋值到chunk对象中。最后调用线程池buffersaveexecutor执行storebuffer()将writebuffer中的数据写入到磁盘上,写入的动作比较简单,因为序列化动作已经完成,那么直接将bytebuffer的数据写入文件即可。

    private void serializeandstore(boolean syncrun, arraylist<page<?,?>> changed, long time, long version) {
        serializationlock.lock();//加锁,防止并发
        try {
            c lastchunk = null;
            //lastchunkid用于记录当前最新的chunk id,
            //该值在数据库启动的时候会从磁盘上加载出来,以后每增加一个chunk,chunk id加1
            int chunkid = lastchunkid;
            if (chunkid != 0) {
            	//下面的代码主要是获取最大的时间,防止时间回退
                chunkid &= chunk.max_id;
                lastchunk = chunks.get(chunkid);
                assert lastchunk != null : lastchunkid + " ("+chunkid+") " + chunks;
                // never go backward in time
                time = math.max(lastchunk.time, time);
            }
            //创建一个chunk对象,默认情况下,创建的是sfchunk对象
            c c = createchunk(time, version);
            //writebuffer里面使用bytebuffer存储数据
            //writebuffer是一个写缓存,b+树的数据都会写到该对象,而这些数据最终使用bytebuffer保存
            writebuffer buff = getwritebuffer();
            //将修改的b+树页面序列化到writebuffer中
            serializetobuffer(buff, changed, c, lastchunk);
            //记录下该最新的chunk
            chunks.put(c.id, c);
			//使用线程池将writebuffer同步到磁盘上
            buffersaveexecutorhwm = submitorrun(buffersaveexecutor, () -> storebuffer(c, buff),
                    syncrun, 5, buffersaveexecutorhwm);![请添加图片描述](https://img-blog.csdnimg.cn/cba29a91f89d465a8a5bb8dd509a0d41.jpeg)


            for (page<?, ?> p : changed) {
            	//将已经保存到磁盘上的页面从内存中删除,防止占用内存过多
                p.releasesavedpages();
            }
        } catch (mvstoreexception e) {
            mvstore.panic(e);
        } catch (throwable e) {
            mvstore.panic(datautils.newmvstoreexception(datautils.error_internal, "{0}", e.tostring(), e));
        } finally {
            serializationlock.unlock();
        }
    }

在文章的开始提到了两个线程池,从上面的代码可以看出,这两个线程池一个负责将数据序列化到bytebuffer,另一个负责将bytebuffer的数据刷新到磁盘上。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com