influxdb 源码解析-tsdb

2017/06/12 database influxdb

tsdbinfluxdb的存储引擎,主要用于持久化时序数据。在分析tsdb之前,我们先要了解influxdb在使用上关于 存储的一些概念。

concept

对于influxdb中涉及到的各种概念,官方已经提供了一个词汇表, 以供用户查阅。

influxdb将数据按不同的databasemeasurements来存储。database可以类比为mysql中的database概念。 measurements可以类比为mysqltable的概念。写入一条数据时,需要制定其databasemeasurementsinfluxdb 提供一套influxql的类似sql的语法来使用,具体的操作可以参考官方文档

然后influxdb存储的数据是schemaless的,可以包含任意维度(列),每个维度为一个K-V结构,其中维度划分为tagfield2种。

tag

tag是可以建立索引的,在查询时有更好的性能,是可选的字段。通常用来存储一些meta字段。field不能建立索引,在查 询某个field的值时,需要遍历该时间区间内的全部数据。tagfield的区别主要在是否建立索引上,因此写入数据时, 选取tag还是field主要依据查询语句而定,官方有一个指导文档 可以进行参考。

shard

shard是一个存储单元,用于存储一段时间范围内的数据,同时shardretention policy直接相关,不同的过期策略 下不同的shard,每个shard底层对应一个TSM存储引擎。当过期策略检查到某一个shard过期后,会释放其对应的资源。

point

每一次写入的数据称之为point,其包含timestampmeasurementsretention policytagfield等信息,其中pointkeymeasurementtaghash后构成。 然后根据point.HashID的值和retention policy选取合适的shard,将该point写入该shard

tsdb对外提供的2个最重要的接口是:

  • CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error 创建database对应的的shard
  • WriteToShard(shardID uint64, points []models.Point) error 向指定shard写入数据

series

series 相当于是InfluxDB中一些数据的集合,在同一个database中,retention policy、measurement、tag sets 完全 相同的数据同属于一个series,同一个series的数据在物理上会按照时间顺序排列存储在一起。

series的key为measurement加所有tags的序列化字符串,这个key在之后会经常用到。

store

storeinfluxdb的存储模块,全局只有一个该实例。主要负责将数据按一定格式写入磁盘,并且维护influxdb相关的 存储概念。例如:创建/删除Shard、创建/删除retention policy、调度shard的compaction、以及最重要的WriteToShard 等等。在store内部又包含indexengine2个抽象概念,index是对应shard的索引,engine是对应shard的存储实现, 不同的engine采用不同的存储格式和策略。后面要讲的tsdb其实就是一个engine的实现。在influxdb启动时,会创建 一个store实例,然后Open它,初始化时,它会加载已经存在的Shard ,并启动一个Shard监控任务, 监控任务负责调度每个ShardCompaction和对使用inmem索引的Shard计算每种Tag拥有的数值的基数(与配置中max-values-per-tag有关)。

tsdb

我们可以先阅读以下对于tsdb官方文档。 其采用的存储模型是LSM-Tree模型,对其进行了一定的改造。将其称之为Time-Structured Merge Tree (TSM)

当一个point写入时,influxdb根据其所属的databasemeasurementstimestamp选取一个对应的shard。每个 shard对应一个TSM存储引擎。每个shard对应一段时间范围的存储。

一个TSM存储引擎包含:

  • In-Memory Indexshard之间共享,提供measurementstags,和series的索引
  • WAL 同其他database的binlog一样,当WAL的大小达到一定大小后,会重启开启一个WAL文件。
  • Cache 内存中缓存的WAL,加速查找
  • TSM Files 压缩后的series数据
  • FileStore TSM Files的封装
  • Compactor 存储数据的比较器
  • Compaction Planner 用来确定哪些TSM文件需要compaction,同时避免并发compaction之间的相互干扰
  • Compression 用于压缩持久化文件
  • Writers/Readers 用于访问文件

shard通过CreateShard 来创建。可以看出其依次创建了所需的文件目录,然后创建IndexShard 数据结构。

文件结构

influxdb指定的存储目录下的文件结构为:

