tsdb
是influxdb
的存储引擎,主要用于持久化时序数据。在分析tsdb
之前,我们先要了解influxdb
在使用上关于
存储的一些概念。
concept
对于influxdb
中涉及到的各种概念,官方已经提供了一个词汇表,
以供用户查阅。
influxdb
将数据按不同的database
和measurements
来存储。database
可以类比为mysql
中的database
概念。
measurements
可以类比为mysql
中table
的概念。写入一条数据时,需要制定其database
和measurements
。influxdb
提供一套influxql
的类似sql
的语法来使用,具体的操作可以参考官方文档
然后influxdb
存储的数据是schemaless的,可以包含任意维度(列),每个维度为一个K-V结构,其中维度划分为tag
和
field
2种。
tag
tag
是可以建立索引的,在查询时有更好的性能,是可选的字段。通常用来存储一些meta字段。field
不能建立索引,在查
询某个field
的值时,需要遍历该时间区间内的全部数据。tag
跟field
的区别主要在是否建立索引上,因此写入数据时,
选取tag
还是field
主要依据查询语句而定,官方有一个指导文档
可以进行参考。
shard
shard
是一个存储单元,用于存储一段时间范围内的数据,同时shard
与retention policy
直接相关,不同的过期策略
下不同的shard
,每个shard
底层对应一个TSM
存储引擎。当过期策略检查到某一个shard
过期后,会释放其对应的资源。
point
每一次写入的数据称之为point
,其包含timestamp
、measurements
、retention policy
、tag
和field
等信息,其中point
的key
由
measurement
和tag
hash后构成。
然后根据point.HashID
的值和retention policy
选取合适的shard
,将该point
写入该shard
。
tsdb
对外提供的2个最重要的接口是:
CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
创建database
对应的的shard
WriteToShard(shardID uint64, points []models.Point) error
向指定shard
写入数据
series
series 相当于是InfluxDB中一些数据的集合,在同一个database中,retention policy、measurement、tag sets 完全 相同的数据同属于一个series,同一个series的数据在物理上会按照时间顺序排列存储在一起。
series的key为measurement加所有tags的序列化字符串,这个key在之后会经常用到。
store
store
是influxdb
的存储模块,全局只有一个该实例。主要负责将数据按一定格式写入磁盘,并且维护influxdb
相关的
存储概念。例如:创建/删除Shard
、创建/删除retention policy
、调度shard
的compaction、以及最重要的WriteToShard
等等。在store
内部又包含index
和engine
2个抽象概念,index
是对应shard
的索引,engine
是对应shard
的存储实现,
不同的engine
采用不同的存储格式和策略。后面要讲的tsdb
其实就是一个engine
的实现。在influxdb
启动时,会创建
一个store
实例,然后Open
它,初始化时,它会加载已经存在的Shard
,并启动一个Shard
监控任务,
监控任务负责调度每个Shard
的Compaction
和对使用inmem
索引的Shard
计算每种Tag
拥有的数值的基数(与配置中max-values-per-tag
有关)。
tsdb
我们可以先阅读以下对于tsdb
的官方文档。
其采用的存储模型是LSM-Tree
模型,对其进行了一定的改造。将其称之为Time-Structured Merge Tree (TSM)
当一个point
写入时,influxdb
根据其所属的database
、measurements
和timestamp
选取一个对应的shard
。每个
shard
对应一个TSM
存储引擎。每个shard
对应一段时间范围的存储。
一个TSM
存储引擎包含:
In-Memory Index
在shard
之间共享,提供measurements
,tags
,和series
的索引WAL
同其他database的binlog一样,当WAL
的大小达到一定大小后,会重启开启一个WAL
文件。Cache
内存中缓存的WAL
,加速查找TSM Files
压缩后的series
数据FileStore
TSM Files
的封装Compactor
存储数据的比较器Compaction Planner
用来确定哪些TSM
文件需要compaction
,同时避免并发compaction
之间的相互干扰Compression
用于压缩持久化文件Writers/Readers
用于访问文件
shard
通过CreateShard
来创建。可以看出其依次创建了所需的文件目录,然后创建Index
和Shard
数据结构。
文件结构
在influxdb
指定的存储目录下的文件结构为:
.
├── data # 配置中[data]下dir配置的路径
│ ├── _internal
│ │ └── monitor
│ │ ├── 73
│ │ │ └── 000000003-000000002.tsm
│ │ ├── 74
│ │ │ └── 000000003-000000002.tsm
│ │ └── 75
│ │ └── 000000001-000000001.tsm
│ └── testing
│ └── autogen
│ └── 2
│ └── 000000002-000000002.tsm
├── meta
│ └── meta.db
└── wal # 配置文件[data]下wal-dir配置
├── _internal
│ └── monitor
│ ├── 73
│ │ └── _00012.wal
│ ├── 74
│ │ └── _00012.wal
│ └── 75
│ ├── _00005.wal
│ ├── _00006.wal
│ └── _00007.wal
└── testing
└── autogen
└── 2
└── _00003.wal
每一个shard
为目录,其目录的命名格式为:
$(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)
$(ROOT)/wal/$(Database)/$(RetentionPolicy)/$(ShardID)
注:当相关shard
不使用In-Memory Index(inmem)
时,会使用文件型index
,默认类型为tsi1
,会在创建
$(ROOT)/data/$(Database)/$(RetentionPolicy)/$(ShardID)/index
文件。
存储结构
每个shard
由一个Index
和一个Engine
组成,Index
负责进行反向索引数据,Engine
为具体的存储模型实现
,即上面的TSM
结构。
Engine
type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(keys [][]byte, min, max int64) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
// TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
Free() error
io.WriterTo
}
当influxd
的API收到数据后,最终会根据所属的shard
而找到对应的Engine
,然后调用WritePoints
写入该Engine
。在Engine
中会将models.Point
写入Cache
和WAL
。
Cache
和WAL
中存有相同的数据,Cache
主要用于查询时的读取操作,当用户查询时,可能会读取Cache
和TSM
文件中的数据,当数据有重复时,Cache
中的数据优先级高。WAL
中数据主要用于启动时的数据加载。
可以从WritePoints
看出其将models.Point
拆分为多个Value
。
其拆分规则为:
当在databasett
执行insert disk_free,hostname=server01 value1=1000i,value2=1001i 1435362189575692182
后,该条会被解析为一个models.Point
,然后在写入Engine
后,因为有2个Field
,所以会被拆解为2个Value
。
第一个Value
的Key为disk_free#!~#hostname=server01value1
的IntegerValue
,IntegerValue
中包含数据1000和
时间戳1435362189575692182,第二个Value
为Key为disk_free#!~#hostname=server01value2
的IntegerValue
,
IntegerValue
中包含数据1001和时间戳1435362189575692182。
然后会将分解为的Value
写入Cache
和WAL
。Cache
在内存中的组织形式为一个两级的map,在这里就不细说了。
将分解成的Value
以Values
的形式写入WAL
0/1:
| type(0x01) 1 byte | data length 4 byte | data (snappy compressed Values encoding data) |
// ┌────────────────────────────────────────────────────────────────────┐
// │ WriteWALEntry │
// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
// │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│
// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │
// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘
在写WAL
时,如果WAL
文件大于10M
时,会发生滚动,生成一个新的WAL
文件。
Compaction
当数据不断写入,WAL
文件的数量和Cache
的大小会不断增长,因此需要一个Compaction
来对数据进行压缩、文
件清理。在Engine
被创建后,会启动3个与Compaction
相关的任务compactCache
、compactTSMFull
和compactTSMLevel
分别对Cache
和TSM
进行Compaction
。
compactCache
主要是将内存中的Cache
的数据变为TSM文件,然后清除对应的WAL
文件。compactTSMFull
/compactTSMLevel
主要是进行TSM
文件的Compaction,
Cache Compaction
当Cache
使用内存的大小大于配置文件中cache-snapshot-memory-size
的大小,或者空闲时间大于配置文件中
cache-snapshot-write-cold-duration
的时间时,会触发Cache
的compaction。其会将Cache
中的Value
数据
写入新的TSM
文件
,然后删除与该Cache
对应的全部WAL
文件。
TSM 文件
TSM
文件有4块组成,分别为Header
、blocks
、Index
和Footer
。
┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │ Blocks │ Index │ Footer │
│5 bytes │ N bytes │ N bytes │ 4 bytes │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘
其中Header
的前4字节为Magic Number(大端的0x16D116D1),第5个字节为版本号(目前为1)。
┌───────────────────┐
│ Header │
├─────────┬─────────┤
│ Magic │ Version │
│ 4 bytes │ 1 byte │
└─────────┴─────────┘
Blocks
存储的是很多组CRC32和Data组成的Block,通常每个Block中有1000
个Value
。每个Block中存储的都是相同Key的Value
,该Block数据序列化后的长度、Value
的类型、最大时间、最小
时间、在文件中的偏移地址等都存储在与之对应的Index
块中。
┌───────────────────────────────────────────────────────────┐
│ Blocks │
├───────────────────┬───────────────────┬───────────────────┤
│ Block 1 │ Block 2 │ Block N │
├─────────┬─────────┼─────────┬─────────┼─────────┬─────────┤
│ CRC │ Data │ CRC │ Data │ CRC │ Data │
│ 4 bytes │ N bytes │ 4 bytes │ N bytes │ 4 bytes │ N bytes │
└─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘
Blocks后面是Index块,Index中存储的是按Value
的Key的字典序和时间戳排列的Block的元数据。
每个Block的元数据称为一个IndexEntry
。同一个Key可能拥有多个Block的数据,因为每个Block最多1000个Value
,因此其IndexEntry
为一个数组。
┌────────────────────────────────────────────────────────────────────────────┐
│ Index │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
├─────────┼─────────┼──────┼───────┼─────────┼─────────┼────────┼────────┼───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
根据IndexEntry
中记录的时间戳范围和文件偏移地址,我们能很高效的找到我们所需数据在TSM文件中
位置。
The last section is the footer that stores the offset of the start of the index.
最后一个是Footer块,占用8个字节,存储Index块的偏移位置,Footer相对于文件末尾的位置是固定的, 读取TSM文件时,可以从文件末尾先速度出Index块的偏移量,再加载读取Index内容。
┌─────────┐
│ Footer │
├─────────┤
│Index Ofs│
│ 8 bytes │
└─────────┘
Level Compaction
在Cache Compaction的同时,还会进行Level Compaction。
Index
type Index interface {
Open() error
Close() error
WithLogger(zap.Logger)
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
InitializeSeries(key, name []byte, tags models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL system iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64) error
RemoveShard(shardID uint64)
Type() string
Rebuild()
}