# 辅助结构
## FileIndexer
^20bb12
- 用于加速定位 SST 文件。
- 根据 key 范围建立上下层之间文件的关系
- 利用上层的二分搜索结果,缩小下层的搜索范围。
- 具体算法:[Fractional cascading](https://en.wikipedia.org/wiki/Fractional_cascading)
- [Indexing SST Files for Better Lookup Performance](https://github.com/facebook/rocksdb/wiki/Indexing-SST-Files-for-Better-Lookup-Performance)
## FileMetada
- 对应一个 SST 文件的元数据。
### 基础元数据
- **`fd`**: `FileDescriptor`
- **`packed_number_and_path_id`**:包含文件的序列和 path id
- **`file_size**`: `uint64_t`
- **`smallest_seqno`**: `SequenceNumber`
- **`largest_seqno`**:`SequenceNumber`
- **`smallest`**: `InternalKey`,文件中的最小 internal key
- **`largest`**: `InternalKey`,文件中的最大 internal key
- **`refs`**: `int`, 引用计数,小于等于0时可以删除
- **`epoch_number`**: `uint64_t`,用于 CF 内排序 L0 文件,越大的越新,由 `ColumnFamilyData::next_epoch_number_` 递增器生成 , [#10922](https://github.com/facebook/rocksdb/pull/10922)
- **`tail_size`**: `uint64_t`,SST 文件 data blocks 之后的数据大小
- **`table_reader_handler`**:`Cache::Handle *`,
- 用于将打开后的 table reader pin 到 TableCache 中
### compaction 相关
- **`being_compacted`**: `bool`,是否正在被 compaction
- **`marked_for_compaction`**: `bool`
- 用户通过`SuggestCompactRange()` 建议 compact 该文件
- ingestion 过程也会产生 `db/external_sst_file_ingestion_job.cc`
- **`compensated_file_size`**: `uint64_t`
- 文件内 Delete 条目过多时,会给文件大小增加一个补偿值,使其能够优先被 compaction。
- `Version::ComputeCompensatedSizes()` 中计算
- **`compensated_range_deletion_size`**: `uint64_t`,
- 因 range tombstones 产生的文件大小补偿值,`BuildTable()` 时计算
- 计算方式: next level 被该文件内的 range tomstones 所覆盖的 KV 大小
- 会被加入到 compensated_file_size
- **`oldest_ancester_time`**:`uint64_t` ,用于支持 periodic compaction [#6090](https://github.com/facebook/rocksdb/pull/6090)
- 如果 SST 是 flush 生成的,该值是文件的 oldest key time
- 如果 SST 是 compaction 生成的,该值是所有 input files 的 oldest_ancester_time 中的 oldest
- 为 0 表示不可用,会使用 file_creation_time,见 `TryGetOldestAncesterTime()`
- **总结**:这意味着它是文件内数据的时间,而非是文件生成时间。
- **`file_creation_time`**: `uint64_t`,Unix time
### 统计类
- **`num_entries`**:KV 条目的个数,包含了 deletions 和 range deletions
- **`num_deletions`**:包含了 range deletions
- **`raw_key_size`**:未压缩的 keys 大小总和
- **`raw_value_size`**:未压缩的 values 大小总和
- **`num_range_deletions`**:range deletions 的个数
-------
# VersionStorageInfo
- 文件:db/version_set.h
- 与每个 Version 关联的 storage 信息,LSM 磁盘组件的核心元数据:
- LSM 有多少 level
- 每个 level 的 文件信息
- 标记为 compaction 的 files
- blob files 等
## 类数据成员
- LSM 相关成员:
- **`num_levels_`**: `int`,LSM 最大有多少层,固定值(默认 7),取决于 Option
- **`num_non_empty_levels_`**: `int`,大于它的 level 都是空的
- **`level_files_brief_`**: `vector<LevelFilesBrief>`
- 每层文件精简版的元数据
- 包括每个文件的 fd 和 key 范围,
- 同时也包含了一个指向完整元数据的指针
- **`files_`**:`vector<FileMetaData*>`,每层的文件列表,同层内按 key ==升序==排列
- L0 层按 `epoch_number` 排序,新的(`epoch_number` 大的)在前面
- **`level0_non_overlapping_`**:`bool`,L0 之间是否有重叠
- **`level_multiplier_`**:`double`,层之间数据最大大小的倍数
- **`file_indexer_`**: `FileIndexer`,按 key 快速搜索目标 SST 文件
- **`file_locations_`**: `Map<uint64_t, FileLocation>`
- 根据文件序号查找文件的位置(level + 该 level 的第几个文件)
- 在 `GenerateFileLocationIndex()` 中根据 files_ 生成
- compaction 相关成员
- **`compaction_style_`**: `CompactionStyle`, 通常是 Level-based
- **`base_level_`**: `int`,L0 data compact 到哪一层
- **`level_max_bytes_`**: `vector<uint64_t>`
- 每层允许的最大数据大小,用于计算 compaction score
- **`compaction_score_`**: `vector<double`>, 每层的 score, 按照分数大小排序,最高的在前面
- **`compaction_level_`**: `vector<int>`,按 score 排序后的 levels,最高的在前面
- **`files_by_compaction_pri_`**: `vector<vector<int>>`
- 每层按 compaction 优先级排序,最高的在前面,vector 存储 **files_** 的 index
- `UpdateFilesByCompactionPri()` 中计算,根据 compaction_pri 有不同的排序指标
- `files_[level][files_by_compaction_pri_[level][idx]]`:直接获取到 idx 对应的 `FileMedataData`
- **`next_file_to_compact_by_size_`**: `vector<int>`
- 指向 `files_by_compaction_pri_`,表示处理到 files_by_compaction_pri_ 中的每层的哪个文件了
- **`files_marked_for_compaction_`**: `vector< pair<int, FileMetaData*> >`
- 标记为 compaction 且正在被 compact 的文件列表,在 `ComputeCompactionScore()`中计算
- `pair.first` 为 level
- **`expired_ttl_files_`**: `vector< pair<int, FileMetaData*> >`
- 可能包含了 TTL 过期 key 的文件列表, `ComputeExpiredTtlFiles()` 中计算
- 过期基于 oldest_ancester_time 判断
- **`files_marked_for_periodic_compaction_`**:`vector< pair<int, FileMetaData*> >`
- 标记需要 periodic_compaction 的文件列表,`ComputeFilesMarkedForPeriodicCompaction()` 中计算
- **`bottommost_files_`**:`vector< pair<int, FileMetaData*> >`
- bottomost标准:文件的 key 范围与底层(大于当前的所有 levels 或更早的 L0)文件不存在重叠,即它是这个 key 范围的最底层 [#3009](https://github.com/facebook/rocksdb/pull/3009)
- **`bottommost_files_marked_for_compaction_`**: `vector<pair<int, FileMetaData*>>`
- bottommost_files_ 中 标记为需要 compaction 的
- 挑选标准见 `ComputeBottommostFilesMarkedForCompaction()`
- **`oldest_snapshot_seqnum_`**:`SequenceNumber`
- 每次快照释放时更新,不断变大,用于挑选需要 compaction 的 bottommost_files
- **`bottommost_file_compaction_delay_`**: `uint32_t`
- 来自 bottommost_file_compaction_delay 选项 [#11701](https://github.com/facebook/rocksdb/pull/11701)
- **`lowest_unnecessary_level_`**:`int`
- level compaction + `level_compaction_dynamic_level_bytes=true` 时有效
- 通过后台 compaction 删除不必要的 levels, 帮助用户迁移到 `level_compaction_dynamic_level_bytes=true` [#11340](https://github.com/facebook/rocksdb/pull/11340)
## ComputeCompactionScore()
- **L0 计算**:
- 考虑**文件数量**:L0 文件读取时需要 merge,因此控制文件数量很关键。另外对于大 writer buffer 更友好(compaction不会变多)
- 还需要考虑**文件总大小**:因为有 L0 -> L0 的 compaction,即使文件数量不多,也可能导致 L0 过大, 产生巨大的 L0 -> Lbase compaction
- 大小统计跳过正在被 compact 的文件
- L0 的 score 不能太大,否则 L0-> Lbase compaction 总是优先于 Lbase -> Lbase+1 执行,导致 Lbase 积压;也不能太小,否则 L0 积压,数据无法快速推到 Lbase,因此 score 计算会同时考虑 L0 的大小 和 Lbase 的大小。
- **file_num_score** = num_files(uncompacted_L0) / level0_file_num_compaction_trigger
- 未启用 dynamic level:
- score = max(file_num_score, L0_total_size / max_bytes_for_level_base)
- 启用 dynamic level:
- score 初始为 `file_num_score`
- 如果 L0_total_size 大于 `max_bytes_for_level_base`,会确保 score 不小于 1.01 (加大 score,确保能被 scaled 乘以 10,1.01 也不会过大)
- 如果 `L0_total_size` 大于 `level_max_bytes_[base_level_]`,确保 score 不小于 `L0_total_size / max(base_level_size, level_max_bytes_[base_level_])`
- 这样如果 L0 size 大于 L1 size,score 就会大于 1,能被 scaled,提升优先级
- 如果 score > 1.0,进行 scale 乘以 10
- 总结:如果 L0 的文件大小过大,确保 score 大于 1,可以被 scale,提升优先级
- **其他 Levels 计算(L6以外)**:
- `level_bytes_no_compacting`: 当前 level 没有被 compaction 的文件总大小
- `level_total_bytes`: 当前 level 文件总大小
- `level_max_bytes` : `level_max_bytes_[level]`
- `kScoreScale`: 10.0
- `total_downcompact_bytes`:从上面所有 levels 将要 compaction 到当前 level 的
- level 小于 `lowest_unnecessary_level_` 也是要 compact 的,累计整个 level 大小
- 超过某个 level `level_max_bytes` 的,累计超过部分
- 如果未启用 dynamic level:
- `score = level_bytes_no_compacting / level_max_bytes`
- 数据量是否超出本层限制( level_bytes_no_compacting >= level_max_bytes)
- 未超出:`score = level_bytes_no_compacting / level_max_bytes`
- 超出了: `score = level_bytes_no_compacting / (level_max_bytes + total_downcompact_bytes) * kScoreScale
- 加了 total_downcompact_bytes,相当于 score 变小,如果上层将有大量数据将要 compact 下来,调低本层优先级,优先上层 compact
- 处理 unnecessary levels( level <= `lowest_unnecessary_level_` 并且 level 数据非空)
- 手动设置 unnecessary levels 的 score,使其能被 compact
- `score = max(score, kScoreScale * (1.001 + 0.001 * (lowest_unnecessary_level_ - level)))`
- level 越小,优先级越高
- 加权值 `1.001 + 0.001 * l`小于 L0 的 1.01,使其优先级不要高于 L0
- **总结**:除了基本的 `level_max_bytes` 因素,还有考虑 `total_downcompact_bytes` 和 unnecessary levels
- 计算完 score 后会排序 `compaction_score_` 和 `compaction_level_` 以及其他 compaction 相关的计算
- ComputeFilesMarkedForCompaction
- ComputeBottommostFilesMarkedForCompaction
- ComputeExpiredTtlFiles
- ComputeFilesMarkedForPeriodicCompaction
- EstimateCompactionBytesNeeded
- PR: [Change The Way Level Target And Compaction Score Are Calculated](https://github.com/facebook/rocksdb/pull/10057)
> the score is calculated by actual **level size / (target size + estimated upper bytes coming down)**. The reasoning is that if we have a large amount of pending L0/L1 bytes coming down, compacting L2->L3 might be more expensive, as when the L0 bytes are compacted down to L2, the actual L2->L3 fanout would change dramatically. On the other hand, when the amount of bytes coming down to L5, the impacts to L5->L6 fanout are much less. So when calculating target score, we can adjust it by adding estimated downward bytes to the target level size.
- PR: [Fix regression issue of too large score](https://github.com/facebook/rocksdb/pull/10518)
- 修复 L0 score 可能过高的问题
- 之前的算法:We fix calculating a score of L0 by size(L0)/size(L1) in the case where L0 is large
- Issue: [There are too many write stalls because the write slowdown stage is frequently skipped](https://github.com/facebook/rocksdb/issues/9423)
## CalculateBaseBytes()
- 主要是计算 `level_max_bytes_`(各层允许的最大大小)
- 先计算出 base level size,再根据 level_multiplier 计算
- 不开启 `level_compaction_dynamic_level_bytes` 时:
- base level 为 L1,L1 的最大大小取决于参数 `max_bytes_for_level_base`
- 其余各层 $L_n = L_{n-1} \times level\_multiplier \times level\_multiplier\_additional[n]$
- 开启 `level_compaction_dynamic_level_bytes` 时:
- **max_level_size**:除 L0 外,统计每层的数据量,取最大的那个
- 考虑 compaction,最大的那层不一定是 last level
- 如果 `max_level_size` 为 0,表示 L0 以上没有数据,base_level_ 取 L6,即 L0 直接compact 到 max level
- **first_non_empty_level**: 除 L0 外,第一个非空层(从上往下判断)
- **base_bytes_max**: `max_bytes_for_level_base`
- base_level_size 的上界
- **base_bytes_min**: `base_bytes_max / options.max_bytes_for_level_multiplier`
- base_level_size 的下界。
- 同时小于此值的 level 也会被认定为 unnecessary
- **cur_level_size**: 以 max_level_size 作为基准,从下往上根据 level_multiplier 计算得出其他各层应该最大是多大
- 例如:L5 为 max_level_size / 10,L4 为 max_level_size / 100,直到 first_non_empty_level
- **`lowest_unnecessary_level_`** 为第一个 cur_level_size 小于 base_bytes_min 的层次
- 即 level 过小,没有存在的必要,可以进行 compaction
- 计算 **base_level_size**: (**base_level** 初始化为 `first_non_empty_level`)
- 如果: `base_level` 的 `cur_level_size` 小于等于 `base_bytes_min`, 取值为 `base_bytes_min` + 1 (加 1 是防止 `base_bytes_min` 为 0),这种情况也必然存在 `lowest_unnecessary_level_`
- 否则:base_level 逐渐==减小==,`cur_level_size` 随之逐渐除以`level_multiplier`,直到不大于 `base_bytes_max`
- 这种情况会增加当前的层次,制造空层
- 总结:确保 `base_level_size` 取值范围在 `base_bytes_min` 和 `base_bytes_max` 之间
- 设定 **`level_max_bytes_`**
- 从 `base_level_` 开始,以 `base_level_size` 为初始值,从上往下依次乘以 `level_multiplier`
> [!summary] level_compaction_dynamic_level_bytes = true
> - base level size 计算:
> - 以 `max_level_size` 作为 last level 的大小,不断除以 `level_multiplier` 直到位于`max_bytes_for_level_base`的范围内
> - `level_max_bytes_`计算
> - 根据 base level size 不断乘以 ``level_multiplier``
## EstimateCompactionBytesNeeded()
- 各层累加计算
- L0:
```
If num_files(L0) > compaction_trigger or size(L0) > size(L1_target):
increment by size(L0) + size(L1)
```
- `L1_target`: max_bytes_for_level_base
- Ln,n > 0 时:
```
If size(Ln) > target_size(Ln):
increment by [ size(Ln) - target_size(Ln) ] *
[ (size(Ln+1) / size(Ln)) + 1 ]
```
- `target_size(Ln)`: `level_max_bytes_[n]`
- 本层超出大小 加上 compaction可能涉及的下层大小(本层超出大小乘以一个比例)
## ComputeFilesMarkedForCompaction()
- 在 `ComputeCompactionScore()` 中调用
- 计算 `files_marked_for_compaction_`
- 跳过最后一层非空的 level [#11489](https://github.com/facebook/rocksdb/pull/11489)
- 当 `allow_ingest_behind = true` 最后一层为 ingestion 预留,不 compaction
- 检查其他各层文件是否有 `marked_for_compaction` 标记,且没有在被 compact,添加到 `files_marked_for_compaction_`
- 计算 `standalone_range_tombstone_files_mark_threshold_`
- 满足 `file->FileIsStandAloneRangeTombstone()`:
- 文件只包含一个 range deletion 的 entrie
- 通常由 ingestion 产生,用于删除老数据 [#13078](https://github.com/facebook/rocksdb/pull/1307)
- `standalone_range_tombstone_files_mark_threshold_` 取这些文件的`smallest_seqno`中最小的
## ComputeBottommostFilesMarkedForCompaction()
- 计算 `bottommost_files_marked_for_compaction_`,在 `ComputeCompactionScore()` 和 `UpdateOldestSnapshot()` 中调用。
- 如果 `allow_ingest_behind = true`,直接返回跳过计算。
<br>
- 按照多个条件挑选 `bottommost_files_` 放入 `bottommost_files_marked_for_compaction_`
- 没有正在被 compact
- SST 的 `largest_seqno` 不为 0
- 配合 **`CompactionIterator::PrepareOutput()`** 使用,里面会将 bottommost 中小于最小 snapshot seq 的 seq 设为 0
- SST 的 `largest_seqno` 小于 `oldest_snapshot_seqnum_`
- 意味着 STT 内的所有数据都没有被快照引用了
- `create_time` 距今已经超过 `bottommost_file_compaction_delay_` 选项
- 避免 compact 刚生成的,需要清理的数据可能很少 [#11701](https://github.com/facebook/rocksdb/pull/11701)
## ComputeExpiredTtlFiles()
- 计算 `ttl_expired_files_`,在 `ComputeCompactionScore()` 中调用。
- 仅在 配置了 `AdvancedColumnFamilyOptions::ttl` 时有效。
- 根据 `oldest_ancester_time` 选择所有文件中生命超过 TTL 的。
- ~~不选择 L6,因为已经有 bottommost compaction?~~
- 不选择 L6。设计上对于 leveled compaction style,只 compact 过期的 Non-bottom-level files
- PR: [Level Compaction with TTL #3591](https://github.com/facebook/rocksdb/pull/3591
> [!NOTE] 与 DBWithTTL 的区别
> - DBWithTTL 是 key 粒度的,利用 compaction filter 删除 TTL 过期数据;
> - 此处的 `ttl_expired_files_` 是文件粒度的,文件超期则触发 compaction,目的不是为了清理过期数据,而是为了防止文件长时间不 compaction。
## ComputeFilesMarkedForPeriodicCompaction()
- 计算 `files_marked_for_periodic_compaction_`,在 `ComputeCompactionScore()` 中调用。
- 仅在 `periodic_compaction_seconds` 选项不为 0 时计算
- 该选项默认 30 天
- 挑选文件的创建/修改时间 距今已经超过 `periodic_compaction_seconds` 的文件
- 与 TTL compaction 不同,当不启用`allow_ingest_behind` 时,它会处理最底层 L6 的文件
- 会根据 `daily_offpeak_time_utc` 选项,在低峰时提前过期
- [PR: Periodic Compactions](https://github.com/facebook/rocksdb/pull/5166)
- [PR: Mark more files for periodic compaction during offpeak](https://github.com/facebook/rocksdb/pull/12031)
## GenerateBottommostFiles()
- 计算各层 `bottommost_files_`,在 `PrepareForVersionAppend()`中调用
- 针对每层每个 SST,提取 `smallest_user_key` 和 `largest_user_key`,调用 [[version#^cdf817|RangeMightExistAfterSortedRun()]], 如果返回 false, 放入 `bottommost_files_`
- 范围没有出现在更早的 L0 文件或者更大的 level 中
## ComputeCompensatedSizes()
- 计算每个 SST FileMetaData 的 `compensated_file_size` 值,在 `PrepareForVersionAppend()` 中调用。
- 如果某个 SST 已经计算过,则跳过。
- `compensated_file_size` 由三部分组成:
- `file_size`:原始文件大小
- `compensated_deletion_size`:由 deletion enties 产生(不包含 range deletions)
- 当 deletions 超过总 entries 的一半时触发计算
- 计算方式:`(deletions * 2 - entries) * avg_value_size * 2`
- `compensated_range_deletion_size`:range deletions产生的
- 预估每个range deletion 的范围跟下层重叠的数据大小
- db/builder.cc `BuildTable()` 中计算
## UpdateFilesByCompactionPri()
- 计算各层 `files_by_compaction_pri_`, 在 `PrepareForVersionAppend()` 中调用。
- 不统计 L6 的,因为它不参与一般的 compaction。
- 根据配置的 `CompactionPri` 选项,对每层的 SST 进行排序。
- kMinOverlappingRatio 的 排序 key: `overlapping_bytes / compensated_file_size / ttl_boost_score`
- `overlapping_bytes`: 跟自身范围重叠的下层文件们的大小总和
- `ttl_boost_score`:提升 TTL 快到期的文件的优先级。[#8749](https://github.com/facebook/rocksdb/pull/8749)
- key 越小的排序越往前,compaction 优先级越高
## PrepareForVersionAppend()
- 将 Version 追加到 VersionSet 之前调用,计算相关成员数据
## OverlapInLevel()
- 输入参数:
- **`level`**: `int`
- **`smallest_user_key`**: `Slice *`
- **`largest_user_key`**: `Slice *`
- 检查某层中是否有文件跟 `[smallest_user_key, largest_user_key]` 范围重叠
- user_key 如果为空表示 负/正 ∞
- 如果 level 大于 `num_non_empty_levels_` 表示该层为空,返回 false。
## GetOverlappingInputs()
- 查找 `level` 层跟 `[begin, end]` 重叠的文件。
- 如果 `level` 大于 0,使用二分搜索 `begin` 和 `end`的位置。
- 如果 `level` 为 0,遍历查询
- `expand_range`:发现重叠文件后,根据它的范围尝试扩大 `[begin, end]`返回。
## RangeMightExistAfterSortedRun()
^cdf817
- 判断 key 范围是否出现在更早的 L0 或者更大的 level 中
---------
# Version
- db/version_set.h
- 某个 cf 在某个时间点 的 table 和 blob files。
- 属于某一个 cf。
- 主要是其包含的 VersionStorageInfo,外加了一些辅助/管理数据。
- 核心成员:
- **`cfd_`**: `ColumnFamilyData*`,所属的 cf
- **`vset_`**:`VersionSet*`,所属的 `VesionSet`
- **`next_`**, **`prev_`**:`Version *`,Version 之间构成双链表
- **`storage_info_`**:`VersionStorageInfo`,LSM-tree 磁盘结构、compaction 信息
- **`refs_`**:`int`,引用计数,为 0 时会 delete 该 version
- **`max_file_size_for_l0_meta_pin_`**:`size_t`,
- cf `write_buffer_size` 的 1.5 倍,超过此限制的 L0 不会 pin 它的 index/filter blocks
- **`version_number_`**:`uint64_t`,唯一 ID,仅用作调试和打印日志
## 析构
1. 从 version 链表中移除
2. 对 Version 中引用的每个 SST 文件进行 UnRef()
- 如果文件引用计数变为 0, 则放入 `VersionSet::obsolete_files_` 中
## Unref()
- 减小引用计数,如果变为 0 则进行 delete。
-----------
# VersionEdit
- db/version_edit.h
- 代表对 Version 一次更改操作;
- cf 级别成员:
- **`column_family_`**:
- 目标 cf,一个 VersionEdit 仅包含对==一个 cf== 的修改,即从属于某个 cf
- **`deleted_files_`**: `DeletedFiles`
- 删除了哪些文件,set 结构, 元素为 `(level, fileno)`
- **`new_files_`**: `NewFiles `
- 添加了新文件,vector 结构,元素为 `(level, FileMetaData)`
- **`compact_cursors_`**:`CompactCursors`
- 存储 compaction 的位置,仅当 `CompactionPri` 为 `kRoundRobin` 时有效
- **`log_number_`**:`uint64_t`
- WAL recover 的起始位置,来自 [[memory#^ce1bad|mem_next_logfile_number_]]
- flush job 会进行设置,已实现 flush 后不用回放老的 wals,避免数据重复
- **`wal_deletion_`**:`WalDeletion`
- 删除某个编号之前的 WAL
- **`is_in_atomic_group_`**: `bool`
- 表示 edit 是 atomic flush group 的一个
- **`remaining_entries_`**:`uint32_t`
- edit 是 atomic group 中的倒数第几个
- *`prev_log_number_`*:不再使用
- DB 级别成员:
- **`min_log_number_to_keep_`**:`uint64_t`
- 含有 unflushed data 的最早 WAL,即大于等于它的 WAL 含有 unflushed 数据,需要保留
- **`next_file_number_`**:`uint64_t`
- 分配文件编号
- **`last_sequence_`**:`uint64_t`
- SST 中的最大 seq,更新`new_files_`时会同时读取 SST 的 `largest_seqno` 进行更新
- 用于 recover 时恢复 VersionSet 最后发布的序列号
- [x] 区分 cf 级别和 DB 级别的元数据
---
# VersionBuilder
- 基于一个 base VersionStorageInfo,累积应用 VersionEdit(通过Apply方法),最终生成新的 VersionStorageInfo(通过 SaveTo 方法保存到新 Version 的 VersionStorageInfo 中)
- 主要涉及文件增删和 compact cursor。
## LoadTableHandlers()
^e0a39b
- 提前打开本次变更所新增的 SSTs。
- 即创建 TableReader。
- 调用时机:
- DB::Open 时预加载 SSTs(此时新增 SSTs 为 cf 的全量 SSTs)
- 每次有新 Version 产生时(VersionSet::ProcessManifestWrites 中)
- 参数:
- **`max_threads`**:`int`,多线程并行打开
- DB::Open时,使用参数 `DBOptions::max_file_opening_threads`,默认 16
- 追加新Version 时,使用 1
- **`is_initial_load`**:`bool`,true 表示来自 DB::Open()
- **`prefetch_index_and_filter_in_cache`**:`bool`
- DB::Open() 为 false,ProcessManifestWrites() 时为 true
- PR:[Non-initial file preloading should always prefetch index and filter #4852](https://github.com/facebook/rocksdb/pull/4852)
- 打开的文件数量限制:
- 如果 `max_open_files == 1`,则所有的新增 SSTs 都打开
- 否则,同时打开的总数量 `load_limit` 限制为 `max_open_files/4`
- 如果是 `is_initial_load`,则`load_limit`进一步限制为 16 个
- PR:[Preload some files even if options.max_open_files #3340](https://github.com/facebook/rocksdb/pull/3340)
- 本次可打开的数量为 `load_limit - table_cache_usage`
- 如果当前已打开的数量即 `table_cache_usage` 大于 `load_limit`,则直接返回
- 执行流程:
1. 搜索所有 levels 的 **`added_files`**,查找还没有打开的 SSTs,直到达到数量限制
- 没打开过:`file_meta->table_reader_handle` 为 nullptr 的
2. 通过 `TableCache->FindTable()` 并行打开 SSTs,赋值给 `FileMetaData::table_reader_handle`,pin 住 cache handle
> [!NOTE] Preload
> - 当 `max_open_files == -1` 时,预加载所有新增 SSTs;
> - 当 `max_open_files != -1` 时,预加载 SSTs 的前提是 table cache is not 1/4 full;
- [ ] `prefetch_index_and_filter_in_cache`对打开 TableReader 的影响
------------
# VersionEditHandler
## VersionEditHandlerBase
- 遍历 MANIFEST 中的 VersionEdit,并调用子类提供的 ApplyVersionEdit() 处理 VersionEdit
- Iterate() 流程
1. 调用 Initialize(),子类提供
2. 扫描 MANIFEST,解码 VersionEdit,调用 ApplyVersionEdit()
3. 调用 CheckIterationResult(),子类提供
- atomic group 处理:
- 同一个 atomic group 内的 VersionEdits 先缓存到 `read_buffer_`中,等 group 读完整后应用:
1. 调用 OnAtomicGroupReplayBegin()
2. 对 group 内的每个 VersionEdit 调用 ApplyVersionEdit()
3. 调用 OnAtomicGroupReplayEnd()
## VersionEditHandler
^b3d06f
- 继承了 VersionEditHandlerBase
- 扫描 MANIFEST 文件,应用 VersionEdit 到 VersionSet 中。Recover 使用。
- 关键成员:
- **`column_families_`**:`vector<ColumnFamilyDescriptor>`,
- 来自DB::Open,需要打开的 cf 列表
- **`version_set_`**:`VersionSet *`,读取到的 VersionEdit 会更新到这里
- **`builders_`**:`map<uint32_t, VersionBuilderUPtr>`,各个 cf 的 VersionBuilder
- 初始化时添加 default cf 的
- 后续应用 VersionEdit 时在 OnColumnFamilyAdd 中添加其他 cf 的
- **`name_to_options_`**:`map<std::string, ColumFamilyOptions>`
- cf name 到 option 的映射,初始化时根据 `column_families_` 进行赋值
- **`column_families_not_found_`**:`map<uint32_t, string>`
- 存在于 MANIFEST 中(有 cf add 记录),但没出现在用户指定的 `column_families_`中
- **`version_edit_params_`**:`VersionEdit`, 缓存最新的 DB 级别的属性,到最后再更新到`version_set_`中,如
- `next_file_number_`
- `has_last_sequence_`
- `min_log_number_to_keep_`
- `max_column_family_`:最大的 cf id
- `log_number_`,维持各个 cf 最大的 `log_number_`
- 应用于 `VersionSet::MarkFileNumberUsed`,更新 `next_file_number_`
### ApplyVersionEdit()
1. 根据 VersionEdit 的类型,进行不同的处理
- IsColumnFamilyAdd -> OnColumnFamilyAdd()
- IsColumnFamilyDrop -> OnColumnFamilyDrop()
- 其他 -> OnNonCfOperation()
2. 调用 ExtractInfoFromVersionEdit,VersionEdit 解析其他 cf 级别 和 DB 级别的信息
### ExtractInfoFromVersionEdit()
- 解析 edit 的 `log_number_`,更新到对应 cfd
- 解析 DB 级别的信息,更新到 `version_edit_params_`
### OnColumnFamilyAdd()
- 主要逻辑:调用 `version_set_->CreateColumnFamily()` 创建和初始化 cf
- cf memtable 的 `earliest_seqno_` 为 0
- 其他:
- 检测有同一cf 重复的 add 记录并报错
- `persistent_stats` cf的 初始化(如果有启用)
- 尝试更新 `column_families_not_found_`
### OnColumnFamilyDrop()
- 主要逻辑:
- 从 `version_set_`获取 cf 并设置为 Dropped
- 再调用 `ColumnFamilyData::UnrefAndTryDelete()` 释放 cf
### OnNonCfOperation()
- 从 `builder_` 中找出 cf 对应的 VersionBuilder
- 调用 `VersionBuilder::Apply(edit)`应用 VerionEdit
### CheckIterationResult()
- 遍历 MANIFEST 结束后调用:
1. 从 `version_edit_params_` 解析 DB 级别的信息,更新到 `verson_set_`
2. LoadTables() 加载 SSTs 到 TableCache
- 调用 [[version#^e0a39b|LoadTableHandler()]]
- `prefetch_index_and_filter_in_cache` 为 false
- `is_initial_load` 为 true
- 每个 cf 都加载
3. 完成各个 cf 的 VersionBuilder,生成 cf 的 Version
> [!summary] VersionEditHandler
> Recover 时 回放 MANIFEST 到 VersionSet,并 preload SSTs。
----
# VersionSet
- DB 所有 cf 的 versions 集合,一个 DB 只有一个 VersionSet。
## 核心成员
- **`column_family_set_`**:`unique_ptr<ColumnFamilySet>`,[[column family#^09f276|ColumnFamilySet]]
- 保管所有的 cf
- **`wals_`**:`WalSet`,WAL文件集合,
- **`next_file_number_`**:`atomic<uint64_t>`,分配文件序号
- **`last_sequence_`**:`atomic<uint64_t>`,读可见的最新 seq(通常来自 memtable)
- **`descriptor_last_sequence_`**: `SequenceNumber`,提交到 Manifest 的最新 seq
- 用于确保提交给 manifest 的`VersionEdit::last_sequence_`是递增的
- **`last_allocated_sequence_`**:`atomic<uint64_t>`,用于分配 seq
- **`obsolete_files_`**:无效的文件,以及在 DB 关闭时没有被内存中 LSM 引用的文件
- **`descriptor_log_`**:`unique_ptr<log::Writer>`,MANIFEST writer
- **`manifest_writers_`**:`deque<ManifestWriter*>`,ManifestWriter 的写入请求队列
- 一个 ManifestWriter 对应==一个 cf== 的写入,主要包含
- **`done`**:`bool`,写入完成标志
- **`cv`**:`CondVar`,写入同步,等待 group commit 时被别人完成写入,或者,自己成为队首进入写入流程
- **`cfd`**:`ColumnFamilyData *`,关联的 cf
- **`mutable_cf_options`**:`const MutableCFOptions`,option 修改
- **`edit_list`**:`const autovector<VersionEdit*>&`,修改列表
- **`pending_manifest_file_number_`**:`uint64_t`
- 当前正在写入中的 manifest 文件,用于阻止删除 切换 CURRENT 时生成的临时文件
- 该临时文件名字中包含 `pending_manifest_file_number_`
- 关联逻辑:DBImpl::PurgeObsoleteFiles()
- 每次写完 ProcessManifestWrites 完会改回 0
## AppendVersion()
- 为某个 cf 按照新的 Version
- 关键操作:
- ComputeCompactionScore
- 当新 version 设置为 cf 的 current
- 追加到 cf 的 versions 链表中
## SetCurrentFile()
- 更新 CURRENT 文件
- 参数:
- **`db_name`**:DB 路径
- **`descriptor_number`**:新 manifest 文件的编号
- 流程:
1. 创建临时 CURRENT 文件,名为 `{descriptor_number}.dbtmp`
2. 将新 manifest 文件名写入临时文件,即 `MANIFEST-{descriptor_number}`
3. 将临时文件重命名为 CURRENT(原子操作)
4. sync 目录
## ProcessManifestWrites()
- 批量写入 Manifest,并尝试组提交队列中 pending 的写入。
- 一次调用写入多个 cf 上的修改
- 参数:
- **`writers`**:`deque<ManifestWriter>`,当前的写入请求,可能内含多个 cf
- **`new_descriptor_log`**:`bool`,强制创建新 manifest,仅测试使用
- 进入该函数的==前提==:`writers.front()`是 `manifest_writers_`的头部
- 即当前 writer 是队首,由 LogAndApply 调用时保证
- group commit 逻辑:
- **`batch_edits`**:`vector<VersionEdit*>`,组提交的结果,最后一次性写入
- 从 `manifest_writers_` 队首开始不断挑选 writer,将 writer 的 VersionEdits 放入 `batch_edits` 中
- 停止条件:
- 遇到 `IsColumnFamilyManipulation() == true` 的 VersionEdit,即 add/drop cf
- 一个 writer 从属于 一个 cf,add/drop cf 对应的 writer 内只有一个 VersionEdit
- 目的:add/drop cf 不参与 group commit,单独写入
- 如果 writer 所属的 cf 已经 dropped,则忽略,不写入
- 如果 writer 在一个 atomic group 内,需要改写 group 成员的 `remaining_entries_`,维护 group 关系
- PR:[Support group commits of version edits #3944](https://github.com/facebook/rocksdb/pull/3944)
- 创建新 manifest :
- 条件(两者任一满足):
- `descriptor_log_`为空,即每次 Open 初始化时
- `manifest_file_size_ > max_manifest_file_size`,即当前文件过大(默认 1G)
- 创建流程:
1. 新建文件,通过 fallocate 预分配 `manifest_preallocation_size`(默认 4MB)
- 使用 FALLOC_FL_KEEP_SIZE
2. WriteCurrentStateToManifest:写入全量元数据(DB当前状态),包括
- 每个 cf 创建信息(add cf)
- 每个 cf 每层的 SST 文件,blob 文件,compaction 位置,`log_number`
- 整个DB 的 `min_log_number_to_keep_`(写到默认 cf)等
- 新 manifest 会消费 `next_file_number_`,需要体现到后续的 VersionEdit 中
- 错误处理:
- 如果写 manifest 成功,但更新 CURRENT 失败,则==不删除==新创建的 manifest 文件
- 远程文件系统可能更新 CURRENT 成功了,但给客户端返回失败,此时删除则导致 CURRENT 指向了不存在的 MANIFEST
- PR: [Handle rename() failure in non-local FS #8192](https://github.com/facebook/rocksdb/pull/8192)
- 整体执行流程:
1. group commit:批量挑选 VersionEdits 到 `batch_edits`
2. 为每个 cf 生成新 Version(存储到 `versions`中)
- 基于 cf 当前 version 创建新的 version,并应用针对该 cf 的 VersionEdits
- 调用 [[version#^e0a39b|LoadTableHandler()]] ==预加载==新增的 SSTs
3. 按需创建新 manifest
4. 调用 `Writer::AddRecord()` 依次写入 `batch_edits`中的每个 VersionEdit
- 一个 VersionEdit 一个 record
5. SyncManifest:flush + sync
6. 如果新建了 manifest,调用 SetCurrentFile() 更新 CURRENT 文件
7. 调用 `VersionSet::AppendVersion()`安装每个 cf 的新 Version
- 如果是 add/drop cf 则特殊处理,调用 CreateColumnFamily 或者 SetDropped
8. 写入成功后,将已写入的从 `manifest_writers_`出队,并唤醒组提交中非本次请求的 writers
9. 唤醒新的队首,通知下轮写入
> [!summary] ProcessManifestWrites 要点
> - group commit
> - 每个 VersionEdit 写入一个 record,只是整体只有一次 flush + sync
> - 预先打开 new SSTs
> - 按需切换 manifest 文件
> - 写完后将新 Version 更新到 cfd 中
## LogAndApply()
- 记录 VersionEdits 到 Manifest 、并生成和应用新 Versions。
- 参数:
- **`column_family_datas`**:`vector<ColumnFamilyData*>`
- 变更涉及到的 cf 列表
- **`mutable_cf_options_list`**:`vector<const MutableCFOptions *>`
- 各个 cf 最新的 mutable options,最终会应用到 cf 的新 Version 中
- 数量跟 `column_family_datas`一样
- **`edit_lists`**:`vector<vector<VersionEdit*>>`
- 两层 vector
- 外层每个元素对应一个 cf,数量跟 `column_family_datas`一样
- 内层表示某一个 cf 的 修改列表(VersionEdits)
- **`new_cf_options`**: `ColumnFamilyOptions *`
- 创建新 cf 时需要,此时 `edit_lists` 只包含一个 cf,且该 cf 只有一个 VersionEdit
- 流程:
1. 构建 `ManifestWriter`s,入队 `manifest_writers_`
2. 等待他人通过 group commit 代自己写入,或者,自己成为队首
3. 如果成为队首,调用 ProcessManifestWrites() 写入 manifest 并安装新 Versions
- 如果 `column_family_datas` 所有的 cf 都被 drop 了即 `cfd->IsDropped() == true`, 则不写入,返回 `Status::ColumnFamilyDropped()`
## Recover
^6ca17c
- 回放 MANIFEST 到 VerionSet 中
1. 读取 CURRENT -> MANIFEST 文件
2. 利用 [[version#^b3d06f|VersionEditHandler]] 遍历 MANIFEST,回放 VersionEdits 到 VersionSet
- 各个 cf,cf 的 version(SSTs 文件等)
- `next_file_number`等 DB 级别元数据
- 预打开每个 cf 的 SST 文件( VersionEditHandler::CheckIterationResult 中)
- 如果`max_open_files != -1`,所有 cf 累积打开不超过 16 个
--------
# 总结
- 几个 Version 类的关系
- 
---
# 实验
- [ ] 一个VersionEdit 编码后的 manifest record 极限下能有多大
- 比如 FileMetaData 的 `smallest` 或 `largest` 非常大
- [ ] `max_manifest_file_size`过小