当前位置: 代码网 > it编程>数据库>大数据 > 时序数据库VictoriaMetrics源码解析之写入与索引

时序数据库VictoriaMetrics源码解析之写入与索引

2024年05月18日 大数据 我要评论
一. 存储格式下图是向victoriametrics写入prometheus协议数据的示例:vm在收到写入请求时,会对请求中包含的时序数据做转换处理:首先,根据metrics+labels组成的met

一. 存储格式

下图是向victoriametrics写入prometheus协议数据的示例:

vm在收到写入请求时,会对请求中包含的时序数据做转换处理:

  • 首先,根据metrics+labels组成的metricname,生成一个唯一标识tsid;
  • 然后:

    • metric(指标名称__name__) + labels + tsid作为索引index;
    • tsid + timestamp + value作为数据data;
  • 最后,索引index和数据data分别进行存储和检索;

因此,vm的数据整体上分为索引和数据2个部分:

  • 索引部分,用以支持按照label或tag进行多维检索,得到tsid;
  • 数据部分,用以支持按照tsid得到tv数据;

二. 整体流程

victoriametrics在写入原始的rows数据时,写入过程分为两个部分:

  • 写index;
  • 写tv;

写入流程:

  • 对于原始的rows数据,根据其metricsname从cache和内存索引中,查找其对应的tsid;
  • 若tsid找到,则写入tv数据,返回client;
  • 否则:

    • 写index:

      • 构造tsid,构造新的index items,然后将其写入内存shard;
      • 内存shard被异步的goroutine压缩并保存到磁盘;
    • 写tv数据;
    • 返回client;

三. 写入代码

1.入口代码

vmstorage监听tcp端口,收到vminsert的插入请求后,进行处理:

// app/vmstorage/servers/vminsert.go
func (s *vminsertserver) run() {
    ...
    for {
        c, err := s.ln.accept()
        ...
        go func() {
            bc, err := handshake.vminsertserver(c, compressionlevel)
            ...
            err = clusternative.parsestream(bc, func(rows []storage.metricrow) error {
                vminsertmetricsread.add(len(rows))
                return s.storage.addrows(rows, uint8(*precisionbits))    // 入口代码
            }, s.storage.isreadonly)
            ...
        }()
    }
}

写入时,1次最多写8k个rows:

func (s *storage) addrows(mrs []metricrow, precisionbits uint8) error {
    ....
    maxblocklen := len(ic.rrs)
    for len(mrs) > 0 {
        mrsblock := mrs
        // 一次最多写8k,maxblocklen=8000
        if len(mrs) > maxblocklen {
            mrsblock = mrs[:maxblocklen]
            mrs = mrs[maxblocklen:]
        } else {
            mrs = nil
        }
        // 写入8k rows的数据
        if err := s.add(ic.rrs, ic.tmpmrs, mrsblock, precisionbits); err != nil {
            if firsterr == nil {
                firsterr = err
            }
            continue
        }
        atomic.adduint64(&rowsaddedtotal, uint64(len(mrsblock)))
    }
    ....
}

2.写入流程的代码

写入过程主要分2步:

  • 首先,为row查找或构建tsid;

    • 若该row的metricnameraw与prevmetricnameraw,则使用prevtsid;
    • 若cache中有缓存的metricnameraw,则使用缓存的metricnameraw对应的tsid;
    • 若上述都不满足,则去内存索引中查找,或者创建一个新的tsid;

      • 这一步是最耗时的;
  • 然后,构建tsid完毕后,插入tv数据;
