# Compaction 类
- 封装了一次 compaction 的元数据。
- db/compaction/compaction.h
## 核心成员
- **`smallest_user_key_`**:`Slice`
- **`largest_user_key_`**:`Slice`,所有 inputs 的整体范围
- **`input_vstorage_`**:`VersionStorageInfo *`,compaction 任务生成时所基于的 version
- **`inputs_`**:`vector<CompactionInputFiles>`,compaction 输入
- **`input_levels_`**:`autovector<LevelFiesBrief, 2>`,输入的上下两层
- **`grandparents`**:`vector<FileMetaData *>`,`output_level + 1` 层跟 inputs 范围有重叠的文件
- **`bottommost_level_`**:当前 compaction 是否会在 bottommost level 创建文件
- 检查它的范围不会出现在更大的 level 或更旧的 L0 中
- 值取自于 `IsBottommostLevel()`
- **`target_output_file_size_`**: 输出的 SST 文件的目标大小
- 取决于 `target_file_size_base` 和 `target_file_size_multiplier` 参数
- 计算逻辑: MaxFileSizeForLevel()
- **`max_output_file_size_`**:输出的 SST 的文件最大限制 ^b6ff7b
- 取决于 `target_output_file_size_` 的值,最大为 ==2 倍==
- PR:[Align compaction output file boundaries to the next level ones #10655](https://github.com/facebook/rocksdb/pull/10655)
## IsBottommostLevel()
- 检查在比`output_level` 更大的层次中,没有跟 `inputs_`的范围重叠的。
- 调用 [[version#^cdf817|RangeMightExistAfterSortedRun]]
- 如果 `output_level` 是 L0,则也会检查更早的 L0 文件。
- 也就是说 compaction 输出的 key 范围不会跟更大 level 更旧的数据有重叠。
- 这个信息可以用于实现一些优化,例如 Bottommost compaction 输出中 的 delete marker 不需要保留。
## PopulateWithAtomicBoundaries()
- PR: [Truncate range tombstones by leveraging InternalKeys #4432](https://github.com/facebook/rocksdb/pull/4432)
----
# CompactionPicker
- Picker 基类,也实现了一些公用方法。
- 一个 CF 持有一个 picker。
- 纯虚函数(需要不同的 CompactionStyle 来实现):
- NeedsCompaction
- PickCompaction
## 核心成员
- **`compactions_in_progress_`**:`set<Compaction *>`
- 跟踪所有正在运行的 compactions,避免有冲突的 compaction 并行运行
- **`level0_compactions_in_progress_`**:`set<Compaction *>`
- 跟踪所有正在 L0 层上运行的 compactions,保证同时只有一个 L0-> Lbase
## AreFilesInCompaction()
- 入参:`const vector<FileMetaData*>& files`
- 如果 files 中任一文件为 `being_compacted`,返回 true。
## ExpandInputsToCleanCut()
- 添加更多 `start_level` 的 inputs 文件(L 层的),确保当 L 层某个 key 被 compact 到 L + 1 层后,L 层不会保留该 user key 的更早版本
- 否则会导致 Get 操作得到过时的数据
- *例如 seq=100 的 comacted 到 L+1 了,seq=10 的还在 L 层*
- 如果无法执行,则返回 false。
- 入参:
- **`vstorage`** : `VersionStorageInfo*`
- **`inputs`**: `CompactionInputFiles`
- 执行流程:
- 对于 L0,不需要处理,`GetOverlappingInputs()`已经能保证没有问题。
- 循环进行如下操作,直接 inputs 不再增长:
1. 调用 `GetRange()` 获取当前 inputs 的整体范围
2. 调用 `vstorage->GetOverlappingInputs()`,获取跟上述范围的 **user key** 有重叠的所有文件作为新的 inputs
- 检查所有 inputs 的 `being_compacted` 标记,如果有为 true 则表示该文件已经有其他 compaction 任务在执行,因此应该取消本次 compaction。
- [ ] bottommost compaction 是否有必要调用它
## RangeOverlapWithCompaction()
- 检查 key 范围是否跟当前正在运行的某个 compaction 重叠。
- 输入参数:
- **`smallest_user_key`**:`const Slice&`
- **`largest_user_key`**:`const Slice&`
- **`level`**:`int`,output level,只检查输出到该 level 的 compaction
- 执行流程:
- 检查 `compactions_in_progress_` 中每个 compaction
- 重叠条件:compact 到同一个层(`output_level`)且 key 范围有重叠
## FilesRangeOverlapWithCompaction()
- 检查输入文件的 key 范围是否跟当前正在运行的 compactions 重叠。
- 输入参数:
- **`inputs`**:`vector<CompactionInputFiles>`
- **`level`**:`int`,output level
- **`penultimate_level`**:`int`,
- 默认为 kInvalid(不启用 `preclude_last_level_data_seconds` 特性)
- 执行流程
1. 调用 `GetRange()` 获取 inputs 的最大和最小 key(一个整体的范围,涵盖所有 inputs)
2. 调用 `RangeOverlapWithCompaction()`
## SetupOtherInputs()
^a5d371
- 填充与 `start_level` 重叠的其他 level 的 inputs。
- 也会根据填充的 `ouput_level` 文件,尝试扩容 `start_level` 的 inputs
- 输入参数:
- **`inputs`**: `CompactionInputFiles*`,`start_level` 的输入
- **`output_level_inputs`**: `CompactionInputFiles`, `output_level`的输入
- **`parent_index`**:`int *`
- **`base_level`**:`int`
- 前置条件:
- `input_level` 和 `output_level` 不一样,否则不操作返回 true。
- `input_level` 和 `output_level` 之间的 levels 都是空的(无SST 文件)。
- 执行流程:
1. 获取 inputs 的整体范围( smallest 和 largest)
2. 计算 `ouput_level` 中跟上述范围重叠的文件,输出到 `output_level_inputs` 中
3. 检查 `output_level_inputs` 中是否有文件已经在被 compaction,是的话,返回 false。
4. 如果 `output_level_inputs` 不为空,则对其调用 `ExpandInputsToCleanCut()`
5. 根据 `output_level_inputs` 尝试扩充 `start_level` inputs(不增加 `output_level_inputs` 的前提下)
- 扩充 `start_level` inputs
- 避免写放大,防止需要后面再生成一个任务来 compact `ouput_level` 对应的同一个范围,代价是当前的 compaction 会变大
- PR:[Ignore max_compaction_bytes for compaction input that are within output key-range](https://github.com/facebook/rocksdb/pull/10835)
- PR: [Limit compaction input files expansion](https://github.com/facebook/rocksdb/pull/12484)
- 执行流程:
1. 获取两个 level 的整体范围,再获取 `start_level` 中跟此范围重叠的文件到 `expaned_inputs`
2. 可以扩充 inputs 的条件:
- 对 `expaned_inputs` 调用 `ExpandInputsToCleanCut()` 并成功
- `expaned_inputs` 不包含正在被 compaction 的文件
- 两层文件大小加起来不超过两倍的 `max_compaction_bytes`,避免扩充带来大 compaction
- `expaned_inputs` 不会引发 output level inputs 的增长(通过新的范围检查)
3. 如果不能扩充,通过 `GetCleanInputsWithinInterval` 再次尝试
## GetOverlappingL0Files()
- 入参:
- **`start_level_inputs`**:`CompactionInputFiles *`
- **`output_level`**: `int`
- 实现:
- 查找跟 `start_level_inputs` 重叠的 L0 文件,重新填充进 `start_level_inputs`
- 获取新的 `start_level_inputs` 的范围,通过 `IsRangeInCompaction()` 检测是否与已有 compaction 重叠
---
# LevelCompactionBuilder
## 核心成员
- **`base_index_`**:`int`,initial file 在 `start_level_`层的索引
- **`parent_index_`**:`int`
- **`start_level_`** : `int`,哪一层需要 compact(输入),例如 L 层
- **`start_level_inputs_`**: `CompactionInputFiles`, L 层输入文件
- 包含了哪个 level 的哪些文件
- **`output_level_`**:`int`,compact 到哪一层(输出),例如 L + 1 层
- 输出可能也是 L 层(例如 intra L0 和 bottommost compaction)
- **`output_level_inputs`**: `CompactionInputFiles`, L + 1 层输入文件
- **`grandparents_`**:`vector<FileMetaData *>`
- 跟本次 compaction 所有输入有重叠的 grandparent 文件列表
> grandparent == `output_level + 1` or the first level after that has overlapping files
- **`compaction_inputs_`**: `vector<CompactionInputFiles>`,compaction 所有的输入,
- **`is_l0_trivial_move_`**:`int`,特殊的 compaction,直接将 L0 移动到 LBase
## PickSizeBasedIntraL0Compaction()
- 当 L0 层的总大小相对 Lbase很小时,尝试从最新的 L0 文件开始挑选 intra-L0 compaction,目的是减少 L0 -> Lbase compaction 的写放大。
- Issue: [compact L0 file to L0 when L0 is too small #10706](https://github.com/facebook/rocksdb/issues/10706)
- PR:[Allow more intra-L0 compaction when L0 is small #12214](ttps://github.com/facebook/rocksdb/pull/12214/)
- 背景:L0文件每个都很小,数量很快超过 `level0_file_num_compaction_trigger`,触发 compaction,导致写放大很大。
- 返回 true 如果挑选出了 intra L0 compaction,同时会更新 `start_level_inputs_` 和 `output_level_`
- 前置条件:`start_level_ == 0` 即 compaction 目标是 L0
<br>
- 执行流程:
- **`min_num_file`**:`max(2, level0_file_num_compaction_trigger)`
- 如果 L0 文件数量小于 `min_num_file` 则不挑选,返回 false
- 数量少就不会触发 compaction 即 issue #10706 的问题,不会带来额外写放大
- ==目标条件==:`L0_total_size` 小于 `Lbase_total_size / level_multiplier`
- 满足就说明整体比较小,不满足就不挑选,退出返回 false
- 从新到旧将 L0 的文件放入 `start_level_inputs_`,如果遇到 `being_compacted` 则中断
- 放入前会先清空 `start_level_inputs_`
- 被选中的文件数量需要大于等于 `min_num_file`,否则返回 false
- intra L0 的输出 level 即 `output_level_` 为 0,返回 true,挑选成功。
## TryPickL0TrivialMove()
- 尝试将 L0 文件直接 move 到 LBase
- 适用于 L0 文件跟 LBase 之间文件范围不重叠的情况
- 典型场景:顺序写入。
- PR:[Multi-File Trivial Move in L0->L1 #10188](https://github.com/facebook/rocksdb/pull/10188)
- 前置条件:
- `base_level` 不为空
- `start_level_` 为 L0,即 L0 需要 compaction
- 没有配置 `compression_per_level`,避免移动时两个 level 的 compression 算法不匹配
- `output_level_` 不为空(move 的收益不大,简化实现)
- 没有配置多 DB Path
- 执行流程:
- 从旧到新搜索 L0 文件
- 因为需要维持 L1 的文件始终旧于 L0(移动完后 L0 剩余的文件可能会跟 L1 重叠)
- 挑选符合条件的 L0 文件, 需要满足
- 跟已被选中的 L0 文件无重叠
- 跟 L1 所有文件无重叠
- 将被选中的文件按 key 排序,放入 `start_level_inputs_` ,同时设置 `is_l0_trivial_move_` 为 true
- 如果有能直接 move 的,函数返回 true。
## TryExtendNonL0TrivialMove()
- PR: [Try to pick more files in TryExtendNonL0TrivialMove() #11347](https://github.com/facebook/rocksdb/pull/11347)
- 前置条件:
- `start_level_inputs_` 只有一个文件,该文件可以直接 trivial move 到下一层
- 没有配置多个 `db_paths`
- 没有设置 `compression_per_level`
- 参数:
- **`start_index`**: `int`,被选中的 initial file (`start_level_inputs_[0]`)在 level 中 的 index
- 执行流程:
- 从 `start_index` 开始,先从右再从左扩展能 trivial move(跟下层没有重叠)的文件
- 限制:
- 总个数不超过 `kMaxMultiTrivialMove`,默认值为 4
- 总大小不超过 `max_compaction_bytes`
## PickFileToCompact() / 无参数
- 根据 `start_level_`,挑选文件 compact 到 `output_level_`。
- L0 compacton(`start_level_` 为 0)的特殊规则:
1. 如果有其他 L0 的 compaction,则只能尝试 `PickSizeBasedIntraL0Compaction()`
- 原因:L0 文件互相有重叠,不允许 L0 并行 compaction。(优化根据 key 范围判断)
2. 尝试 `TryPickL0TrivialMove()`,成功直接返回
3. 再次尝试 `PickSizeBasedIntraL0Compaction()`,成功直接返回
- 各层通用挑选规则:
1. 按照 compaction 优先级顺序,从 `start_level_` 的 `NextCompactionIndex()`开始,跳过 `being_compacted` 的文件,选中第一个满足条件的文件放入 `start_level_inputs`
2. 调用 `ExpandInputsToCleanCut()` 和 `FilesRangeOverlapWithCompaction()` ,如果跟其他 compaction 重叠冲突,清空,继续挑选
3. 填充 `output_level_inputs`(根据选中的源 level 的文件范围)
- 如果为空,则表示可以 trivial move,调用 `TryExtendNonL0TrivialMove()` 尝试扩展
- 否则,对 `output_level_inputs` 调用 `ExpandInputsToCleanCut()`后检测冲突,如果冲突则重新挑选下一个
4. 更新 `vstorage_` 的 compaction index
## FindIntraL0Compaction()
- file num based Intra-L0
- 入参:
- **`level_files`**:`Vector<FileMetaData>`, L0 层所有文件
- **`min_files_to_compact`**:`size_t`,默认为 `kMinFilesForIntraL0Compaction` 即 4
- **`max_compaction_bytes`**:`size_t`,来自 DBOptions,
- **`comp_inputs`**:`CompactionInputFiles*`,被选中的 L0 文件放到这里面
- 执行流程:
- 从新到旧挑选 L0 文件,中止条件(任一满足)
- 遇到 `being_compacted` 的
- 被选中的 L0 文件累积大小超过 `max_compaction_bytes` 限制
- 检查被选中的文件个数,超过 `min_files_to_compact` 才算成功,否则返回 false
- 被选中的文件放到 `comp_inputs` 中
## PickIntraL0Compaction()
- 当 L0 文件累积太多时,挑选 L0 -> L0 compaction。
- 目的:避免读放大
- 从最新的 L0 文件开始,挑选未在被 compact 的最长连续区间
- 前置条件:
- L0 文件的数量有挤压:大于等于 `level0_file_num_compaction_trigger + 2`
- 最新的 L0 文件没有正在被 compact
- 执行流程:
- 先清空 `start_level_inputs_`
- 检查前置条件后,满足调用 `FindIntraL0Compaction()` 挑选 L0 文件,被选中的放入 `start_level_inputs_` 中
## SetupInitialFiles()
- 挑选 compact 的初始输入文件,也包含了 Intra-L0 compaction。
- 结果存储到 `start_level_inputs_`中
- 按以下规则依次挑选文件(优先级由高到低),规则能命中挑选出文件就直接返回
- compaction score
- 不包含 Lmax
- 按 score 从高到低挑选大于 1 的,如果选中的跟运行中的冲突,则跳过检查下一个
- 如果跳过的是 L0->Lbase(同时只能有一个在运行),说明 L0 压力比较大
1. 避免选择 Lbase 的 compaction,否则可能 starve L0->Lbase
2. 尝试 IntraL0 compaction(PickIntraL0Compaction,filenum-based)
- marked for compaction
- 通过`SuggestCompactRange()`标记的
- bottommost compaction
- TTL compaction
- periodic compaction
- force blob gc
## SetupOtherL0FilesIfNeeded()
- L0 到 Lbase compaction 时(且非 trivial move),挑选其他 L0 文件。
- 扩充 `start_level_inputs_`
- 实现:
- 调用 `CompactionPicker::GetOverlappingL0Files()`
- 查找跟当前 `start_level_inputs_` 范围内重叠的 L0 文件,重新填充。
## SetupOtherInputsIfNeeded()
- 基于 initial files(通过 `SetupInitialFiles()` 获取的),设置其他需要被 compact 的文件。
- 如果 `output_level_` 是 L0,只将 `start_level_inputs_` 放入 `compaction_inputs_` 后返回。
- 非 L0 -> L0 的处理:
1. 如果非 `is_l0_trivial_move_`,调用 [[compaction#^a5d371|SetupOtherInputs]] 扩充 inputs
2. 将 `start_level_inputs_` 和 `output_level_inputs_` 放入 `compaction_inputs_`
3. 检测是否跟其他正在运行的 compaction 范围重叠(它们的输出 level 也一样)
- PR: [Add missing range conflict check](https://github.com/facebook/rocksdb/pull/10988)
4. 如果非 `is_l0_trivial_move_`,调用 `GetGrandparents()` 初始化 `grandparents_` 文件
## GetCompaction()
- 创建 Compaction 对象,注册到 picker
- `vstorage_`需要重新 `ComputeCompactionScore()`
- 因为计算分数会考量 `being_compacted` 状态,该状态发生变化了
## PickCompaction
- 生成/返回一个 Compaction 对象。
- 执行流程:
1. 调用 `SetupInitialFiles()`,挑选需要 compact 的第一个文件,也可能通过 clean cut 扩展为更多文件
2. 调用 `SetupOtherL0FilesIfNeeded()`,适用于 L0 -> Lbase 时,挑选其他 L0 文件
3. 调用 `SetupOtherInputsIfNeeded()`,挑选 output level 的文件,并扩充更多 start level 文件(key 范围可能变大)
4. 调用 `GetCompaction()` 生成 Compaction 对象并返回。
## 总结
> [!summary] Picker 基本流程
>1. 按照几种类型的优先级,先选择一个 input 文件;
>2. 由这个文件作为起始,按范围覆盖扩展 input level 的更多文件;
>3. 按范围覆盖挑选和扩展 output level 文件,并回头尝试扩展 input level
> [!summary] Compaction 并行
> - L0-> Lbase 同时只能由一个
> - 其他情况可以并行,但需要保证相同 level 时 compaction 的 key 范围不重叠
> [!summary] Compaction output level
> - IntraL0 和 Bottomest 的输出跟 input_level 相同
> - L0 compaction 输出到 Lbase
> - 其他的输出到 input_level + 1
> [!summary] Intra L0 compaction
> 两种形式的:
> - **filenum-based**:消减L0数量,优化**读放大**
> - L0 score 比较高,且已存在运行中的 L0->Lbase compaction ,说明 L0 压力比较大
> - 此时检查 L0 文件数量是否积累过多,进行 intra-L0 compaction 消减数量,减少读放大
> - **size-based**:合并小文件,优化**写放大**
> - 存在较多 filesize 较小的 L0 文件,如果按照 `level0_file_num_compaction_trigger` 数量跟下层 compaction,则写放大较大;
> - 利用 intra-L0 compaction 合并小文件
----
# LevelCompactionPicker
## NeedsCompaction()
- 入参:`const VersionStorageInfo* vstorage`
- 依次检测如下条件,任一满足直接返回 true
1. `vstorage->ExpiredTtlFiles()` 非空
2. `vstorage->FilesMarkedForPeriodicCompaction()` 非空
3. `vstorage->BottommostFilesMarkedForCompaction()` 非空
4. `vstorage->FilesMarkedForCompaction()`非空
5. `vstorage->FilesMarkedForForcedBlobGC()` 非空
6. 任一 level 的 `vstorage->CompactionScore` 大于 1
## PickCompaction()
- 关键参数:
- **`vstorage`**: `VersionStorageInfo *`
- 调用 `LevelCompactionBuilder::PickCompaction()`
-------
# CompactionIterator
- 迭代 compaction input,丢弃无用数据,只返回需要保留的版本。
- 基本流程:
1. NextFromInput()
2. PrepareOutput()
## 核心成员
- **`input_`**:`SequenceIterWrapper`,compaction input
- **`range_del_agg_`**:`CompactionRangeDelAggregator*`
- 聚合了所有 input 文件的 range tombstones
- **`snapshots_`**:`vector<SequenceNumber>`,快照列表
- **`earliest_snapshot_`**:`SequenceNumber`,最早的快照
- `snapshots_`的首个元素,`snapshots_`为空时则为 `kMaxSequenceNumber`
- **`bottommost_level_`**:`bool`,compaction 是否输出到 bottommost level
- `compaction_->bottommost_level() && !compaction_->allow_ingest_behind()`
## findEarliestVisibleSnapshot()
- 入参:
- **`in`**:`SequenceNumber`,input key 的 seq no
- 返回第一个 大于等于 `in` 的快照号,即最早/最小可见的快照
- 并将`prev_snapshot`赋值为结果的前一个快照(如果没有则返回 0 )
## NextFromInput()
- 丢弃`input_` 无用的数据,返回下一个需要保留的版本。
- 可以丢弃的几种情况:
1. 无快照引用
- 
- 相同 user key, seq 80 和 100 的 EarliestVisibleSnapshot 相同,则 80 版本对任何快照都不可见,可以跳过丢弃
- 对应 `num_record_drop_hidden`
2. 类型为 `kTypeDeletion` 且 `seq <= earliest_snapshot_` 且 KeyNotExistsBeyondOutputLevel
- KeyNotExistsBeyondOutputLevel 表示更大 levels 中没有该 key,因此不需要保留该 delete marker 来删除更旧的数据
- `input_` 中相同 user key 的后续 seq 较小数据会命中 **case 1**, 也能正常被丢失
- 对应统计:
- `num_record_drop_obsolete`
- `num_optimized_del_drop_obsolete`(非 `bottommost_level_` 情况下)
3. 类型为 `kTypeDeletion` 且 `bottommost_level_` 为 true,则跳过相同 snapshot stripe 中的后续其他版本
- 如果最后 input 中还存在相同 user key,则需要保留该 delete marker
- 
4. 被 range tombstone 所屏蔽,即 `range_del_agg_->ShouldDelete()` 返回 true
- 对应 `num_record_drop_hidden` 和 `num_record_drop_range_del`
- [ ] 是否考虑了该 key 仍然对某个快照可见
5. CompactionFilter 返回需要跳过的
## PrepareOutput()
- 主要逻辑:
- 将 bottommost level 符合条件的 key seq 置为 0,实现更好的压缩率。
- 关联逻辑:bottommost compaction
- 当所有 key 都置 0 后,bottommost sst 将处于稳态,不再触发 bottommost compaction
- 置 0 条件:
- key 小于等于 `earliest_snapshot_` 且
- 类型不是 kTypeMerge 且
- [ ] 非 delete range start key [#1113](https://github.com/facebook/rocksdb/pull/11113)
- 其他逻辑:将大 value 分离到 blob
---
# CompactonOutputs
- 管理一个 subcompaction 的输出。
- 主要逻辑:
- 将 CompactionIterator 的输出写入到 SST
- 入口:AddToOutput()
- 文件管理:新建、关闭和 cut SST
## 核心成员
- **`builder_`**:`unique_ptr<TableBuilder>`,当前写入中的 sst builder
- **`outputs_`**: `vector<Output>`,当前输出的所有 SSTs
- Output 结构对应一个 SST,主要包含了 FileMetaData 和 TableProperties
## ShouldStopBefore()
- cut 逻辑,是否需要切换新 SST。
- cut 的策略(优先级从高到低):
1. cut for TTL
- [Consider TTL compaction file cutting earlier to prevent small output file #11075](https://github.com/facebook/rocksdb/pull/11075)
2. 用户自定义的 SstPartitioner
3. 达到 `Compaction::max_output_file_size_` 限制
- [[compaction#^b6ff7b|max_output_file_size_]]
4. 与 grandparent 重叠数据太大
5. 尽量对齐 grandparent 的文件边界,避免跨越,导致下次 compaction 太大
- [Align compaction output file boundaries to the next level ones #10655](https://github.com/facebook/rocksdb/pull/10655)
- [ ] [Compaction to cut output file if it can avoid to overlap a grandparent file #1963](https://github.com/facebook/rocksdb/pull/1963)
> [!NOTE] cut 策略
> - 输出到 L0 时,只使用策略 1 和 2。
> - L0 需要避免文件过多,来减少读放大,因此 cut 时不积极。
## AddToOutput()
^9fd48b
- 将 CompactionIterator 当前的 entry 写入到 SST
1. 写入前,先调用 ShouldStopBefore(), 检查是否需要 file cut
2. 调用 `builder_->Add()` 写入 entry
3. 更新 file meta 的 boundary keys
- 一个 SST 的生命周期:
1. 创建:调用 CompactionJob::OpenCompactionOutputFile()
2. 写入:`builder_->Add()`
3. 关闭:调用 CompactionJob::FinishCompactionOutputFile()
## AddRangeDels()
- 文件 Finish 时写入 delete range tombstones.
- `builder_` 内根据 key类型最终会写入到 meta block。
## CloseOutput()
- 任务结束时关闭最后一个 SST。
- 特殊逻辑:整个 subcompaction 没有输出 entries,但存在 range deletions,则需要创建一个 SST,先写入再关闭。
-------
# SubcompactionState
## 关键成员
- **`start`**,**`end`**:`optional<Slice>`,subcompaction 负责的范围,左闭右开,nullptr 意味着 unbounded
- **`compaction_outputs_`**:`CompactionOutputs`,compaction 的输出
## CloseCompactionFiles()
- 任务结束时,关闭最后一个正在写入的 SST。
---
# CompactionJob
- 职责:
- 负责执行一次 Compaction,通常的执行流程: Prepare() -> Run() -> Install()
- CompactionJob 会将 compaction 切分为多个可以并行执行的 subcompactions。
- 有两个主要的 stats:
- **CompactionJobStats**:public 结构,用户可访问
- **CompactionStatsFull**:内部的 stats,最终发往 `ColumnFamilyData::internal_stats_`,用于 logging 和 public metrics
## 关键成员
- **`job_id_`**:`uint32_t`,任务唯一ID,由 DBImpl 分配。
- **`existing_snapshots_`**:`vector<SequenceNumber>`,当前所有快照的序列号
- **`earliest_snapshot_`**:`SequenceNumber`,最老的快照序列号
- 如果 `existing_snapshots_` 为空,设置为 `kMaxSequenceNumber`
- **`boundaries_`**:`vector<string>`,每个 subcompaction 的边界
- **`compact_`**:`CompactionState`, 包含 `sub_compact_states` 和 原始的 Compaction 对象。
## GetRateLimiterPriority()
- 如果发生了 write delay 或 write stop,返回 `Env::IO_USER`(高优先级);
- 否则返回 `Env::IO_LOW`。
## GenSubcompactionBoundaries()
^1c1c5a
- 找到一个边界 keys,可以均匀地将 input data 分割为 `max_subcompactions` 份。
- PR: [Improve SubCompaction Partitioning](https://github.com/facebook/rocksdb/pull/10393)
- 针对每个 input file,通过扫描 index blocks,将其拆分为 128 份(anchor points)。
- 每个 anchor point 包含一个边界 key 和 size
- 将所有文件的 anchor points 按 key 排序,再按总 size 进行均分。
- `total_size`:所有 anchor 的 size 总和
- `level_file_size`:`output_level`单个文件的大小限制
- `target_range_size`:每份的大小,等于 `max(total_size / N, level_file_size)`
- 均分:连续取 N 个 `target_range_size`
## Prepare()
- 准备工作:
- 为每个 subcompaction 设置边界, 生成多个 SubcompactionState 子任务,保存到 `compact_` 中
## ProcessKeyValueCompaction()
- 处理 subcompaction 的输入、利用 CompactionIterator 丢弃数据并写入到新文件中。
- 入参:
- **`sub_compact`**: `SubcompactionState`
- 流程
1. 创建 compaction filter
2. 创建 input InternalIterator,范围对齐 sub_comact
3. 基于 input iterator 和 compaction filter 创建 CompactionIterator
4. 迭代 CompactionIterator 写入到 CompactionOutputs 中
- [[compaction#^9fd48b|AddToOutput]]
## Run()
- 流程:
1. 第一个 subcompaction 在当前线程运行,其他 N - 1 个**每个单独开线程**运行,每个子任务都执行 ProcessKeyValueCompaction 函数
2. 等待所有子任务执行完成,检查它们的 status
3. 如果开启了 `paranoid_file_checks` 选型,则会读取校验每个生成的 SST
## InstallCompactionResults()
- 生成 VersionEdit,并调用 LogAndApply 应用。
- 流程:
1. 将整个 compaction 的所有 inputs 加入 VersionEdit,类型为删除
2. 将所有 subcompaction 的 ouputs 加入 VersionEdit
3. 调用 `versions_->LogAndApply()` 应用 Edit
## Install()
- 主要是调用 InstallCompactionResults 和 输出一些统计信息。
## OpenCompactionOutputFile()
- 新建一个 SST 文件,用于 sub compaction 的输出。
- 主要是创建新的 FSWritableFile 和 TableBuilder
- 并更新到 subcompaction 的 CompactionOutputs 中
- 设置到 `builder_`, `file_writer_` 中
- 追击到 `outputs_` 中
- `epoch_number` 取 compaction 所有 inputs 中最小的
- `oldest_ancester_time` 取跟 subcompaction 范围重叠的 inputs 中最小的
## FinishCompactionOutputFile()
- 完成一个 SST 文件的构建
1. 添加 range tombstones
- 调用 CompactionOutputs::AddRangeDels() 写入到 range del meta block
2. builder->Finish()
- 写入 filter、index 等其他 meta blocks 和 footer
3. 根据文件的精准范围,调整 `oldest_ancester_time`
4. Sync + Close
5. 如果 SST 为空,则删除(不包含任何 entries 和 range deletions)
6. 上报给 SstFileManager
----
# DBImpl
- Compaction 入口: `InstallSuperVersionAndScheduleWork()` -> `MaybeScheduleFlushOrCompaction()` ->
## 关键成员
- **`compaction_queue_`**:`deque<ColumnFamilyData *>`,
- 当 cf 满足 `cfd->NeedsCompaction()` 时,会放入该队列
- 放入路径:`InstallSuperVersionAndScheduleWork()` ->`SchedulePendingCompaction()` -> `AddToCompactionQueue()`
- **`unscheduled_compactions_`**:`int`,在队列中还未调度的 compactions 技术
- 每次 `SchedulePendingCompaction()` 入队列时加 1
- **`bg_bottom_compaction_scheduled_`**: `int`,调度到 BOTTOM pool 中的 compaction
- ManualCompaction 时才触发
- **`bg_compaction_scheduled_`**:`int`,调度到 LOW pool 中的 compaction。
- 正常都调度到 LOW pool
- **`manual_compaction_dequeue_`**: `deque<ManualCompactionState*>` ,手动 compaction 的队列
## SchedulePendingCompaction()
- 入参:cfd
- 检查 cfd 是否需要 compaction 且该 cfd 未在 compaction 队列中,放入入队
- 在 `InstallSuperVersionAndScheduleWork()` 调用
## MaybeScheduleFlushOrCompaction()
- 条件:
- 已经 scheduled 的没超过 `max_compactions`,且
- `unscheduled_compactions_ > 0` 即队列里有 pending 的
- 动作:调用 `env_->Schedule()`,去执行 `BGWorkCompaction`,目标是 LOW pool 中。
## BackgroundCallCompaction()
- 主要是调用 BackgroundCompaction
## PickCompactionFromQueue()
- 从 `compaction_queue_` 取出队首的 cfd,并且取消设置该 cfd 的 `queued_for_compaction_`
- [ ] [Concurrent task limiter for compaction thread control](https://github.com/facebook/rocksdb/pull/4332)
## BackgroundCompaction()
- 流程:
1. 调用 `PickCompactionFromQueue()` 从队列挑选 compaction
2. 调用 `picker->PickCompaction()` 生成 Compaction 对象
3. 检测当前 cfd 是否还需要 compaction,需要的话加队列,并触发调度(调用。MaybeScheduleFlushOrCompaction)
- 把当前 compaction 涉及的文件屏蔽后,可能还需要 compaction
- PickCompaction 内部会重新 score
4. 执行不同类型的 compaction
- deletion compaction
- trivial move:在 VersionEdit 中直接移动文件,再执行 LogAndApply
- bottom compaction:
- compact 到最后一层且存在 BOTTOM thread pool
- 使用 BOTTOM pool 运行 compaction
- 一般 compaction
- 生成 CompactionJob,Prepare->Run->Install
- 执行完也会调用 InstallSuperVersionAndScheduleWork
----
# 总结
- 三种可能改变 compaction score 的方式:
- flush 或 cmpaction 完成,由 InstallSuperVersionAndScheduleWork 触发
- MutableCFOptions 改变,也由 InstallSuperVersionAndScheduleWork 触发
- pick compaction 时,随着部分文件被标记为 `being_compacted`被屏蔽,score 也要重算
- Compaction 的数据流向
```mermaid
flowchart LR
CompactionMergingIterator@{ shape: lean-r, label: "CompactionMergingIterator" }
CompactionIterator[CompactionIterator]
CompactionOutputs[(CompactionOutputs)]
CompactionMergingIterator --> CompactionIterator
CompactionIterator --> CompactionOutputs
```
- CompactionMergingIterator:迭代输入
- CompactionIterator:drop 数据
- CompactionOutputs:写入结果
-------
# Manual Compaction
## FindMinimumEmptyLevelFitting()
- 参数:
- **`level`**: `int`
- 将整个 level 层往更小的层次移动,寻找最小能容纳 level 层的空层次
- 搜索 $i \in [level-1, 0)$,停止条件:
- i 层非空
- 或者 i 层 `max_bytes` 太小无法放入 level 层现有数据
- `MaxBytesForLevel(i) < NumLevelBytes(level)`
- 无法移动时,返回 `level`原始值
## ReFitLevel()
- 将 level 层整体移动到另外一层。
- 目标层可以更大也可以更小。
- 参数:
- **`level`**:`int`,源 level
- **`target_level`**:`int`,目标 level,小于 0 则使用 `FindMinimumEmptyLevelFitting()`
- 执行流程:
1. 检查是否能移动,需要满足:
- 两个层次中间的所有层次都是空的(包含`target_level`)
- 涉及的数据范围与其他 compaction 任务不冲突
- 这些 compactions 的 `ouput_level` 位于这两个层次之间
- L0 层的特殊限制:
- 不允许将 L0 层移动到其他层次
- 从其他层移动到 L0 时,不能超过 1 个文件
- [Disallow refitting more than 1 file from non-L0 to L0 #12481](https://github.com/facebook/rocksdb/pull/12481)
- 避免 L0 文件过大,影响读性能
2. RegisterCompaction:生成 compaction job 并注册,reason 为 `kRefitLevel`
- 避免后续的 compactions 跟它冲突
- [ ] 整个过程都持有 db 锁,是否有必要注册
- [Add missing range conflict check between file ingestion and RefitLevel() #10988](https://github.com/facebook/rocksdb/pull/10988)
3. LogAndApply:应用文件移动对应的 VersionEdit
4. UnregisterCompaction
5. InstallSuperVersionAndScheduleWork
## RunManualCompaction()
- 手动 compaction 某一层。
- 主要流程:
1. 调用 `ColumnFamilyData::CompactRange(begin, end)` 生成 compaction 任务
- 除了 L0 之外其他层有 `max_compaction_bytes` 限制,因此可能一次任务只 compact 部分范围,这时会通过 `compaction_end`返回实际选中的范围
2. 调用 `env_->Schedule()` 调度到线程池后台执行
3. 等待 compaction 任务完成
4. 如果`compaction_end`不为 nullptr,表示没 compact 完全部范围, 更新 begin 重试步骤 1
- 其他逻辑:
- 因 compaction 冲突时无法执行时,通过 `bg_cv_`等待直到不冲突;
- 如果 `manual_compaction_paused_ > 0` 即禁止了手动 compaction ,则停止,并 unschedule 所有 manual compactions
## CompactRangeInternal()
- CompactRange() 接口的核心逻辑。
- compact 某个范围 `[begin, end]`内的所有数据。
- `begin` 为 nullptr 表示 `-∞`
- `end`为 nullptr 表示 `+∞`
- 主要流程:
1. Flush memtable,如果 memtables 跟目标范围重叠
2. 确定起始 level:从小到大,查找第一个跟目标范围重叠的 level,记为 `first_overlapped_level`
3. 从 `first_overlapped_level` 到 `bottommost_level`(不包含),依次调用 RunManualCompaction() 进行 compaction
4. 对 bottommost level 执行 RunManualCompaction
- intra-level compaction,结果写入到相同 level
- 执行条件(满足任一):
- 设置了 `kIfHaveCompactionFilter` 且存在 compaction filter
- 设置了 `BottommostLevelCompaction::kForceOptimized` 或 `kForce`
- `kForceOptimized`:
- 通过 `next_file_number` ,跳过本次手动 compaction 开始之后产生的新文件
- [Avoid double-compacting data in bottom level in manual compactions #5138](https://github.com/facebook/rocksdb/pull/5138/)
5. **ReFitLevel**,如果设置了 `change_level`
- 使用场景:手动 compaction 后,数据都到了最后一层,可能数据量变小,适合放在更小的层次
- 开启了`level_compaction_dynamic_level_bytes`的话,似乎意义不大❓
----
# TODO
- [x] L0 intra compaction 条件
- [x] 哪些条件会触发 compaction
- [x] 优先级
- [x] 并行 compaction
- [x] subcompaction L0-intra
- [x] https://smalldatum.blogspot.com/2022/01/rocksdb-internals-intra-l0-compaction.html
- [x] max_compaction_bytes
- [x] DBImpl::compaction_queue_
- [x] 不同层之间的 compaction 能否同时进行
- 例如 L0-> L1, L1 -> L2 同时运行(可以,只要范围不重叠)
- [x] L0 compaction 会选中所有 L0 层文件吗
- [x] 选中一个L0文件后,按范围重叠扩展时可能会
- [x] MaybeScheduleFlushOrCompaction
- [x] ReFitLevel
- [x] https://github.com/facebook/rocksdb/pull/10988
- [x] manual_compaction
- [ ] CompactionFiles
- [ ] compaction 如何保留 range tombstones 到 output
- [ ] periodic_compaction
- https://github.com/facebook/rocksdb/pull/5166
- [ ] bottommost_files_marked_for_compaction_
- https://github.com/facebook/rocksdb/pull/3009
- https://github.com/facebook/rocksdb/pull/11701
- https://github.com/facebook/rocksdb/pull/5090
- [ ] levels 何时增加或减少
- [ ] Universal Compaction
- [ ] max_background 如何限制
- [ ] 如何限速
- [ ] PopulateWithAtomicBoundaries
- [ ] deletion_compaction
- [ ] remote compaction
- CompactionService
- https://github.com/facebook/rocksdb/wiki/Choose-Level-Compaction-Files
- [ ] [Refactor AddRangeDels() + consider range tombstone during compaction file cutting #11113](https://github.com/facebook/rocksdb/pull/11113)
- [ ] GetCompactionPressureToken, NeedSpeedupCompaction
- [ ] [Speedup based on pending compaction bytes relative to data size #12130](https://github.com/facebook/rocksdb/pull/12130)
- [Concurrent task limiter for compaction thread control #4332](https://github.com/facebook/rocksdb/pull/4332)
- per-cf 的 compaction 并发限制
---
## 实验
- [ ] 顺序写入,测试 trival move
- [x] 如果 L0 跟 Lbase 压缩算法不同,是否还能 move
- 即使配置 L0 跟 L1 压缩算法是一样的。
- Lbase 使用的是 `compression_per_level[1]` (GetCompressionType)
- [ ] 模拟 Intra L0 compaction
- [ ] 验证 max_compaction_bytes 的影响
- [ ] 测试 `ExpandInputsToCleanCut()`
- [ ] 场景:每个相邻文件之间都有 user key 重叠,最坏情况下是否可能选取该 level 的所有文件
- [ ] 测试触发 bottommost files compaction
- [x] 哪些被算作 `bottommost_files_`
- [ ] 验证 subcompaction 会临时开启新线程
- [x] 如何触发 bottomest compaction