一. 存储格式
下图(原文配图已缺失)是向 VictoriaMetrics 写入 Prometheus 协议数据的示例:

VM(VictoriaMetrics)在收到写入请求时,会对请求中包含的时序数据做如下转换处理:
- 首先,根据指标名称(metric)与 labels 组成的 MetricName,生成一个唯一标识 TSID;
- 然后:
  - metric(指标名称 __name__)+ labels + TSID 作为索引(index);
  - TSID + timestamp + value 作为数据(data);
- 最后,索引和数据分别进行存储和检索。

因此,VM 的数据整体上分为索引和数据两个部分:
- 索引部分:支持按照 label(tag)进行多维检索,得到 TSID;
- 数据部分:支持按照 TSID 获取 TV(timestamp + value)数据。
二. 整体流程
VictoriaMetrics 在写入原始的 rows 数据时,写入过程分为两个部分:
- 写 index;
- 写 TV 数据。
写入流程:
- 对于原始的 rows 数据,根据其 MetricName 从 cache 和内存索引中查找对应的 TSID;
- 若找到 TSID,则直接写入 TV 数据,然后返回 client;
- 否则:
  - 写 index:构造 TSID 和新的 index items,然后将其写入内存 shard;内存 shard 随后由异步的 goroutine 压缩并保存到磁盘;
  - 写 TV 数据;
  - 返回 client。

三. 写入代码
1. 入口代码
vmstorage 监听 TCP 端口,收到 vminsert 的插入请求后进行处理:
// app/vmstorage/servers/vminsert.go
//
// run is the vmstorage side of the vminsert protocol. It accepts connections
// from s.ln in an endless loop and serves each one in its own goroutine. That
// goroutine performs the handshake, parses the incoming rows and hands them to
// s.storage.addrows.
// Excerpt: lines shown as "..." were elided by the article.
func (s *vminsertserver) run() {
...
for {
c, err := s.ln.accept()
...
// Serve each accepted connection concurrently so the accept loop keeps running.
go func() {
// compressionlevel is defined outside this excerpt.
bc, err := handshake.vminsertserver(c, compressionlevel)
...
// The callback receives the parsed rows. s.storage.isreadonly is passed along
// (presumably so parsing can react to a read-only storage — TODO confirm).
err = clusternative.parsestream(bc, func(rows []storage.metricrow) error {
vminsertmetricsread.add(len(rows))
return s.storage.addrows(rows, uint8(*precisionbits)) // entry point of the storage write path
}, s.storage.isreadonly)
...
}()
}
}写入时,1次最多写8k个rows:
// addrows writes mrs in chunks of at most maxblocklen rows. maxblocklen is the
// length of the reusable ic.rrs buffer; ic is obtained in the elided "...."
// part, and the article states the chunk size is 8000 rows.
// A failing chunk does not stop the loop. Only the first error is kept in
// firsterr, and the remaining chunks are still written. rowsaddedtotal counts
// only the rows of chunks that s.add accepted without error.
func (s *storage) addrows(mrs []metricrow, precisionbits uint8) error {
....
maxblocklen := len(ic.rrs)
for len(mrs) > 0 {
mrsblock := mrs
// Take at most maxblocklen rows per iteration (8000 according to the article).
if len(mrs) > maxblocklen {
mrsblock = mrs[:maxblocklen]
mrs = mrs[maxblocklen:]
} else {
mrs = nil
}
// Write the current chunk, passing ic.rrs / ic.tmpmrs as the output buffers s.add fills.
if err := s.add(ic.rrs, ic.tmpmrs, mrsblock, precisionbits); err != nil {
// Remember only the first error, but keep writing the remaining chunks.
if firsterr == nil {
firsterr = err
}
continue
}
atomic.adduint64(&rowsaddedtotal, uint64(len(mrsblock)))
}
....
}2.写入流程的代码
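写入过程主要分 2 步:
1. 首先,为每个 row 查找或构建 TSID:
   - 若该 row 的 metricnameraw 与上一行的 prevmetricnameraw 相同,则直接使用 prevtsid;
   - 否则,若 cache 中缓存了该 metricnameraw,则使用缓存中对应的 TSID;
   - 若上述都不满足,则去内存索引中查找,或者创建一个新的 TSID。这一步(slow path)是最耗时的;为加速查找,进入这一步的行会先按 metricname 排序,查找或创建得到的 TSID 会放回 cache。
2. 然后,TSID 全部构建完毕后,先更新 per-date 数据,再插入 TV 数据。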
// lib/storage/storage.go
//
// add fills rows (timestamp, value, precisionbits and tsid) and dstmrs from
// mrs, then stores the rows. It works in two passes:
//  1. Fast paths per row: reuse prevtsid when the raw metric name equals the
//     previous row's name; otherwise try the tsid cache (s.gettsidfromcache).
//  2. Slow path: rows collected in pmrs are sorted by metric name and resolved
//     via is.getorcreatetsidbyname. pmrs is filled in the elided part of pass 1,
//     presumably with the rows that missed both fast paths (TODO confirm).
//     Every tsid resolved here is written back to the cache.
// Finally rows and dstmrs are truncated to the j filled entries and per-date
// data is updated. The rows are added to s.tb only if that update succeeds.
// Excerpt: lines shown as "..." were elided by the article.
// NOTE(review): the visible tail computes err but ends with `return nil`. The
// error is presumably handled in the elided "..." lines — confirm upstream.
func (s *storage) add(rows []rawrow, dstmrs []*metricrow, mrs []metricrow, precisionbits uint8) error {
...
// Step 1: fill r.tsid for every row.
// If the raw metric name equals prevmetricnameraw, reuse prevtsid.
// Otherwise, if the cache holds this metricnameraw, use the cached tsid.
for i := range mrs {
mr := &mrs[i]
...
dstmrs[j] = mr
r := &rows[j]
j++
r.timestamp = mr.timestamp
r.value = mr.value
r.precisionbits = precisionbits
if string(mr.metricnameraw) == string(prevmetricnameraw) { // reuse prevtsid
// fast path - the current mr contains the same metric name as the previous mr, so it contains the same tsid.
// this path should trigger on bulk imports when many rows contain the same metricnameraw.
r.tsid = prevtsid
continue
}
if s.gettsidfromcache(&gentsid, mr.metricnameraw) { // use the cached tsid
...
r.tsid = gentsid.tsid
prevtsid = r.tsid
prevmetricnameraw = mr.metricnameraw
...
continue
}
...
}
// Step 2 (slow path): resolve the pending rows through the index.
if pmrs != nil {
// sort pendingmetricrows by canonical metric name in order to speed up search via `is` in the loop below.
pendingmetricrows := pmrs.pmrs
sort.slice(pendingmetricrows, func(i, j int) bool {
return string(pendingmetricrows[i].metricname) < string(pendingmetricrows[j].metricname)
})
// Reset so this pass does not match a name left over from the first pass
// (the comparison itself is in the elided lines below).
prevmetricnameraw = nil
var slowinsertscount uint64
for i := range pendingmetricrows {
...
r := &rows[j]
j++
r.timestamp = mr.timestamp
r.value = mr.value
r.precisionbits = precisionbits
// Look the tsid up in the index, or create a new one.
if err := is.getorcreatetsidbyname(&r.tsid, pmr.metricname, mr.metricnameraw, date); err != nil {
...
continue
}
gentsid.generation = idb.generation
gentsid.tsid = r.tsid
// Put the resolved tsid back into the cache so later rows hit the fast path.
s.puttsidtocache(&gentsid, mr.metricnameraw)
prevtsid = r.tsid
prevmetricnameraw = mr.metricnameraw
}
}
...
dstmrs = dstmrs[:j]
rows = rows[:j]
err := s.updateperdatedata(rows, dstmrs)
if err != nil {
err = fmt.errorf("cannot update per-date data: %w", err)
} else {
// All tsids are resolved; now insert the tv (timestamp/value) data.
err = s.tb.addrows(rows)
...
}
...
return nil
}3.写index
写 index 是 slow path,重点看一下:
- 首先,去内存索引中查找 TSID,若找到则返回。但若最近已连续 100 次查找未命中,则跳过查找、直接创建;跳过次数超过 10000 后,两个计数器都会重置,重新尝试查找。这推测是为了在大量新序列涌入时避免无效查找;
- 否则,创建一个新的 TSID。
// lib/storage/index_db.go
//
// getorcreatetsidbyname stores into dst the tsid for metricname. It first looks
// the name up in the index and otherwise creates a new tsid via createtsidbyname.
//
// Lookup-skipping heuristic:
//   - is.tsidbynamemisses counts consecutive failed lookups and is reset to 0 on a hit.
//   - After 100 consecutive misses the lookup is skipped and a tsid is created
//     directly; is.tsidbynameskips counts those skips.
//   - Once more than 10000 lookups have been skipped, both counters are reset,
//     so lookups are attempted again.
// Any non-nil error from gettsidbymetricname is treated as a miss here.
// On a hit, the series is registered via registerseriescardinality, and its
// error (if any) is returned as-is. Creation errors are wrapped together with
// a user-readable form of metricnameraw.
func (is *indexsearch) getorcreatetsidbyname(dst *tsid, metricname, metricnameraw []byte, date uint64) error {
// Step 1: try the index first, unless the recent lookups kept missing.
if is.tsidbynamemisses < 100 {
err := is.gettsidbymetricname(dst, metricname)
// Found in the index.
if err == nil {
// fast path - the tsid for the given metricname has been found in the index.
is.tsidbynamemisses = 0
if err = is.db.s.registerseriescardinality(dst.metricid, metricnameraw); err != nil {
return err
}
return nil
}
is.tsidbynamemisses++
} else {
is.tsidbynameskips++
if is.tsidbynameskips > 10000 {
is.tsidbynameskips = 0
is.tsidbynamemisses = 0
}
}
// Step 2: not found (or the lookup was skipped), so create a new tsid.
if err := is.createtsidbyname(dst, metricname, metricnameraw, date); err != nil {
userreadablemetricname := getuserreadablemetricname(metricnameraw)
return fmt.errorf("cannot create tsid by metricname %s: %w", userreadablemetricname, err)
}
return nil
}4. 生成tsid
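具体生成 TSID 的逻辑:
- accountid / projectid:直接取自 MetricName;
- metricgroupid:由 metricgroup 经 xxhash 计算得到;
- jobid:由 tags[0].value 的 xxhash 截断为 uint32 得到(仅当存在 tag 时);
- instanceid:由 tags[1].value 的 xxhash 截断为 uint32 得到(仅当存在至少 2 个 tag 时);
- metricid:由 generateuniquemetricid() 生成的进程内唯一 ID。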
// lib/storage/index_db.go
//
// generatetsid fills dst for the metric name mn:
//   - accountid / projectid are copied from mn.
//   - metricgroupid is the 64-bit xxhash of mn.metricgroup.
//   - jobid / instanceid are the xxhash of tags[0].value / tags[1].value,
//     truncated to the low 32 bits. They are assigned only when that tag
//     exists; otherwise those fields of dst are left untouched.
//   - metricid is a fresh process-local unique id.
// Which labels end up in tags[0] / tags[1] depends on how mn.tags was ordered
// before this call — not visible here.
func generatetsid(dst *tsid, mn *metricname) {
dst.accountid = mn.accountid
dst.projectid = mn.projectid
dst.metricgroupid = xxhash.sum64(mn.metricgroup)
if len(mn.tags) > 0 {
dst.jobid = uint32(xxhash.sum64(mn.tags[0].value))
}
if len(mn.tags) > 1 {
dst.instanceid = uint32(xxhash.sum64(mn.tags[1].value))
}
dst.metricid = generateuniquemetricid()
}而tsid中的metricid是由启动时的时间戳+1产生:
// generateuniquemetricid returns a process-local unique metricid.
// It atomically increments nextuniquemetricid, so concurrent callers never
// receive the same value. The first id returned is the seed + 1.
func generateuniquemetricid() uint64 {
return atomic.adduint64(&nextuniquemetricid, 1)
}
// nextuniquemetricid is seeded with the wall-clock time in nanoseconds at
// package initialization. Presumably this keeps ids from a new process from
// colliding with ids created before a restart; that relies on the clock moving
// forward — confirm upstream.
var nextuniquemetricid = uint64(time.now().unixnano())5. 创建index items
- 创建 metricname -> TSID 索引;
- 创建 metricid -> metricname 索引;
- 创建 metricid -> TSID 索引;
- 创建 tag -> metricid 和 metricgroup+tag -> metricid 索引;
- 最后,将这些 index items 存入内存 shards。
源码注释特别指出,这些 index items 的生成顺序很重要,它保证了索引的一致性。
// lib/storage/index_db.go
//
// createglobalindexes builds the index items for tsid/mn (presumably called when
// a new tsid is created) and adds all of them to is.db.tb in one additems call:
//   - metricname -> tsid        (prefix nsprefixmetricnametotsid)
//   - metricid   -> metricname  (prefix nsprefixmetricidtometricname)
//   - metricid   -> tsid        (prefix nsprefixmetricidtotsid)
//   - tag indexes registered by ii.registertagindexes under the
//     nsprefixtagtometricids prefix (their exact layout is not shown here)
// The ii and prefix buffers come from pools and are returned to them.
func (is *indexsearch) createglobalindexes(tsid *tsid, mn *metricname) {
// the order of index items is important.
// it guarantees index consistency.
ii := getindexitems()
defer putindexitems(ii)
// create metricname -> tsid index.
ii.b = append(ii.b, nsprefixmetricnametotsid)
ii.b = mn.marshal(ii.b)
ii.b = append(ii.b, kvseparatorchar)
ii.b = tsid.marshal(ii.b)
// presumably closes the current item in ii.b and starts the next one.
ii.next()
// create metricid -> metricname index.
ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtometricname, mn.accountid, mn.projectid)
ii.b = encoding.marshaluint64(ii.b, tsid.metricid)
ii.b = mn.marshal(ii.b)
ii.next()
// create metricid -> tsid index.
ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtotsid, mn.accountid, mn.projectid)
ii.b = encoding.marshaluint64(ii.b, tsid.metricid)
ii.b = tsid.marshal(ii.b)
ii.next()
prefix := kbpool.get()
prefix.b = marshalcommonprefix(prefix.b[:0], nsprefixtagtometricids, mn.accountid, mn.projectid)
ii.registertagindexes(prefix.b, mn, tsid.metricid)
kbpool.put(prefix)
is.db.tb.additems(ii.items) // hand all items to the index table (per the article they land in in-memory shards first)
}6. index items存入内存shards
index items 构造完成后被写入内存 shards,随后由异步的 goroutine 将其压缩并写入磁盘。
写内存 shards 的方法是 round-robin(轮转):
- 内存中有若干个 index shards;
- 写入时轮转选择 shard:idx = (++shardidx) % shards 数量。若该 shard 的 additems 返回了尚未写入的剩余 items,则继续写入下一个 shard,直到全部写完。
// lib/mergeset/table.go
//
// additems distributes items across riss.shards in round-robin order.
// riss.shardidx is incremented atomically, so concurrent callers start on
// different shards. Each shard's additems returns the items it did not take.
// The loop feeds those leftovers to the next shard until nothing is left.
// NOTE(review): with an empty riss.shards, n % shardslen would panic (division
// by zero); presumably the shard count is always >= 1.
func (riss *rawitemsshards) additems(tb *table, items [][]byte) {
shards := riss.shards
shardslen := uint32(len(shards))
for len(items) > 0 {
n := atomic.adduint32(&riss.shardidx, 1)
idx := n % shardslen
items = shards[idx].additems(tb, items)
}
}内存中shards总数,跟cpu核数有关系:
- shards 总数 = (cpus × min(cpus, 16) + 1) / 2(整数除法),即 multiplier 最多为 16;
- 例如 4 核机器有 (4×4+1)/2 = 8 个 shards,32 核机器有 (32×16+1)/2 = 256 个 shards。
// lib/mergeset/table.go
/ the number of shards for rawitems per table.
//
// higher number of shards reduces cpu contention and increases the max bandwidth on multi-core systems.
var rawitemsshardspertable = func() int {
cpus := cgroup.availablecpus()
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
return (cpus*multiplier + 1) / 2
}()以上就是时序数据库victoriametrics源码解析之写入与索引的详细内容,更多关于victoriametrics写入索引的资料请关注代码网其它相关文章!
发表评论