// lib/storage/storage.go
func (s *storage) add(rows []rawrow, dstmrs []*metricrow, mrs []metricrow, precisionbits uint8) error {
    ...
    // 1.构造r.tsid
    // 若跟prevmetricnameraw相同,则使用pervtsid;
    // 若cache中有metricnameraw,则使用cache.tsid;
    for i := range mrs {
        mr := &mrs[i]
        ...
        dstmrs[j] = mr
        r := &rows[j]
        j++
        r.timestamp = mr.timestamp
        r.value = mr.value
        r.precisionbits = precisionbits
        if string(mr.metricnameraw) == string(prevmetricnameraw) {    // 使用prevtsid
            // fast path - the current mr contains the same metric name as the previous mr, so it contains the same tsid.
            // this path should trigger on bulk imports when many rows contain the same metricnameraw.
            r.tsid = prevtsid
            continue
        }
        if s.gettsidfromcache(&gentsid, mr.metricnameraw) {        // 使用缓存的tsid
            ...
            r.tsid = gentsid.tsid
            prevtsid = r.tsid
            prevmetricnameraw = mr.metricnameraw
            ...
            continue
        }
        ...
    }
    if pmrs != nil {
        // sort pendingmetricrows by canonical metric name in order to speed up search via `is` in the loop below.
        pendingmetricrows := pmrs.pmrs
        sort.slice(pendingmetricrows, func(i, j int) bool {
            return string(pendingmetricrows[i].metricname) < string(pendingmetricrows[j].metricname)
        })
        prevmetricnameraw = nil
        var slowinsertscount uint64
        for i := range pendingmetricrows {
            ...
            r := &rows[j]
            j++
            r.timestamp = mr.timestamp
            r.value = mr.value
            r.precisionbits = precisionbits
            // 尝试去index找查找,或者创建
          if err := is.getorcreatetsidbyname(&r.tsid, pmr.metricname, mr.metricnameraw, date); err != nil {
                ...
                continue
            }
            gentsid.generation = idb.generation
            gentsid.tsid = r.tsid
            // 放回cache
            s.puttsidtocache(&gentsid, mr.metricnameraw)
            prevtsid = r.tsid
            prevmetricnameraw = mr.metricnameraw
        }
    }
    ...
    dstmrs = dstmrs[:j]
    rows = rows[:j]
    err := s.updateperdatedata(rows, dstmrs)
    if err != nil {
        err = fmt.errorf("cannot update per-date data: %w", err)
    } else {
        // tsid构造完毕,开始插入数据
        err = s.tb.addrows(rows)
        ...
    }
    ...
    return nil
}

3.写index

写index是slow path,重点看一下:

  • 首先,去内存索引中找tsid,若找到,则返回;
  • 否则,创建一个新的tsid;
// lib/storage/index_db.go
func (is *indexsearch) getorcreatetsidbyname(dst *tsid, metricname, metricnameraw []byte, date uint64) error {
    // 1.首先尝试在index中查找
    if is.tsidbynamemisses < 100 {
        err := is.gettsidbymetricname(dst, metricname)
        // 在index中找到了
        if err == nil {
            // fast path - the tsid for the given metricname has been found in the index.
            is.tsidbynamemisses = 0
            if err = is.db.s.registerseriescardinality(dst.metricid, metricnameraw); err != nil {
                return err
            }
            return nil
        }
        is.tsidbynamemisses++
    } else {
        is.tsidbynameskips++
        if is.tsidbynameskips > 10000 {
            is.tsidbynameskips = 0
            is.tsidbynamemisses = 0
        }
    }
    // 2.没有找到,那么创建一个
    if err := is.createtsidbyname(dst, metricname, metricnameraw, date); err != nil {
        userreadablemetricname := getuserreadablemetricname(metricnameraw)
        return fmt.errorf("cannot create tsid by metricname %s: %w", userreadablemetricname, err)
    }
    return nil
}

4. 生成tsid

具体生成tsid的逻辑:

  • metricgroupid: 由metricgroup hash而来;
  • jobid:由tags[0].value hash而来;
  • instanceid:由tags[1].value hash而来;
// lib/storage/index_db.go
func generatetsid(dst *tsid, mn *metricname) {
    dst.accountid = mn.accountid
    dst.projectid = mn.projectid
    dst.metricgroupid = xxhash.sum64(mn.metricgroup)
    if len(mn.tags) > 0 {
        dst.jobid = uint32(xxhash.sum64(mn.tags[0].value))
    }
    if len(mn.tags) > 1 {
        dst.instanceid = uint32(xxhash.sum64(mn.tags[1].value))
    }
    dst.metricid = generateuniquemetricid()
}

而tsid中的metricid是由启动时的时间戳+1产生:

// returns local unique metricid.
func generateuniquemetricid() uint64 {
    return atomic.adduint64(&amp;nextuniquemetricid, 1)
}
var nextuniquemetricid = uint64(time.now().unixnano())

5. 创建index items

  • 创建 metricname -> tsid index;
  • 创建 metricid -> metricname index;
  • 创建 metricid -> tsid index;
  • 创建 tag -> metricid 和 metricgroup+tag -> metricid index;
  • 最后,将index items存入内存shards;