.
├── data  # 配置中[data]下dir配置的路径
│   ├── _internal
│   │   └── monitor
│   │       ├── 73
│   │       │   └── 000000003-000000002.tsm
│   │       ├── 74
│   │       │   └── 000000003-000000002.tsm
│   │       └── 75
│   │           └── 000000001-000000001.tsm
│   └── testing
│       └── autogen
│           └── 2
│               └── 000000002-000000002.tsm
├── meta
│   └── meta.db
└── wal	# 配置文件[data]下wal-dir配置
    ├── _internal
    │   └── monitor
    │       ├── 73
    │       │   └── _00012.wal
    │       ├── 74
    │       │   └── _00012.wal
    │       └── 75
    │           ├── _00005.wal
    │           ├── _00006.wal
    │           └── _00007.wal
    └── testing
        └── autogen
            └── 2
                └── _00003.wal

每一个shard为目录,其目录的命名格式为:

  • $(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)
  • $(ROOT)/wal/$(Database)/$(RetentionPolicy)/$(ShardID)

注:当相关shard不使用In-Memory Index(inmem)时,会使用文件型index,默认类型为tsi1,会在创建 $(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)/index文件。

存储结构

每个shard由一个Index和一个Engine组成,Index负责进行反向索引数据,Engine为具体的存储模型实现 ,即上面的TSM结构。

Engine

type Engine interface {
    Open() error
    Close() error
    SetEnabled(enabled bool)
    SetCompactionsEnabled(enabled bool)

    WithLogger(zap.Logger)

    LoadMetadataIndex(shardID uint64, index Index) error

    CreateSnapshot() (string, error)
    Backup(w io.Writer, basePath string, since time.Time) error
    Restore(r io.Reader, basePath string) error
    Import(r io.Reader, basePath string) error

    CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
    IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
    WritePoints(points []models.Point) error

    CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
    CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
    DeleteSeriesRange(keys [][]byte, min, max int64) error

    SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
    MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesN() int64

    MeasurementExists(name []byte) (bool, error)
    MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
    MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
    MeasurementFields(measurement []byte) *MeasurementFields
    ForEachMeasurementName(fn func(name []byte) error) error
    DeleteMeasurement(name []byte) error

    // TagKeys(name []byte) ([][]byte, error)
    HasTagKey(name, key []byte) (bool, error)
    MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
    MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
    ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
    TagKeyCardinality(name, key []byte) int

    // InfluxQL iterators
    MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
    SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)

    // Statistics will return statistics relevant to this engine.
    Statistics(tags map[string]string) []models.Statistic
    LastModified() time.Time
    DiskSize() int64
    IsIdle() bool
    Free() error

    io.WriterTo
}

influxd的API收到数据后,最终会根据所属的shard而找到对应的Engine,然后调用WritePoints 写入该Engine。在Engine中会将models.Point写入CacheWALCacheWAL中存有相同的数据,Cache主要用于查询时的读取操作,当用户查询时,可能会读取CacheTSM 文件中的数据,当数据有重复时,Cache中的数据优先级高。WAL中数据主要用于启动时的数据加载。

可以从WritePoints看出其将models.Point拆分为多个Value。 其拆分规则为:

当在databasett执行insert disk_free,hostname=server01 value1=1000i,value2=1001i 1435362189575692182 后,该条会被解析为一个models.Point,然后在写入Engine后,因为有2个Field,所以会被拆解为2个Value。 第一个Value的Key为disk_free#!~#hostname=server01value1IntegerValueIntegerValue中包含数据1000和 时间戳1435362189575692182,第二个Value为Key为disk_free#!~#hostname=server01value2IntegerValueIntegerValue中包含数据1001和时间戳1435362189575692182。

然后会将分解为的Value写入CacheWALCache在内存中的组织形式为一个两级的map,在这里就不细说了。

将分解成的ValueValues 的形式写入WAL0/1:

| type(0x01) 1 byte | data length 4 byte | data (snappy compressed Values encoding data) |

Values encoding的格式为:

    // ┌────────────────────────────────────────────────────────────────────┐
    // │                           WriteWALEntry                            │
    // ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
    // │ Type │ Key Len │   Key  │ Count │  Time   │  Value  │...│ Type │...│
    // │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │
    // └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘

在写WAL时,如果WAL文件大于10M 时,会发生滚动,生成一个新的WAL文件

Compaction

