一. 存储格式
下图是向victoriametrics写入prometheus协议数据的示例:
vm在收到写入请求时,会对请求中包含的时序数据做转换处理:
- 首先,根据metrics+labels组成的metricname,生成一个唯一标识tsid;
然后:
- metric(指标名称__name__) + labels + tsid作为索引index;
- tsid + timestamp + value作为数据data;
- 最后,索引index和数据data分别进行存储和检索;
因此,vm的数据整体上分为索引和数据2个部分:
- 索引部分,用以支持按照label或tag进行多维检索,得到tsid;
- 数据部分,用以支持按照tsid得到tv(timestamp + value)数据;
二. 整体流程
victoriametrics在写入原始的rows数据时,写入过程分为两个部分:
- 写index;
- 写tv;
写入流程:
- 对于原始的rows数据,根据其metricname从cache和内存索引中,查找其对应的tsid;
- 若tsid找到,则写入tv数据,返回client;
否则:
写index:
- 构造tsid,构造新的index items,然后将其写入内存shard;
- 内存shard被异步的goroutine压缩并保存到磁盘;
- 写tv数据;
- 返回client;
三. 写入代码
1.入口代码
vmstorage监听tcp端口,收到vminsert的插入请求后,进行处理:
// app/vmstorage/servers/vminsert.go func (s *vminsertserver) run() { ... for { c, err := s.ln.accept() ... go func() { bc, err := handshake.vminsertserver(c, compressionlevel) ... err = clusternative.parsestream(bc, func(rows []storage.metricrow) error { vminsertmetricsread.add(len(rows)) return s.storage.addrows(rows, uint8(*precisionbits)) // 入口代码 }, s.storage.isreadonly) ... }() } }
写入时,1次最多写8k个rows:
func (s *storage) addrows(mrs []metricrow, precisionbits uint8) error { .... maxblocklen := len(ic.rrs) for len(mrs) > 0 { mrsblock := mrs // 一次最多写8k,maxblocklen=8000 if len(mrs) > maxblocklen { mrsblock = mrs[:maxblocklen] mrs = mrs[maxblocklen:] } else { mrs = nil } // 写入8k rows的数据 if err := s.add(ic.rrs, ic.tmpmrs, mrsblock, precisionbits); err != nil { if firsterr == nil { firsterr = err } continue } atomic.adduint64(&rowsaddedtotal, uint64(len(mrsblock))) } .... }
2.写入流程的代码
写入过程主要分2步:
首先,为row查找或构建tsid;
- 若该row的metricnameraw与prevmetricnameraw相同,则使用prevtsid;
- 若cache中有缓存的metricnameraw,则使用缓存的metricnameraw对应的tsid;
若上述都不满足,则去内存索引中查找,或者创建一个新的tsid;
- 这一步是最耗时的;
- 然后,构建tsid完毕后,插入tv数据;
// lib/storage/storage.go func (s *storage) add(rows []rawrow, dstmrs []*metricrow, mrs []metricrow, precisionbits uint8) error { ... // 1.构造r.tsid // 若跟prevmetricnameraw相同,则使用pervtsid; // 若cache中有metricnameraw,则使用cache.tsid; for i := range mrs { mr := &mrs[i] ... dstmrs[j] = mr r := &rows[j] j++ r.timestamp = mr.timestamp r.value = mr.value r.precisionbits = precisionbits if string(mr.metricnameraw) == string(prevmetricnameraw) { // 使用prevtsid // fast path - the current mr contains the same metric name as the previous mr, so it contains the same tsid. // this path should trigger on bulk imports when many rows contain the same metricnameraw. r.tsid = prevtsid continue } if s.gettsidfromcache(&gentsid, mr.metricnameraw) { // 使用缓存的tsid ... r.tsid = gentsid.tsid prevtsid = r.tsid prevmetricnameraw = mr.metricnameraw ... continue } ... } if pmrs != nil { // sort pendingmetricrows by canonical metric name in order to speed up search via `is` in the loop below. pendingmetricrows := pmrs.pmrs sort.slice(pendingmetricrows, func(i, j int) bool { return string(pendingmetricrows[i].metricname) < string(pendingmetricrows[j].metricname) }) prevmetricnameraw = nil var slowinsertscount uint64 for i := range pendingmetricrows { ... r := &rows[j] j++ r.timestamp = mr.timestamp r.value = mr.value r.precisionbits = precisionbits // 尝试去index找查找,或者创建 if err := is.getorcreatetsidbyname(&r.tsid, pmr.metricname, mr.metricnameraw, date); err != nil { ... continue } gentsid.generation = idb.generation gentsid.tsid = r.tsid // 放回cache s.puttsidtocache(&gentsid, mr.metricnameraw) prevtsid = r.tsid prevmetricnameraw = mr.metricnameraw } } ... dstmrs = dstmrs[:j] rows = rows[:j] err := s.updateperdatedata(rows, dstmrs) if err != nil { err = fmt.errorf("cannot update per-date data: %w", err) } else { // tsid构造完毕,开始插入数据 err = s.tb.addrows(rows) ... } ... return nil }
3.写index
写index是slow path,重点看一下:
- 首先,去内存索引中找tsid,若找到,则返回;
- 否则,创建一个新的tsid;
// lib/storage/index_db.go func (is *indexsearch) getorcreatetsidbyname(dst *tsid, metricname, metricnameraw []byte, date uint64) error { // 1.首先尝试在index中查找 if is.tsidbynamemisses < 100 { err := is.gettsidbymetricname(dst, metricname) // 在index中找到了 if err == nil { // fast path - the tsid for the given metricname has been found in the index. is.tsidbynamemisses = 0 if err = is.db.s.registerseriescardinality(dst.metricid, metricnameraw); err != nil { return err } return nil } is.tsidbynamemisses++ } else { is.tsidbynameskips++ if is.tsidbynameskips > 10000 { is.tsidbynameskips = 0 is.tsidbynamemisses = 0 } } // 2.没有找到,那么创建一个 if err := is.createtsidbyname(dst, metricname, metricnameraw, date); err != nil { userreadablemetricname := getuserreadablemetricname(metricnameraw) return fmt.errorf("cannot create tsid by metricname %s: %w", userreadablemetricname, err) } return nil }
4. 生成tsid
具体生成tsid的逻辑:
- metricgroupid: 由metricgroup hash而来;
- jobid:由tags[0].value hash而来;
- instanceid:由tags[1].value hash而来;
// lib/storage/index_db.go func generatetsid(dst *tsid, mn *metricname) { dst.accountid = mn.accountid dst.projectid = mn.projectid dst.metricgroupid = xxhash.sum64(mn.metricgroup) if len(mn.tags) > 0 { dst.jobid = uint32(xxhash.sum64(mn.tags[0].value)) } if len(mn.tags) > 1 { dst.instanceid = uint32(xxhash.sum64(mn.tags[1].value)) } dst.metricid = generateuniquemetricid() }
而tsid中的metricid由一个全局计数器产生:计数器以进程启动时的纳秒时间戳为初值,每生成一个新的metricid就原子地加1:
// returns local unique metricid. func generateuniquemetricid() uint64 { return atomic.adduint64(&nextuniquemetricid, 1) } var nextuniquemetricid = uint64(time.now().unixnano())
5. 创建index items
- 创建 metricname -> tsid index;
- 创建 metricid -> metricname index;
- 创建 metricid -> tsid index;
- 创建 tag -> metricid 和 metricgroup+tag -> metricid index;
- 最后,将index items存入内存shards;
// lib/storage/index_db.go func (is *indexsearch) createglobalindexes(tsid *tsid, mn *metricname) { // the order of index items is important. // it guarantees index consistency. ii := getindexitems() defer putindexitems(ii) // create metricname -> tsid index. ii.b = append(ii.b, nsprefixmetricnametotsid) ii.b = mn.marshal(ii.b) ii.b = append(ii.b, kvseparatorchar) ii.b = tsid.marshal(ii.b) ii.next() // create metricid -> metricname index. ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtometricname, mn.accountid, mn.projectid) ii.b = encoding.marshaluint64(ii.b, tsid.metricid) ii.b = mn.marshal(ii.b) ii.next() // create metricid -> tsid index. ii.b = marshalcommonprefix(ii.b, nsprefixmetricidtotsid, mn.accountid, mn.projectid) ii.b = encoding.marshaluint64(ii.b, tsid.metricid) ii.b = tsid.marshal(ii.b) ii.next() prefix := kbpool.get() prefix.b = marshalcommonprefix(prefix.b[:0], nsprefixtagtometricids, mn.accountid, mn.projectid) ii.registertagindexes(prefix.b, mn, tsid.metricid) kbpool.put(prefix) is.db.tb.additems(ii.items) // 将items存入内存shards }
6. index items存入内存shards
index items构造完成后,被写入内存的shards,会有异步的goroutine将其压缩写入disk。
写内存shards的方法: roundrobin
- 内存中有若干个index shards;
- 写入时,轮转写入:idx++ % shards
// lib/mergeset/table.go func (riss *rawitemsshards) additems(tb *table, items [][]byte) { shards := riss.shards shardslen := uint32(len(shards)) for len(items) > 0 { n := atomic.adduint32(&riss.shardidx, 1) idx := n % shardslen items = shards[idx].additems(tb, items) } }
内存中shards总数,跟cpu核数有关系:
- shards总数 = (cpu * min(cpu, 16) + 1) / 2(整数除法,乘数上限为16)
- 对于4c的机器,有8个shards;
// lib/mergeset/table.go / the number of shards for rawitems per table. // // higher number of shards reduces cpu contention and increases the max bandwidth on multi-core systems. var rawitemsshardspertable = func() int { cpus := cgroup.availablecpus() multiplier := cpus if multiplier > 16 { multiplier = 16 } return (cpus*multiplier + 1) / 2 }()
以上就是时序数据库victoriametrics源码解析之写入与索引的详细内容,更多关于victoriametrics写入索引的资料请关注代码网其它相关文章!
发表评论