// lib/storage/index_db.go
func (is *indexsearch) createglobalindexes(tsid *tsid, mn *metricname) {
    // the order of index items is important.
    // it guarantees index consistency.
    ii := getindexitems()
    defer putindexitems(ii)
    // create metricname -> tsid index.
    ii.b = append(ii.b, nsprefixmetricnametotsid)
    ii.b = mn.marshal(ii.b)
    ii.b = append(ii.b, kvseparatorchar)
    ii.b = tsid.marshal(ii.b)
    ii.next()
    // create metricid -> metricname index.
    ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtometricname, mn.accountid, mn.projectid)
    ii.b = encoding.marshaluint64(ii.b, tsid.metricid)
    ii.b = mn.marshal(ii.b)
    ii.next()
    // create metricid -> tsid index.
    ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtotsid, mn.accountid, mn.projectid)
    ii.b = encoding.marshaluint64(ii.b, tsid.metricid)
    ii.b = tsid.marshal(ii.b)
    ii.next()
    prefix := kbpool.get()
    prefix.b = marshalcommonprefix(prefix.b[:0], nsprefixtagtometricids, mn.accountid, mn.projectid)
    ii.registertagindexes(prefix.b, mn, tsid.metricid)
    kbpool.put(prefix)
    is.db.tb.additems(ii.items)     // 将items存入内存shards
}

6. index items存入内存shards

index items构造完成后,被写入内存的shards,会有异步的goroutine将其压缩写入disk。

写内存shards的方法: roundrobin

  • 内存中有若干个index shards;
  • 写入时,轮转写入:idx++ % shards
// lib/mergeset/table.go
func (riss *rawitemsshards) additems(tb *table, items [][]byte) {
   shards := riss.shards
   shardslen := uint32(len(shards))
   for len(items) > 0 {
      n := atomic.adduint32(&riss.shardidx, 1)
      idx := n % shardslen
      items = shards[idx].additems(tb, items)
   }
}

内存中shards总数,跟cpu核数有关系:

  • shards总数 = (cpu*cpu + 1) / 2
  • 对于4c的机器,有8个shards;
// lib/mergeset/table.go
/ the number of shards for rawitems per table.
//
// higher number of shards reduces cpu contention and increases the max bandwidth on multi-core systems.
var rawitemsshardspertable = func() int {
   cpus := cgroup.availablecpus()
   multiplier := cpus
   if multiplier > 16 {
      multiplier = 16
   }
   return (cpus*multiplier + 1) / 2
}()

以上就是时序数据库victoriametrics源码解析之写入与索引的详细内容,更多关于victoriametrics写入索引的资料请关注代码网其它相关文章!

(0)

相关文章:

  • 安装navicat最新详细流程

    1.双击已下载好的navicat安装包,点击"下一步"2.点击我同意,在点击"下一步"3.设置navicat安装路劲, 至少要保证磁盘有90…

    2024年05月18日 数据库
  • 快速解决openGauss数据库pg_xlog爆满问题

    问题现象最近有一个之前搭的环境登不上了,好久没用想拿来测试的时候发现启动不了。启动时报错:[errno 28] no space left on device query也不行了,…

    2024年05月18日 数据库
  • 一文弄懂数据库设计的三范式

    写在前面很多数据库设计者,都是按照自己的性子和习惯来设计数据库数据表,其实不然。其实,数据库的设计也有要遵循的原则。范式,就是规范,就是指设计数据库需要(应该)遵循的原则。每个范式…

    2024年05月18日 数据库
  • hive内部表和外部表的区别详解

    hive内部表和外部表的区别详解

    hive内部表:默认创建的表是内部表。hive完全管理表(元数据和数据)的声明周期,类似于rdbms的表。当删除表时,他会删除源数据以及表的元数据。hive外部... [阅读全文]
  • 关于hive表的存储格式ORC格式的使用详解

    关于hive表的存储格式ORC格式的使用详解

    hive表的源文件存储格式:1、textfile默认格式,建表时不指定默认为这个格式,导入数据时会直接把数据文件拷贝到hdfs上不进行处理。源文件可以直接通过h... [阅读全文]
  • 如何查看Navicat加密的数据库密码

    查看navicat加密的数据库密码背景:本机装的mysql数据库密码忘记了,打开了navicat连接过数据库,不过密码是加密的,既然能加密那就能解密,哈哈哈哈。解密后发现密码居然是…

    2024年05月18日 数据库

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com