当数据不断写入,WAL文件的数量和Cache的大小会不断增长,因此需要一个Compaction来对数据进行压缩、文 件清理。在Engine被创建后,会启动3个与Compaction相关的任务compactCachecompactTSMFullcompactTSMLevel 分别对CacheTSM进行Compaction

  • compactCache 主要是将内存中的Cache的数据变为TSM文件,然后清除对应的WAL文件。
  • compactTSMFull/compactTSMLevel 主要是进行TSM文件的Compaction,

Cache Compaction

Cache使用内存的大小大于配置文件中cache-snapshot-memory-size的大小,或者空闲时间大于配置文件中 cache-snapshot-write-cold-duration的时间时,会触发Cache的compaction。其会将Cache中的Value数据 写入新的TSM文件 ,然后删除与该Cache对应的全部WAL文件。

TSM 文件

TSM文件有4块组成,分别为HeaderblocksIndexFooter

┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │               Blocks               │    Index    │    Footer    │
│5 bytes │              N bytes               │   N bytes   │   4 bytes    │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘

其中Header的前4字节为Magic Number(大端的0x16D116D1),第5个字节为版本号(目前为1)。

┌───────────────────┐
│      Header       │
├─────────┬─────────┤
│  Magic  │ Version │
│ 4 bytes │ 1 byte  │
└─────────┴─────────┘

Blocks存储的是很多组CRC32和Data组成的Block,通常每个Block中有1000Value。每个Block中存储的都是相同Key的Value,该Block数据序列化后的长度、Value的类型、最大时间、最小 时间、在文件中的偏移地址等都存储在与之对应的Index块中。

┌───────────────────────────────────────────────────────────┐
│                          Blocks                           │
├───────────────────┬───────────────────┬───────────────────┤
│      Block 1      │      Block 2      │      Block N      │
├─────────┬─────────┼─────────┬─────────┼─────────┬─────────┤
│  CRC    │  Data   │  CRC    │  Data   │  CRC    │  Data   │
│ 4 bytes │ N bytes │ 4 bytes │ N bytes │ 4 bytes │ N bytes │
└─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘

Blocks后面是Index块,Index中存储的是按Value的Key的字典序和时间戳排列的Block的元数据。 每个Block的元数据称为一个IndexEntry 。同一个Key可能拥有多个Block的数据,因为每个Block最多1000个Value,因此其IndexEntry 为一个数组。

┌────────────────────────────────────────────────────────────────────────────┐
│                                   Index                                    │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
├─────────┼─────────┼──────┼───────┼─────────┼─────────┼────────┼────────┼───┤
│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘

根据IndexEntry中记录的时间戳范围和文件偏移地址,我们能很高效的找到我们所需数据在TSM文件中 位置。

The last section is the footer that stores the offset of the start of the index.

最后一个是Footer块,占用8个字节,存储Index块的偏移位置,Footer相对于文件末尾的位置是固定的, 读取TSM文件时,可以从文件末尾先速度出Index块的偏移量,再加载读取Index内容。

┌─────────┐
│ Footer  │
├─────────┤
│Index Ofs│
│ 8 bytes │
└─────────┘

Level Compaction

在Cache Compaction的同时,还会进行Level Compaction

Index

type Index interface {
    Open() error
    Close() error
    WithLogger(zap.Logger)

    MeasurementExists(name []byte) (bool, error)
    MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
    MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
    DropMeasurement(name []byte) error
    ForEachMeasurementName(fn func(name []byte) error) error

    InitializeSeries(key, name []byte, tags models.Tags) error
    CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
    CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
    DropSeries(key []byte) error

    SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
    MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesN() int64

    HasTagKey(name, key []byte) (bool, error)
    TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
    MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
    MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)

    ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
    TagKeyCardinality(name, key []byte) int

    // InfluxQL system iterators
    MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
    SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)

    // Sets a shared fieldset from the engine.
    SetFieldSet(fs *MeasurementFieldSet)

    // Creates hard links inside path for snapshotting.
    SnapshotTo(path string) error

    // To be removed w/ tsi1.
    SetFieldName(measurement []byte, name string)
    AssignShard(k string, shardID uint64)
    UnassignShard(k string, shardID uint64) error
    RemoveShard(shardID uint64)

    Type() string

    Rebuild()
}

Search

    Table of Contents