# Compaction 类 - 封装了一次 compaction 的元数据。 - db/compaction/compaction.h ## 核心成员 - **`smallest_user_key_`**:`Slice` - **`largest_user_key_`**:`Slice`,所有 inputs 的整体范围 - **`input_vstorage_`**:`VersionStorageInfo *`,compaction 任务生成时所基于的 version - **`inputs_`**:`vector<CompactionInputFiles>`,compaction 输入 - **`input_levels_`**:`autovector<LevelFiesBrief, 2>`,输入的上下两层 - **`grandparents`**:`vector<FileMetaData *>`,`output_level + 1` 层跟 inputs 范围有重叠的文件 - **`bottommost_level_`**:当前 compaction 是否会在 bottommost level 创建文件 - 检查它的范围不会出现在更大的 level 或更旧的 L0 中 - 值取自于 `IsBottommostLevel()` - **`target_output_file_size_`**: 输出的 SST 文件的目标大小 - 取决于 `target_file_size_base` 和 `target_file_size_multiplier` 参数 - 计算逻辑: MaxFileSizeForLevel() - **`max_output_file_size_`**:输出的 SST 的文件最大限制 ^b6ff7b - 取决于 `target_output_file_size_` 的值,最大为 ==2 倍== - PR:[Align compaction output file boundaries to the next level ones #10655](https://github.com/facebook/rocksdb/pull/10655) ## IsBottommostLevel() - 检查在比`output_level` 更大的层次中,没有跟 `inputs_`的范围重叠的。 - 调用 [[version#^cdf817|RangeMightExistAfterSortedRun]] - 如果 `output_level` 是 L0,则也会检查更早的 L0 文件。 - 也就是说 compaction 输出的 key 范围不会跟更大 level 更旧的数据有重叠。 - 这个信息可以用于实现一些优化,例如 Bottommost compaction 输出中 的 delete marker 不需要保留。 ## PopulateWithAtomicBoundaries() - PR: [Truncate range tombstones by leveraging InternalKeys #4432](https://github.com/facebook/rocksdb/pull/4432) ---- # CompactionPicker - Picker 基类,也实现了一些公用方法。 - 一个 CF 持有一个 picker。 - 纯虚函数(需要不同的 CompactionStyle 来实现): - NeedsCompaction - PickCompaction ## 核心成员 - **`compactions_in_progress_`**:`set<Compaction *>` - 跟踪所有正在运行的 compactions,避免有冲突的 compaction 并行运行 - **`level0_compactions_in_progress_`**:`set<Compaction *>` - 跟踪所有正在 L0 层上运行的 compactions,保证同时只有一个 L0-> Lbase ## AreFilesInCompaction() - 入参:`const vector<FileMetaData*>& files` - 如果 files 中任一文件为 `being_compacted`,返回 true。 ## ExpandInputsToCleanCut() - 添加更多 `start_level` 的 inputs 文件(L 层的),确保当 L 层某个 key 被 compact 到 L + 1 层后,L 层不会保留该 user key 的更早版本 - 否则会导致 Get 操作得到过时的数据 - *例如 seq=100 的 comacted 到 L+1 了,seq=10 的还在 L 层* - 如果无法执行,则返回 false。 - 入参: - **`vstorage`** : `VersionStorageInfo*` - **`inputs`**: `CompactionInputFiles` - 执行流程: - 对于 L0,不需要处理,`GetOverlappingInputs()`已经能保证没有问题。 - 循环进行如下操作,直接 inputs 不再增长: 1. 调用 `GetRange()` 获取当前 inputs 的整体范围 2. 调用 `vstorage->GetOverlappingInputs()`,获取跟上述范围的 **user key** 有重叠的所有文件作为新的 inputs - 检查所有 inputs 的 `being_compacted` 标记,如果有为 true 则表示该文件已经有其他 compaction 任务在执行,因此应该取消本次 compaction。 - [ ] bottommost compaction 是否有必要调用它 ## RangeOverlapWithCompaction() - 检查 key 范围是否跟当前正在运行的某个 compaction 重叠。 - 输入参数: - **`smallest_user_key`**:`const Slice&` - **`largest_user_key`**:`const Slice&` - **`level`**:`int`,output level,只检查输出到该 level 的 compaction - 执行流程: - 检查 `compactions_in_progress_` 中每个 compaction - 重叠条件:compact 到同一个层(`output_level`)且 key 范围有重叠 ## FilesRangeOverlapWithCompaction() - 检查输入文件的 key 范围是否跟当前正在运行的 compactions 重叠。 - 输入参数: - **`inputs`**:`vector<CompactionInputFiles>` - **`level`**:`int`,output level - **`penultimate_level`**:`int`, - 默认为 kInvalid(不启用 `preclude_last_level_data_seconds` 特性) - 执行流程 1. 调用 `GetRange()` 获取 inputs 的最大和最小 key(一个整体的范围,涵盖所有 inputs) 2. 调用 `RangeOverlapWithCompaction()` ## SetupOtherInputs() ^a5d371 - 填充与 `start_level` 重叠的其他 level 的 inputs。 - 也会根据填充的 `ouput_level` 文件,尝试扩容 `start_level` 的 inputs - 输入参数: - **`inputs`**: `CompactionInputFiles*`,`start_level` 的输入 - **`output_level_inputs`**: `CompactionInputFiles`, `output_level`的输入 - **`parent_index`**:`int *` - **`base_level`**:`int` - 前置条件: - `input_level` 和 `output_level` 不一样,否则不操作返回 true。 - `input_level` 和 `output_level` 之间的 levels 都是空的(无SST 文件)。 - 执行流程: 1. 获取 inputs 的整体范围( smallest 和 largest) 2. 计算 `ouput_level` 中跟上述范围重叠的文件,输出到 `output_level_inputs` 中 3. 检查 `output_level_inputs` 中是否有文件已经在被 compaction,是的话,返回 false。 4. 如果 `output_level_inputs` 不为空,则对其调用 `ExpandInputsToCleanCut()` 5. 根据 `output_level_inputs` 尝试扩充 `start_level` inputs(不增加 `output_level_inputs` 的前提下) - 扩充 `start_level` inputs - 避免写放大,防止需要后面再生成一个任务来 compact `ouput_level` 对应的同一个范围,代价是当前的 compaction 会变大 - PR:[Ignore max_compaction_bytes for compaction input that are within output key-range](https://github.com/facebook/rocksdb/pull/10835) - PR: [Limit compaction input files expansion](https://github.com/facebook/rocksdb/pull/12484) - 执行流程: 1. 获取两个 level 的整体范围,再获取 `start_level` 中跟此范围重叠的文件到 `expaned_inputs` 2. 可以扩充 inputs 的条件: - 对 `expaned_inputs` 调用 `ExpandInputsToCleanCut()` 并成功 - `expaned_inputs` 不包含正在被 compaction 的文件 - 两层文件大小加起来不超过两倍的 `max_compaction_bytes`,避免扩充带来大 compaction - `expaned_inputs` 不会引发 output level inputs 的增长(通过新的范围检查) 3. 如果不能扩充,通过 `GetCleanInputsWithinInterval` 再次尝试 ## GetOverlappingL0Files() - 入参: - **`start_level_inputs`**:`CompactionInputFiles *` - **`output_level`**: `int` - 实现: - 查找跟 `start_level_inputs` 重叠的 L0 文件,重新填充进 `start_level_inputs` - 获取新的 `start_level_inputs` 的范围,通过 `IsRangeInCompaction()` 检测是否与已有 compaction 重叠 --- # LevelCompactionBuilder ## 核心成员 - **`base_index_`**:`int`,initial file 在 `start_level_`层的索引 - **`parent_index_`**:`int` - **`start_level_`** : `int`,哪一层需要 compact(输入),例如 L 层 - **`start_level_inputs_`**: `CompactionInputFiles`, L 层输入文件 - 包含了哪个 level 的哪些文件 - **`output_level_`**:`int`,compact 到哪一层(输出),例如 L + 1 层 - 输出可能也是 L 层(例如 intra L0 和 bottommost compaction) - **`output_level_inputs`**: `CompactionInputFiles`, L + 1 层输入文件 - **`grandparents_`**:`vector<FileMetaData *>` - 跟本次 compaction 所有输入有重叠的 grandparent 文件列表 > grandparent == `output_level + 1` or the first level after that has overlapping files - **`compaction_inputs_`**: `vector<CompactionInputFiles>`,compaction 所有的输入, - **`is_l0_trivial_move_`**:`int`,特殊的 compaction,直接将 L0 移动到 LBase ## PickSizeBasedIntraL0Compaction() - 当 L0 层的总大小相对 Lbase很小时,尝试从最新的 L0 文件开始挑选 intra-L0 compaction,目的是减少 L0 -> Lbase compaction 的写放大。 - Issue: [compact L0 file to L0 when L0 is too small #10706](https://github.com/facebook/rocksdb/issues/10706) - PR:[Allow more intra-L0 compaction when L0 is small #12214](ttps://github.com/facebook/rocksdb/pull/12214/) - 背景:L0文件每个都很小,数量很快超过 `level0_file_num_compaction_trigger`,触发 compaction,导致写放大很大。 - 返回 true 如果挑选出了 intra L0 compaction,同时会更新 `start_level_inputs_` 和 `output_level_` - 前置条件:`start_level_ == 0` 即 compaction 目标是 L0 <br> - 执行流程: - **`min_num_file`**:`max(2, level0_file_num_compaction_trigger)` - 如果 L0 文件数量小于 `min_num_file` 则不挑选,返回 false - 数量少就不会触发 compaction 即 issue #10706 的问题,不会带来额外写放大 - ==目标条件==:`L0_total_size` 小于 `Lbase_total_size / level_multiplier` - 满足就说明整体比较小,不满足就不挑选,退出返回 false - 从新到旧将 L0 的文件放入 `start_level_inputs_`,如果遇到 `being_compacted` 则中断 - 放入前会先清空 `start_level_inputs_` - 被选中的文件数量需要大于等于 `min_num_file`,否则返回 false - intra L0 的输出 level 即 `output_level_` 为 0,返回 true,挑选成功。 ## TryPickL0TrivialMove() - 尝试将 L0 文件直接 move 到 LBase - 适用于 L0 文件跟 LBase 之间文件范围不重叠的情况 - 典型场景:顺序写入。 - PR:[Multi-File Trivial Move in L0->L1 #10188](https://github.com/facebook/rocksdb/pull/10188) - 前置条件: - `base_level` 不为空 - `start_level_` 为 L0,即 L0 需要 compaction - 没有配置 `compression_per_level`,避免移动时两个 level 的 compression 算法不匹配 - `output_level_` 不为空(move 的收益不大,简化实现) - 没有配置多 DB Path - 执行流程: - 从旧到新搜索 L0 文件 - 因为需要维持 L1 的文件始终旧于 L0(移动完后 L0 剩余的文件可能会跟 L1 重叠) - 挑选符合条件的 L0 文件, 需要满足 - 跟已被选中的 L0 文件无重叠 - 跟 L1 所有文件无重叠 - 将被选中的文件按 key 排序,放入 `start_level_inputs_` ,同时设置 `is_l0_trivial_move_` 为 true - 如果有能直接 move 的,函数返回 true。 ## TryExtendNonL0TrivialMove() - PR: [Try to pick more files in TryExtendNonL0TrivialMove() #11347](https://github.com/facebook/rocksdb/pull/11347) - 前置条件: - `start_level_inputs_` 只有一个文件,该文件可以直接 trivial move 到下一层 - 没有配置多个 `db_paths` - 没有设置 `compression_per_level` - 参数: - **`start_index`**: `int`,被选中的 initial file (`start_level_inputs_[0]`)在 level 中 的 index - 执行流程: - 从 `start_index` 开始,先从右再从左扩展能 trivial move(跟下层没有重叠)的文件 - 限制: - 总个数不超过 `kMaxMultiTrivialMove`,默认值为 4 - 总大小不超过 `max_compaction_bytes` ## PickFileToCompact() / 无参数 - 根据 `start_level_`,挑选文件 compact 到 `output_level_`。 - L0 compacton(`start_level_` 为 0)的特殊规则: 1. 如果有其他 L0 的 compaction,则只能尝试 `PickSizeBasedIntraL0Compaction()` - 原因:L0 文件互相有重叠,不允许 L0 并行 compaction。(优化根据 key 范围判断) 2. 尝试 `TryPickL0TrivialMove()`,成功直接返回 3. 再次尝试 `PickSizeBasedIntraL0Compaction()`,成功直接返回 - 各层通用挑选规则: 1. 按照 compaction 优先级顺序,从 `start_level_` 的 `NextCompactionIndex()`开始,跳过 `being_compacted` 的文件,选中第一个满足条件的文件放入 `start_level_inputs` 2. 调用 `ExpandInputsToCleanCut()` 和 `FilesRangeOverlapWithCompaction()` ,如果跟其他 compaction 重叠冲突,清空,继续挑选 3. 填充 `output_level_inputs`(根据选中的源 level 的文件范围) - 如果为空,则表示可以 trivial move,调用 `TryExtendNonL0TrivialMove()` 尝试扩展 - 否则,对 `output_level_inputs` 调用 `ExpandInputsToCleanCut()`后检测冲突,如果冲突则重新挑选下一个 4. 更新 `vstorage_` 的 compaction index ## FindIntraL0Compaction() - file num based Intra-L0 - 入参: - **`level_files`**:`Vector<FileMetaData>`, L0 层所有文件 - **`min_files_to_compact`**:`size_t`,默认为 `kMinFilesForIntraL0Compaction` 即 4 - **`max_compaction_bytes`**:`size_t`,来自 DBOptions, - **`comp_inputs`**:`CompactionInputFiles*`,被选中的 L0 文件放到这里面 - 执行流程: - 从新到旧挑选 L0 文件,中止条件(任一满足) - 遇到 `being_compacted` 的 - 被选中的 L0 文件累积大小超过 `max_compaction_bytes` 限制 - 检查被选中的文件个数,超过 `min_files_to_compact` 才算成功,否则返回 false - 被选中的文件放到 `comp_inputs` 中 ## PickIntraL0Compaction() - 当 L0 文件累积太多时,挑选 L0 -> L0 compaction。 - 目的:避免读放大 - 从最新的 L0 文件开始,挑选未在被 compact 的最长连续区间 - 前置条件: - L0 文件的数量有挤压:大于等于 `level0_file_num_compaction_trigger + 2` - 最新的 L0 文件没有正在被 compact - 执行流程: - 先清空 `start_level_inputs_` - 检查前置条件后,满足调用 `FindIntraL0Compaction()` 挑选 L0 文件,被选中的放入 `start_level_inputs_` 中 ## SetupInitialFiles() - 挑选 compact 的初始输入文件,也包含了 Intra-L0 compaction。 - 结果存储到 `start_level_inputs_`中 - 按以下规则依次挑选文件(优先级由高到低),规则能命中挑选出文件就直接返回 - compaction score - 不包含 Lmax - 按 score 从高到低挑选大于 1 的,如果选中的跟运行中的冲突,则跳过检查下一个 - 如果跳过的是 L0->Lbase(同时只能有一个在运行),说明 L0 压力比较大 1. 避免选择 Lbase 的 compaction,否则可能 starve L0->Lbase 2. 尝试 IntraL0 compaction(PickIntraL0Compaction,filenum-based) - marked for compaction - 通过`SuggestCompactRange()`标记的 - bottommost compaction - TTL compaction - periodic compaction - force blob gc ## SetupOtherL0FilesIfNeeded() - L0 到 Lbase compaction 时(且非 trivial move),挑选其他 L0 文件。 - 扩充 `start_level_inputs_` - 实现: - 调用 `CompactionPicker::GetOverlappingL0Files()` - 查找跟当前 `start_level_inputs_` 范围内重叠的 L0 文件,重新填充。 ## SetupOtherInputsIfNeeded() - 基于 initial files(通过 `SetupInitialFiles()` 获取的),设置其他需要被 compact 的文件。 - 如果 `output_level_` 是 L0,只将 `start_level_inputs_` 放入 `compaction_inputs_` 后返回。 - 非 L0 -> L0 的处理: 1. 如果非 `is_l0_trivial_move_`,调用 [[compaction#^a5d371|SetupOtherInputs]] 扩充 inputs 2. 将 `start_level_inputs_` 和 `output_level_inputs_` 放入 `compaction_inputs_` 3. 检测是否跟其他正在运行的 compaction 范围重叠(它们的输出 level 也一样) - PR: [Add missing range conflict check](https://github.com/facebook/rocksdb/pull/10988) 4. 如果非 `is_l0_trivial_move_`,调用 `GetGrandparents()` 初始化 `grandparents_` 文件 ## GetCompaction() - 创建 Compaction 对象,注册到 picker - `vstorage_`需要重新 `ComputeCompactionScore()` - 因为计算分数会考量 `being_compacted` 状态,该状态发生变化了 ## PickCompaction - 生成/返回一个 Compaction 对象。 - 执行流程: 1. 调用 `SetupInitialFiles()`,挑选需要 compact 的第一个文件,也可能通过 clean cut 扩展为更多文件 2. 调用 `SetupOtherL0FilesIfNeeded()`,适用于 L0 -> Lbase 时,挑选其他 L0 文件 3. 调用 `SetupOtherInputsIfNeeded()`,挑选 output level 的文件,并扩充更多 start level 文件(key 范围可能变大) 4. 调用 `GetCompaction()` 生成 Compaction 对象并返回。 ## 总结 > [!summary] Picker 基本流程 >1. 按照几种类型的优先级,先选择一个 input 文件; >2. 由这个文件作为起始,按范围覆盖扩展 input level 的更多文件; >3. 按范围覆盖挑选和扩展 output level 文件,并回头尝试扩展 input level > [!summary] Compaction 并行 > - L0-> Lbase 同时只能由一个 > - 其他情况可以并行,但需要保证相同 level 时 compaction 的 key 范围不重叠 > [!summary] Compaction output level > - IntraL0 和 Bottomest 的输出跟 input_level 相同 > - L0 compaction 输出到 Lbase > - 其他的输出到 input_level + 1 > [!summary] Intra L0 compaction > 两种形式的: > - **filenum-based**:消减L0数量,优化**读放大** > - L0 score 比较高,且已存在运行中的 L0->Lbase compaction ,说明 L0 压力比较大 > - 此时检查 L0 文件数量是否积累过多,进行 intra-L0 compaction 消减数量,减少读放大 > - **size-based**:合并小文件,优化**写放大** > - 存在较多 filesize 较小的 L0 文件,如果按照 `level0_file_num_compaction_trigger` 数量跟下层 compaction,则写放大较大; > - 利用 intra-L0 compaction 合并小文件 ---- # LevelCompactionPicker ## NeedsCompaction() - 入参:`const VersionStorageInfo* vstorage` - 依次检测如下条件,任一满足直接返回 true 1. `vstorage->ExpiredTtlFiles()` 非空 2. `vstorage->FilesMarkedForPeriodicCompaction()` 非空 3. `vstorage->BottommostFilesMarkedForCompaction()` 非空 4. `vstorage->FilesMarkedForCompaction()`非空 5. `vstorage->FilesMarkedForForcedBlobGC()` 非空 6. 任一 level 的 `vstorage->CompactionScore` 大于 1 ## PickCompaction() - 关键参数: - **`vstorage`**: `VersionStorageInfo *` - 调用 `LevelCompactionBuilder::PickCompaction()` ------- # CompactionIterator - 迭代 compaction input,丢弃无用数据,只返回需要保留的版本。 - 基本流程: 1. NextFromInput() 2. PrepareOutput() ## 核心成员 - **`input_`**:`SequenceIterWrapper`,compaction input - **`range_del_agg_`**:`CompactionRangeDelAggregator*` - 聚合了所有 input 文件的 range tombstones - **`snapshots_`**:`vector<SequenceNumber>`,快照列表 - **`earliest_snapshot_`**:`SequenceNumber`,最早的快照 - `snapshots_`的首个元素,`snapshots_`为空时则为 `kMaxSequenceNumber` - **`bottommost_level_`**:`bool`,compaction 是否输出到 bottommost level - `compaction_->bottommost_level() && !compaction_->allow_ingest_behind()` ## findEarliestVisibleSnapshot() - 入参: - **`in`**:`SequenceNumber`,input key 的 seq no - 返回第一个 大于等于 `in` 的快照号,即最早/最小可见的快照 - 并将`prev_snapshot`赋值为结果的前一个快照(如果没有则返回 0 ) ## NextFromInput() - 丢弃`input_` 无用的数据,返回下一个需要保留的版本。 - 可以丢弃的几种情况: 1. 无快照引用 - ![](https://img.jonahgao.com/oss/note/2025p2/rocksdb_compact_snapshot.svg) - 相同 user key, seq 80 和 100 的 EarliestVisibleSnapshot 相同,则 80 版本对任何快照都不可见,可以跳过丢弃 - 对应 `num_record_drop_hidden` 2. 类型为 `kTypeDeletion` 且 `seq <= earliest_snapshot_` 且 KeyNotExistsBeyondOutputLevel - KeyNotExistsBeyondOutputLevel 表示更大 levels 中没有该 key,因此不需要保留该 delete marker 来删除更旧的数据 - `input_` 中相同 user key 的后续 seq 较小数据会命中 **case 1**, 也能正常被丢失 - 对应统计: - `num_record_drop_obsolete` - `num_optimized_del_drop_obsolete`(非 `bottommost_level_` 情况下) 3. 类型为 `kTypeDeletion` 且 `bottommost_level_` 为 true,则跳过相同 snapshot stripe 中的后续其他版本 - 如果最后 input 中还存在相同 user key,则需要保留该 delete marker - ![](https://img.jonahgao.com/oss/note/2025p2/rocksdb_compaction_delete.svg) 4. 被 range tombstone 所屏蔽,即 `range_del_agg_->ShouldDelete()` 返回 true - 对应 `num_record_drop_hidden` 和 `num_record_drop_range_del` - [ ] 是否考虑了该 key 仍然对某个快照可见 5. CompactionFilter 返回需要跳过的 ## PrepareOutput() - 主要逻辑: - 将 bottommost level 符合条件的 key seq 置为 0,实现更好的压缩率。 - 关联逻辑:bottommost compaction - 当所有 key 都置 0 后,bottommost sst 将处于稳态,不再触发 bottommost compaction - 置 0 条件: - key 小于等于 `earliest_snapshot_` 且 - 类型不是 kTypeMerge 且 - [ ] 非 delete range start key [#1113](https://github.com/facebook/rocksdb/pull/11113) - 其他逻辑:将大 value 分离到 blob --- # CompactonOutputs - 管理一个 subcompaction 的输出。 - 主要逻辑: - 将 CompactionIterator 的输出写入到 SST - 入口:AddToOutput() - 文件管理:新建、关闭和 cut SST ## 核心成员 - **`builder_`**:`unique_ptr<TableBuilder>`,当前写入中的 sst builder - **`outputs_`**: `vector<Output>`,当前输出的所有 SSTs - Output 结构对应一个 SST,主要包含了 FileMetaData 和 TableProperties ## ShouldStopBefore() - cut 逻辑,是否需要切换新 SST。 - cut 的策略(优先级从高到低): 1. cut for TTL - [Consider TTL compaction file cutting earlier to prevent small output file #11075](https://github.com/facebook/rocksdb/pull/11075) 2. 用户自定义的 SstPartitioner 3. 达到 `Compaction::max_output_file_size_` 限制 - [[compaction#^b6ff7b|max_output_file_size_]] 4. 与 grandparent 重叠数据太大 5. 尽量对齐 grandparent 的文件边界,避免跨越,导致下次 compaction 太大 - [Align compaction output file boundaries to the next level ones #10655](https://github.com/facebook/rocksdb/pull/10655) - [ ] [Compaction to cut output file if it can avoid to overlap a grandparent file #1963](https://github.com/facebook/rocksdb/pull/1963) > [!NOTE] cut 策略 > - 输出到 L0 时,只使用策略 1 和 2。 > - L0 需要避免文件过多,来减少读放大,因此 cut 时不积极。 ## AddToOutput() ^9fd48b - 将 CompactionIterator 当前的 entry 写入到 SST 1. 写入前,先调用 ShouldStopBefore(), 检查是否需要 file cut 2. 调用 `builder_->Add()` 写入 entry 3. 更新 file meta 的 boundary keys - 一个 SST 的生命周期: 1. 创建:调用 CompactionJob::OpenCompactionOutputFile() 2. 写入:`builder_->Add()` 3. 关闭:调用 CompactionJob::FinishCompactionOutputFile() ## AddRangeDels() - 文件 Finish 时写入 delete range tombstones. - `builder_` 内根据 key类型最终会写入到 meta block。 ## CloseOutput() - 任务结束时关闭最后一个 SST。 - 特殊逻辑:整个 subcompaction 没有输出 entries,但存在 range deletions,则需要创建一个 SST,先写入再关闭。 ------- # SubcompactionState ## 关键成员 - **`start`**,**`end`**:`optional<Slice>`,subcompaction 负责的范围,左闭右开,nullptr 意味着 unbounded - **`compaction_outputs_`**:`CompactionOutputs`,compaction 的输出 ## CloseCompactionFiles() - 任务结束时,关闭最后一个正在写入的 SST。 --- # CompactionJob - 职责: - 负责执行一次 Compaction,通常的执行流程: Prepare() -> Run() -> Install() - CompactionJob 会将 compaction 切分为多个可以并行执行的 subcompactions。 - 有两个主要的 stats: - **CompactionJobStats**:public 结构,用户可访问 - **CompactionStatsFull**:内部的 stats,最终发往 `ColumnFamilyData::internal_stats_`,用于 logging 和 public metrics ## 关键成员 - **`job_id_`**:`uint32_t`,任务唯一ID,由 DBImpl 分配。 - **`existing_snapshots_`**:`vector<SequenceNumber>`,当前所有快照的序列号 - **`earliest_snapshot_`**:`SequenceNumber`,最老的快照序列号 - 如果 `existing_snapshots_` 为空,设置为 `kMaxSequenceNumber` - **`boundaries_`**:`vector<string>`,每个 subcompaction 的边界 - **`compact_`**:`CompactionState`, 包含 `sub_compact_states` 和 原始的 Compaction 对象。 ## GetRateLimiterPriority() - 如果发生了 write delay 或 write stop,返回 `Env::IO_USER`(高优先级); - 否则返回 `Env::IO_LOW`。 ## GenSubcompactionBoundaries() ^1c1c5a - 找到一个边界 keys,可以均匀地将 input data 分割为 `max_subcompactions` 份。 - PR: [Improve SubCompaction Partitioning](https://github.com/facebook/rocksdb/pull/10393) - 针对每个 input file,通过扫描 index blocks,将其拆分为 128 份(anchor points)。 - 每个 anchor point 包含一个边界 key 和 size - 将所有文件的 anchor points 按 key 排序,再按总 size 进行均分。 - `total_size`:所有 anchor 的 size 总和 - `level_file_size`:`output_level`单个文件的大小限制 - `target_range_size`:每份的大小,等于 `max(total_size / N, level_file_size)` - 均分:连续取 N 个 `target_range_size` ## Prepare() - 准备工作: - 为每个 subcompaction 设置边界, 生成多个 SubcompactionState 子任务,保存到 `compact_` 中 ## ProcessKeyValueCompaction() - 处理 subcompaction 的输入、利用 CompactionIterator 丢弃数据并写入到新文件中。 - 入参: - **`sub_compact`**: `SubcompactionState` - 流程 1. 创建 compaction filter 2. 创建 input InternalIterator,范围对齐 sub_comact 3. 基于 input iterator 和 compaction filter 创建 CompactionIterator 4. 迭代 CompactionIterator 写入到 CompactionOutputs 中 - [[compaction#^9fd48b|AddToOutput]] ## Run() - 流程: 1. 第一个 subcompaction 在当前线程运行,其他 N - 1 个**每个单独开线程**运行,每个子任务都执行 ProcessKeyValueCompaction 函数 2. 等待所有子任务执行完成,检查它们的 status 3. 如果开启了 `paranoid_file_checks` 选型,则会读取校验每个生成的 SST ## InstallCompactionResults() - 生成 VersionEdit,并调用 LogAndApply 应用。 - 流程: 1. 将整个 compaction 的所有 inputs 加入 VersionEdit,类型为删除 2. 将所有 subcompaction 的 ouputs 加入 VersionEdit 3. 调用 `versions_->LogAndApply()` 应用 Edit ## Install() - 主要是调用 InstallCompactionResults 和 输出一些统计信息。 ## OpenCompactionOutputFile() - 新建一个 SST 文件,用于 sub compaction 的输出。 - 主要是创建新的 FSWritableFile 和 TableBuilder - 并更新到 subcompaction 的 CompactionOutputs 中 - 设置到 `builder_`, `file_writer_` 中 - 追击到 `outputs_` 中 - `epoch_number` 取 compaction 所有 inputs 中最小的 - `oldest_ancester_time` 取跟 subcompaction 范围重叠的 inputs 中最小的 ## FinishCompactionOutputFile() - 完成一个 SST 文件的构建 1. 添加 range tombstones - 调用 CompactionOutputs::AddRangeDels() 写入到 range del meta block 2. builder->Finish() - 写入 filter、index 等其他 meta blocks 和 footer 3. 根据文件的精准范围,调整 `oldest_ancester_time` 4. Sync + Close 5. 如果 SST 为空,则删除(不包含任何 entries 和 range deletions) 6. 上报给 SstFileManager ---- # DBImpl - Compaction 入口: `InstallSuperVersionAndScheduleWork()` -> `MaybeScheduleFlushOrCompaction()` -> ## 关键成员 - **`compaction_queue_`**:`deque<ColumnFamilyData *>`, - 当 cf 满足 `cfd->NeedsCompaction()` 时,会放入该队列 - 放入路径:`InstallSuperVersionAndScheduleWork()` ->`SchedulePendingCompaction()` -> `AddToCompactionQueue()` - **`unscheduled_compactions_`**:`int`,在队列中还未调度的 compactions 技术 - 每次 `SchedulePendingCompaction()` 入队列时加 1 - **`bg_bottom_compaction_scheduled_`**: `int`,调度到 BOTTOM pool 中的 compaction - ManualCompaction 时才触发 - **`bg_compaction_scheduled_`**:`int`,调度到 LOW pool 中的 compaction。 - 正常都调度到 LOW pool - **`manual_compaction_dequeue_`**: `deque<ManualCompactionState*>` ,手动 compaction 的队列 ## SchedulePendingCompaction() - 入参:cfd - 检查 cfd 是否需要 compaction 且该 cfd 未在 compaction 队列中,放入入队 - 在 `InstallSuperVersionAndScheduleWork()` 调用 ## MaybeScheduleFlushOrCompaction() - 条件: - 已经 scheduled 的没超过 `max_compactions`,且 - `unscheduled_compactions_ > 0` 即队列里有 pending 的 - 动作:调用 `env_->Schedule()`,去执行 `BGWorkCompaction`,目标是 LOW pool 中。 ## BackgroundCallCompaction() - 主要是调用 BackgroundCompaction ## PickCompactionFromQueue() - 从 `compaction_queue_` 取出队首的 cfd,并且取消设置该 cfd 的 `queued_for_compaction_` - [ ] [Concurrent task limiter for compaction thread control](https://github.com/facebook/rocksdb/pull/4332) ## BackgroundCompaction() - 流程: 1. 调用 `PickCompactionFromQueue()` 从队列挑选 compaction 2. 调用 `picker->PickCompaction()` 生成 Compaction 对象 3. 检测当前 cfd 是否还需要 compaction,需要的话加队列,并触发调度(调用。MaybeScheduleFlushOrCompaction) - 把当前 compaction 涉及的文件屏蔽后,可能还需要 compaction - PickCompaction 内部会重新 score 4. 执行不同类型的 compaction - deletion compaction - trivial move:在 VersionEdit 中直接移动文件,再执行 LogAndApply - bottom compaction: - compact 到最后一层且存在 BOTTOM thread pool - 使用 BOTTOM pool 运行 compaction - 一般 compaction - 生成 CompactionJob,Prepare->Run->Install - 执行完也会调用 InstallSuperVersionAndScheduleWork ---- # 总结 - 三种可能改变 compaction score 的方式: - flush 或 cmpaction 完成,由 InstallSuperVersionAndScheduleWork 触发 - MutableCFOptions 改变,也由 InstallSuperVersionAndScheduleWork 触发 - pick compaction 时,随着部分文件被标记为 `being_compacted`被屏蔽,score 也要重算 - Compaction 的数据流向 ```mermaid flowchart LR CompactionMergingIterator@{ shape: lean-r, label: "CompactionMergingIterator" } CompactionIterator[CompactionIterator] CompactionOutputs[(CompactionOutputs)] CompactionMergingIterator --> CompactionIterator CompactionIterator --> CompactionOutputs ``` - CompactionMergingIterator:迭代输入 - CompactionIterator:drop 数据 - CompactionOutputs:写入结果 ------- # Manual Compaction ## FindMinimumEmptyLevelFitting() - 参数: - **`level`**: `int` - 将整个 level 层往更小的层次移动,寻找最小能容纳 level 层的空层次 - 搜索 $i \in [level-1, 0)$,停止条件: - i 层非空 - 或者 i 层 `max_bytes` 太小无法放入 level 层现有数据 - `MaxBytesForLevel(i) < NumLevelBytes(level)` - 无法移动时,返回 `level`原始值 ## ReFitLevel() - 将 level 层整体移动到另外一层。 - 目标层可以更大也可以更小。 - 参数: - **`level`**:`int`,源 level - **`target_level`**:`int`,目标 level,小于 0 则使用 `FindMinimumEmptyLevelFitting()` - 执行流程: 1. 检查是否能移动,需要满足: - 两个层次中间的所有层次都是空的(包含`target_level`) - 涉及的数据范围与其他 compaction 任务不冲突 - 这些 compactions 的 `ouput_level` 位于这两个层次之间 - L0 层的特殊限制: - 不允许将 L0 层移动到其他层次 - 从其他层移动到 L0 时,不能超过 1 个文件 - [Disallow refitting more than 1 file from non-L0 to L0 #12481](https://github.com/facebook/rocksdb/pull/12481) - 避免 L0 文件过大,影响读性能 2. RegisterCompaction:生成 compaction job 并注册,reason 为 `kRefitLevel` - 避免后续的 compactions 跟它冲突 - [ ] 整个过程都持有 db 锁,是否有必要注册 - [Add missing range conflict check between file ingestion and RefitLevel() #10988](https://github.com/facebook/rocksdb/pull/10988) 3. LogAndApply:应用文件移动对应的 VersionEdit 4. UnregisterCompaction 5. InstallSuperVersionAndScheduleWork ## RunManualCompaction() - 手动 compaction 某一层。 - 主要流程: 1. 调用 `ColumnFamilyData::CompactRange(begin, end)` 生成 compaction 任务 - 除了 L0 之外其他层有 `max_compaction_bytes` 限制,因此可能一次任务只 compact 部分范围,这时会通过 `compaction_end`返回实际选中的范围 2. 调用 `env_->Schedule()` 调度到线程池后台执行 3. 等待 compaction 任务完成 4. 如果`compaction_end`不为 nullptr,表示没 compact 完全部范围, 更新 begin 重试步骤 1 - 其他逻辑: - 因 compaction 冲突时无法执行时,通过 `bg_cv_`等待直到不冲突; - 如果 `manual_compaction_paused_ > 0` 即禁止了手动 compaction ,则停止,并 unschedule 所有 manual compactions ## CompactRangeInternal() - CompactRange() 接口的核心逻辑。 - compact 某个范围 `[begin, end]`内的所有数据。 - `begin` 为 nullptr 表示 `-∞` - `end`为 nullptr 表示 `+∞` - 主要流程: 1. Flush memtable,如果 memtables 跟目标范围重叠 2. 确定起始 level:从小到大,查找第一个跟目标范围重叠的 level,记为 `first_overlapped_level` 3. 从 `first_overlapped_level` 到 `bottommost_level`(不包含),依次调用 RunManualCompaction() 进行 compaction 4. 对 bottommost level 执行 RunManualCompaction - intra-level compaction,结果写入到相同 level - 执行条件(满足任一): - 设置了 `kIfHaveCompactionFilter` 且存在 compaction filter - 设置了 `BottommostLevelCompaction::kForceOptimized` 或 `kForce` - `kForceOptimized`: - 通过 `next_file_number` ,跳过本次手动 compaction 开始之后产生的新文件 - [Avoid double-compacting data in bottom level in manual compactions #5138](https://github.com/facebook/rocksdb/pull/5138/) 5. **ReFitLevel**,如果设置了 `change_level` - 使用场景:手动 compaction 后,数据都到了最后一层,可能数据量变小,适合放在更小的层次 - 开启了`level_compaction_dynamic_level_bytes`的话,似乎意义不大❓ ---- # TODO - [x] L0 intra compaction 条件 - [x] 哪些条件会触发 compaction - [x] 优先级 - [x] 并行 compaction - [x] subcompaction L0-intra - [x] https://smalldatum.blogspot.com/2022/01/rocksdb-internals-intra-l0-compaction.html - [x] max_compaction_bytes - [x] DBImpl::compaction_queue_ - [x] 不同层之间的 compaction 能否同时进行 - 例如 L0-> L1, L1 -> L2 同时运行(可以,只要范围不重叠) - [x] L0 compaction 会选中所有 L0 层文件吗 - [x] 选中一个L0文件后,按范围重叠扩展时可能会 - [x] MaybeScheduleFlushOrCompaction - [x] ReFitLevel - [x] https://github.com/facebook/rocksdb/pull/10988 - [x] manual_compaction - [ ] CompactionFiles - [ ] compaction 如何保留 range tombstones 到 output - [ ] periodic_compaction - https://github.com/facebook/rocksdb/pull/5166 - [ ] bottommost_files_marked_for_compaction_ - https://github.com/facebook/rocksdb/pull/3009 - https://github.com/facebook/rocksdb/pull/11701 - https://github.com/facebook/rocksdb/pull/5090 - [ ] levels 何时增加或减少 - [ ] Universal Compaction - [ ] max_background 如何限制 - [ ] 如何限速 - [ ] PopulateWithAtomicBoundaries - [ ] deletion_compaction - [ ] remote compaction - CompactionService - https://github.com/facebook/rocksdb/wiki/Choose-Level-Compaction-Files - [ ] [Refactor AddRangeDels() + consider range tombstone during compaction file cutting #11113](https://github.com/facebook/rocksdb/pull/11113) - [ ] GetCompactionPressureToken, NeedSpeedupCompaction - [ ] [Speedup based on pending compaction bytes relative to data size #12130](https://github.com/facebook/rocksdb/pull/12130) - [Concurrent task limiter for compaction thread control #4332](https://github.com/facebook/rocksdb/pull/4332) - per-cf 的 compaction 并发限制 --- ## 实验 - [ ] 顺序写入,测试 trival move - [x] 如果 L0 跟 Lbase 压缩算法不同,是否还能 move - 即使配置 L0 跟 L1 压缩算法是一样的。 - Lbase 使用的是 `compression_per_level[1]` (GetCompressionType) - [ ] 模拟 Intra L0 compaction - [ ] 验证 max_compaction_bytes 的影响 - [ ] 测试 `ExpandInputsToCleanCut()` - [ ] 场景:每个相邻文件之间都有 user key 重叠,最坏情况下是否可能选取该 level 的所有文件 - [ ] 测试触发 bottommost files compaction - [x] 哪些被算作 `bottommost_files_` - [ ] 验证 subcompaction 会临时开启新线程 - [x] 如何触发 bottomest compaction