tsdb是influxdb的存储引擎,主要用于持久化时序数据。在分析tsdb之前,我们先要了解influxdb在使用上关于 存储的一些概念。
concept
对于influxdb中涉及到的各种概念,官方已经提供了一个词汇表, 以供用户查阅。
influxdb将数据按不同的database和measurements来存储。database可以类比为mysql中的database概念。 measurements可以类比为mysql中table的概念。写入一条数据时,需要制定其database和measurements。influxdb 提供一套influxql的类似sql的语法来使用,具体的操作可以参考官方文档
然后influxdb存储的数据是schemaless的,可以包含任意维度(列),每个维度为一个K-V结构,其中维度划分为tag和 field2种。
tag
tag是可以建立索引的,在查询时有更好的性能,是可选的字段。通常用来存储一些meta字段。field不能建立索引,在查 询某个field的值时,需要遍历该时间区间内的全部数据。tag跟field的区别主要在是否建立索引上,因此写入数据时, 选取tag还是field主要依据查询语句而定,官方有一个指导文档 可以进行参考。
shard
shard是一个存储单元,用于存储一段时间范围内的数据,同时shard与retention policy直接相关,不同的过期策略 下不同的shard,每个shard底层对应一个TSM存储引擎。当过期策略检查到某一个shard过期后,会释放其对应的资源。
point
每一次写入的数据称之为point,其包含timestamp、measurements、retention policy、tag和field等信息,其中point的key由 measurement和taghash后构成。 然后根据point.HashID的值和retention policy选取合适的shard,将该point写入该shard。
tsdb对外提供的2个最重要的接口是:
CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error创建database对应的的shardWriteToShard(shardID uint64, points []models.Point) error向指定shard写入数据
series
series 相当于是InfluxDB中一些数据的集合,在同一个database中,retention policy、measurement、tag sets 完全 相同的数据同属于一个series,同一个series的数据在物理上会按照时间顺序排列存储在一起。
series的key为measurement加所有tags的序列化字符串,这个key在之后会经常用到。
store
store是influxdb的存储模块,全局只有一个该实例。主要负责将数据按一定格式写入磁盘,并且维护influxdb相关的 存储概念。例如:创建/删除Shard、创建/删除retention policy、调度shard的compaction、以及最重要的WriteToShard 等等。在store内部又包含index和engine2个抽象概念,index是对应shard的索引,engine是对应shard的存储实现, 不同的engine采用不同的存储格式和策略。后面要讲的tsdb其实就是一个engine的实现。在influxdb启动时,会创建 一个store实例,然后Open它,初始化时,它会加载已经存在的Shard ,并启动一个Shard监控任务, 监控任务负责调度每个Shard的Compaction和对使用inmem索引的Shard计算每种Tag拥有的数值的基数(与配置中max-values-per-tag有关)。
tsdb
我们可以先阅读以下对于tsdb的官方文档。 其采用的存储模型是LSM-Tree模型,对其进行了一定的改造。将其称之为Time-Structured Merge Tree (TSM)
当一个point写入时,influxdb根据其所属的database、measurements和timestamp选取一个对应的shard。每个 shard对应一个TSM存储引擎。每个shard对应一段时间范围的存储。
一个TSM存储引擎包含:
In-Memory Index在shard之间共享,提供measurements,tags,和series的索引WAL同其他database的binlog一样,当WAL的大小达到一定大小后,会重启开启一个WAL文件。Cache内存中缓存的WAL,加速查找TSM Files压缩后的series数据FileStoreTSM Files的封装Compactor存储数据的比较器Compaction Planner用来确定哪些TSM文件需要compaction,同时避免并发compaction之间的相互干扰Compression用于压缩持久化文件Writers/Readers用于访问文件
shard通过CreateShard 来创建。可以看出其依次创建了所需的文件目录,然后创建Index 和Shard 数据结构。
文件结构
在influxdb指定的存储目录下的文件结构为:
.
├── data # 配置中[data]下dir配置的路径
│ ├── _internal
│ │ └── monitor
│ │ ├── 73
│ │ │ └── 000000003-000000002.tsm
│ │ ├── 74
│ │ │ └── 000000003-000000002.tsm
│ │ └── 75
│ │ └── 000000001-000000001.tsm
│ └── testing
│ └── autogen
│ └── 2
│ └── 000000002-000000002.tsm
├── meta
│ └── meta.db
└── wal # 配置文件[data]下wal-dir配置
├── _internal
│ └── monitor
│ ├── 73
│ │ └── _00012.wal
│ ├── 74
│ │ └── _00012.wal
│ └── 75
│ ├── _00005.wal
│ ├── _00006.wal
│ └── _00007.wal
└── testing
└── autogen
└── 2
└── _00003.wal
每一个shard为目录,其目录的命名格式为:
$(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)$(ROOT)/wal/$(Database)/$(RetentionPolicy)/$(ShardID)
注:当相关shard不使用In-Memory Index(inmem)时,会使用文件型index,默认类型为tsi1,会在创建 $(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)/index文件。
存储结构
每个shard由一个Index和一个Engine组成,Index负责进行反向索引数据,Engine为具体的存储模型实现 ,即上面的TSM结构。
Engine
type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(keys [][]byte, min, max int64) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
// TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
Free() error
io.WriterTo
}
当influxd的API收到数据后,最终会根据所属的shard而找到对应的Engine,然后调用WritePoints 写入该Engine。在Engine中会将models.Point写入Cache 和WAL。 Cache和WAL中存有相同的数据,Cache主要用于查询时的读取操作,当用户查询时,可能会读取Cache和TSM 文件中的数据,当数据有重复时,Cache中的数据优先级高。WAL中数据主要用于启动时的数据加载。
可以从WritePoints看出其将models.Point拆分为多个Value。 其拆分规则为:
当在databasett执行insert disk_free,hostname=server01 value1=1000i,value2=1001i 1435362189575692182 后,该条会被解析为一个models.Point,然后在写入Engine后,因为有2个Field,所以会被拆解为2个Value。 第一个Value的Key为disk_free#!~#hostname=server01value1的IntegerValue,IntegerValue中包含数据1000和 时间戳1435362189575692182,第二个Value为Key为disk_free#!~#hostname=server01value2的IntegerValue, IntegerValue中包含数据1001和时间戳1435362189575692182。
然后会将分解为的Value写入Cache和WAL。Cache在内存中的组织形式为一个两级的map,在这里就不细说了。
将分解成的Value以Values 的形式写入WAL0/1:
| type(0x01) 1 byte | data length 4 byte | data (snappy compressed Values encoding data) |
// ┌────────────────────────────────────────────────────────────────────┐
// │ WriteWALEntry │
// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
// │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│
// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │
// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘
在写WAL时,如果WAL文件大于10M 时,会发生滚动,生成一个新的WAL文件。
Compaction
当数据不断写入,WAL文件的数量和Cache的大小会不断增长,因此需要一个Compaction来对数据进行压缩、文 件清理。在Engine被创建后,会启动3个与Compaction相关的任务compactCache 、compactTSMFull 和compactTSMLevel 分别对Cache和TSM进行Compaction。
compactCache主要是将内存中的Cache的数据变为TSM文件,然后清除对应的WAL文件。compactTSMFull/compactTSMLevel主要是进行TSM文件的Compaction,
Cache Compaction
当Cache使用内存的大小大于配置文件中cache-snapshot-memory-size的大小,或者空闲时间大于配置文件中 cache-snapshot-write-cold-duration的时间时,会触发Cache的compaction。其会将Cache中的Value数据 写入新的TSM文件 ,然后删除与该Cache对应的全部WAL文件。
TSM 文件
TSM文件有4块组成,分别为Header、blocks、Index和Footer。
┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │ Blocks │ Index │ Footer │
│5 bytes │ N bytes │ N bytes │ 4 bytes │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘
其中Header的前4字节为Magic Number(大端的0x16D116D1),第5个字节为版本号(目前为1)。
┌───────────────────┐
│ Header │
├─────────┬─────────┤
│ Magic │ Version │
│ 4 bytes │ 1 byte │
└─────────┴─────────┘
Blocks存储的是很多组CRC32和Data组成的Block,通常每个Block中有1000 个Value。每个Block中存储的都是相同Key的Value,该Block数据序列化后的长度、Value的类型、最大时间、最小 时间、在文件中的偏移地址等都存储在与之对应的Index块中。
┌───────────────────────────────────────────────────────────┐
│ Blocks │
├───────────────────┬───────────────────┬───────────────────┤
│ Block 1 │ Block 2 │ Block N │
├─────────┬─────────┼─────────┬─────────┼─────────┬─────────┤
│ CRC │ Data │ CRC │ Data │ CRC │ Data │
│ 4 bytes │ N bytes │ 4 bytes │ N bytes │ 4 bytes │ N bytes │
└─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘
Blocks后面是Index块,Index中存储的是按Value的Key的字典序和时间戳排列的Block的元数据。 每个Block的元数据称为一个IndexEntry 。同一个Key可能拥有多个Block的数据,因为每个Block最多1000个Value,因此其IndexEntry 为一个数组。
┌────────────────────────────────────────────────────────────────────────────┐
│ Index │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
├─────────┼─────────┼──────┼───────┼─────────┼─────────┼────────┼────────┼───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
根据IndexEntry中记录的时间戳范围和文件偏移地址,我们能很高效的找到我们所需数据在TSM文件中 位置。
The last section is the footer that stores the offset of the start of the index.
最后一个是Footer块,占用8个字节,存储Index块的偏移位置,Footer相对于文件末尾的位置是固定的, 读取TSM文件时,可以从文件末尾先速度出Index块的偏移量,再加载读取Index内容。
┌─────────┐
│ Footer │
├─────────┤
│Index Ofs│
│ 8 bytes │
└─────────┘
Level Compaction
在Cache Compaction的同时,还会进行Level Compaction。
Index
type Index interface {
Open() error
Close() error
WithLogger(zap.Logger)
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
InitializeSeries(key, name []byte, tags models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL system iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64) error
RemoveShard(shardID uint64)
Type() string
Rebuild()
}
文档信息
- 本文作者:Neal Hu
- 本文链接:https://lrita.github.io/2017/06/12/influxdb-tsdb/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)