# 辅助结构 ## FileIndexer ^20bb12 - 用于加速定位 SST 文件。 - 根据 key 范围建立上下层之间文件的关系 - 利用上层的二分搜索结果,缩小下层的搜索范围。 - 具体算法:[Fractional cascading](https://en.wikipedia.org/wiki/Fractional_cascading) - [Indexing SST Files for Better Lookup Performance](https://github.com/facebook/rocksdb/wiki/Indexing-SST-Files-for-Better-Lookup-Performance) ## FileMetada - 对应一个 SST 文件的元数据。 ### 基础元数据 - **`fd`**: `FileDescriptor` - **`packed_number_and_path_id`**:包含文件的序列和 path id - **`file_size**`: `uint64_t` - **`smallest_seqno`**: `SequenceNumber` - **`largest_seqno`**:`SequenceNumber` - **`smallest`**: `InternalKey`,文件中的最小 internal key - **`largest`**: `InternalKey`,文件中的最大 internal key - **`refs`**: `int`, 引用计数,小于等于0时可以删除 - **`epoch_number`**: `uint64_t`,用于 CF 内排序 L0 文件,越大的越新,由 `ColumnFamilyData::next_epoch_number_` 递增器生成 , [#10922](https://github.com/facebook/rocksdb/pull/10922) - **`tail_size`**: `uint64_t`,SST 文件 data blocks 之后的数据大小 - **`table_reader_handler`**:`Cache::Handle *`, - 用于将打开后的 table reader pin 到 TableCache 中 ### compaction 相关 - **`being_compacted`**: `bool`,是否正在被 compaction - **`marked_for_compaction`**: `bool` - 用户通过`SuggestCompactRange()` 建议 compact 该文件 - ingestion 过程也会产生 `db/external_sst_file_ingestion_job.cc` - **`compensated_file_size`**: `uint64_t` - 文件内 Delete 条目过多时,会给文件大小增加一个补偿值,使其能够优先被 compaction。 - `Version::ComputeCompensatedSizes()` 中计算 - **`compensated_range_deletion_size`**: `uint64_t`, - 因 range tombstones 产生的文件大小补偿值,`BuildTable()` 时计算 - 计算方式: next level 被该文件内的 range tomstones 所覆盖的 KV 大小 - 会被加入到 compensated_file_size - **`oldest_ancester_time`**:`uint64_t` ,用于支持 periodic compaction [#6090](https://github.com/facebook/rocksdb/pull/6090) - 如果 SST 是 flush 生成的,该值是文件的 oldest key time - 如果 SST 是 compaction 生成的,该值是所有 input files 的 oldest_ancester_time 中的 oldest - 为 0 表示不可用,会使用 file_creation_time,见 `TryGetOldestAncesterTime()` - **总结**:这意味着它是文件内数据的时间,而非是文件生成时间。 - **`file_creation_time`**: `uint64_t`,Unix time ### 统计类 - **`num_entries`**:KV 条目的个数,包含了 deletions 和 range deletions - **`num_deletions`**:包含了 range deletions - **`raw_key_size`**:未压缩的 keys 大小总和 - **`raw_value_size`**:未压缩的 values 大小总和 - **`num_range_deletions`**:range deletions 的个数 ------- # VersionStorageInfo - 文件:db/version_set.h - 与每个 Version 关联的 storage 信息,LSM 磁盘组件的核心元数据: - LSM 有多少 level - 每个 level 的 文件信息 - 标记为 compaction 的 files - blob files 等 ## 类数据成员 - LSM 相关成员: - **`num_levels_`**: `int`,LSM 最大有多少层,固定值(默认 7),取决于 Option - **`num_non_empty_levels_`**: `int`,大于它的 level 都是空的 - **`level_files_brief_`**: `vector<LevelFilesBrief>` - 每层文件精简版的元数据 - 包括每个文件的 fd 和 key 范围, - 同时也包含了一个指向完整元数据的指针 - **`files_`**:`vector<FileMetaData*>`,每层的文件列表,同层内按 key ==升序==排列 - L0 层按 `epoch_number` 排序,新的(`epoch_number` 大的)在前面 - **`level0_non_overlapping_`**:`bool`,L0 之间是否有重叠 - **`level_multiplier_`**:`double`,层之间数据最大大小的倍数 - **`file_indexer_`**: `FileIndexer`,按 key 快速搜索目标 SST 文件 - **`file_locations_`**: `Map<uint64_t, FileLocation>` - 根据文件序号查找文件的位置(level + 该 level 的第几个文件) - 在 `GenerateFileLocationIndex()` 中根据 files_ 生成 - compaction 相关成员 - **`compaction_style_`**: `CompactionStyle`, 通常是 Level-based - **`base_level_`**: `int`,L0 data compact 到哪一层 - **`level_max_bytes_`**: `vector<uint64_t>` - 每层允许的最大数据大小,用于计算 compaction score - **`compaction_score_`**: `vector<double`>, 每层的 score, 按照分数大小排序,最高的在前面 - **`compaction_level_`**: `vector<int>`,按 score 排序后的 levels,最高的在前面 - **`files_by_compaction_pri_`**: `vector<vector<int>>` - 每层按 compaction 优先级排序,最高的在前面,vector 存储 **files_** 的 index - `UpdateFilesByCompactionPri()` 中计算,根据 compaction_pri 有不同的排序指标 - `files_[level][files_by_compaction_pri_[level][idx]]`:直接获取到 idx 对应的 `FileMedataData` - **`next_file_to_compact_by_size_`**: `vector<int>` - 指向 `files_by_compaction_pri_`,表示处理到 files_by_compaction_pri_ 中的每层的哪个文件了 - **`files_marked_for_compaction_`**: `vector< pair<int, FileMetaData*> >` - 标记为 compaction 且正在被 compact 的文件列表,在 `ComputeCompactionScore()`中计算 - `pair.first` 为 level - **`expired_ttl_files_`**: `vector< pair<int, FileMetaData*> >` - 可能包含了 TTL 过期 key 的文件列表, `ComputeExpiredTtlFiles()` 中计算 - 过期基于 oldest_ancester_time 判断 - **`files_marked_for_periodic_compaction_`**:`vector< pair<int, FileMetaData*> >` - 标记需要 periodic_compaction 的文件列表,`ComputeFilesMarkedForPeriodicCompaction()` 中计算 - **`bottommost_files_`**:`vector< pair<int, FileMetaData*> >` - bottomost标准:文件的 key 范围与底层(大于当前的所有 levels 或更早的 L0)文件不存在重叠,即它是这个 key 范围的最底层 [#3009](https://github.com/facebook/rocksdb/pull/3009) - **`bottommost_files_marked_for_compaction_`**: `vector<pair<int, FileMetaData*>>` - bottommost_files_ 中 标记为需要 compaction 的 - 挑选标准见 `ComputeBottommostFilesMarkedForCompaction()` - **`oldest_snapshot_seqnum_`**:`SequenceNumber` - 每次快照释放时更新,不断变大,用于挑选需要 compaction 的 bottommost_files - **`bottommost_file_compaction_delay_`**: `uint32_t` - 来自 bottommost_file_compaction_delay 选项 [#11701](https://github.com/facebook/rocksdb/pull/11701) - **`lowest_unnecessary_level_`**:`int` - level compaction + `level_compaction_dynamic_level_bytes=true` 时有效 - 通过后台 compaction 删除不必要的 levels, 帮助用户迁移到 `level_compaction_dynamic_level_bytes=true` [#11340](https://github.com/facebook/rocksdb/pull/11340) ## ComputeCompactionScore() - **L0 计算**: - 考虑**文件数量**:L0 文件读取时需要 merge,因此控制文件数量很关键。另外对于大 writer buffer 更友好(compaction不会变多) - 还需要考虑**文件总大小**:因为有 L0 -> L0 的 compaction,即使文件数量不多,也可能导致 L0 过大, 产生巨大的 L0 -> Lbase compaction - 大小统计跳过正在被 compact 的文件 - L0 的 score 不能太大,否则 L0-> Lbase compaction 总是优先于 Lbase -> Lbase+1 执行,导致 Lbase 积压;也不能太小,否则 L0 积压,数据无法快速推到 Lbase,因此 score 计算会同时考虑 L0 的大小 和 Lbase 的大小。 - **file_num_score** = num_files(uncompacted_L0) / level0_file_num_compaction_trigger - 未启用 dynamic level: - score = max(file_num_score, L0_total_size / max_bytes_for_level_base) - 启用 dynamic level: - score 初始为 `file_num_score` - 如果 L0_total_size 大于 `max_bytes_for_level_base`,会确保 score 不小于 1.01 (加大 score,确保能被 scaled 乘以 10,1.01 也不会过大) - 如果 `L0_total_size` 大于 `level_max_bytes_[base_level_]`,确保 score 不小于 `L0_total_size / max(base_level_size, level_max_bytes_[base_level_])` - 这样如果 L0 size 大于 L1 size,score 就会大于 1,能被 scaled,提升优先级 - 如果 score > 1.0,进行 scale 乘以 10 - 总结:如果 L0 的文件大小过大,确保 score 大于 1,可以被 scale,提升优先级 - **其他 Levels 计算(L6以外)**: - `level_bytes_no_compacting`: 当前 level 没有被 compaction 的文件总大小 - `level_total_bytes`: 当前 level 文件总大小 - `level_max_bytes` : `level_max_bytes_[level]` - `kScoreScale`: 10.0 - `total_downcompact_bytes`:从上面所有 levels 将要 compaction 到当前 level 的 - level 小于 `lowest_unnecessary_level_` 也是要 compact 的,累计整个 level 大小 - 超过某个 level `level_max_bytes` 的,累计超过部分 - 如果未启用 dynamic level: - `score = level_bytes_no_compacting / level_max_bytes` - 数据量是否超出本层限制( level_bytes_no_compacting >= level_max_bytes) - 未超出:`score = level_bytes_no_compacting / level_max_bytes` - 超出了: `score = level_bytes_no_compacting / (level_max_bytes + total_downcompact_bytes) * kScoreScale - 加了 total_downcompact_bytes,相当于 score 变小,如果上层将有大量数据将要 compact 下来,调低本层优先级,优先上层 compact - 处理 unnecessary levels( level <= `lowest_unnecessary_level_` 并且 level 数据非空) - 手动设置 unnecessary levels 的 score,使其能被 compact - `score = max(score, kScoreScale * (1.001 + 0.001 * (lowest_unnecessary_level_ - level)))` - level 越小,优先级越高 - 加权值 `1.001 + 0.001 * l`小于 L0 的 1.01,使其优先级不要高于 L0 - **总结**:除了基本的 `level_max_bytes` 因素,还有考虑 `total_downcompact_bytes` 和 unnecessary levels - 计算完 score 后会排序 `compaction_score_` 和 `compaction_level_` 以及其他 compaction 相关的计算 - ComputeFilesMarkedForCompaction - ComputeBottommostFilesMarkedForCompaction - ComputeExpiredTtlFiles - ComputeFilesMarkedForPeriodicCompaction - EstimateCompactionBytesNeeded - PR: [Change The Way Level Target And Compaction Score Are Calculated](https://github.com/facebook/rocksdb/pull/10057) > the score is calculated by actual **level size / (target size + estimated upper bytes coming down)**. The reasoning is that if we have a large amount of pending L0/L1 bytes coming down, compacting L2->L3 might be more expensive, as when the L0 bytes are compacted down to L2, the actual L2->L3 fanout would change dramatically. On the other hand, when the amount of bytes coming down to L5, the impacts to L5->L6 fanout are much less. So when calculating target score, we can adjust it by adding estimated downward bytes to the target level size. - PR: [Fix regression issue of too large score](https://github.com/facebook/rocksdb/pull/10518) - 修复 L0 score 可能过高的问题 - 之前的算法:We fix calculating a score of L0 by size(L0)/size(L1) in the case where L0 is large - Issue: [There are too many write stalls because the write slowdown stage is frequently skipped](https://github.com/facebook/rocksdb/issues/9423) ## CalculateBaseBytes() - 主要是计算 `level_max_bytes_`(各层允许的最大大小) - 先计算出 base level size,再根据 level_multiplier 计算 - 不开启 `level_compaction_dynamic_level_bytes` 时: - base level 为 L1,L1 的最大大小取决于参数 `max_bytes_for_level_base` - 其余各层 $L_n = L_{n-1} \times level\_multiplier \times level\_multiplier\_additional[n]$ - 开启 `level_compaction_dynamic_level_bytes` 时: - **max_level_size**:除 L0 外,统计每层的数据量,取最大的那个 - 考虑 compaction,最大的那层不一定是 last level - 如果 `max_level_size` 为 0,表示 L0 以上没有数据,base_level_ 取 L6,即 L0 直接compact 到 max level - **first_non_empty_level**: 除 L0 外,第一个非空层(从上往下判断) - **base_bytes_max**: `max_bytes_for_level_base` - base_level_size 的上界 - **base_bytes_min**: `base_bytes_max / options.max_bytes_for_level_multiplier` - base_level_size 的下界。 - 同时小于此值的 level 也会被认定为 unnecessary - **cur_level_size**: 以 max_level_size 作为基准,从下往上根据 level_multiplier 计算得出其他各层应该最大是多大 - 例如:L5 为 max_level_size / 10,L4 为 max_level_size / 100,直到 first_non_empty_level - **`lowest_unnecessary_level_`** 为第一个 cur_level_size 小于 base_bytes_min 的层次 - 即 level 过小,没有存在的必要,可以进行 compaction - 计算 **base_level_size**: (**base_level** 初始化为 `first_non_empty_level`) - 如果: `base_level` 的 `cur_level_size` 小于等于 `base_bytes_min`, 取值为 `base_bytes_min` + 1 (加 1 是防止 `base_bytes_min` 为 0),这种情况也必然存在 `lowest_unnecessary_level_` - 否则:base_level 逐渐==减小==,`cur_level_size` 随之逐渐除以`level_multiplier`,直到不大于 `base_bytes_max` - 这种情况会增加当前的层次,制造空层 - 总结:确保 `base_level_size` 取值范围在 `base_bytes_min` 和 `base_bytes_max` 之间 - 设定 **`level_max_bytes_`** - 从 `base_level_` 开始,以 `base_level_size` 为初始值,从上往下依次乘以 `level_multiplier` > [!summary] level_compaction_dynamic_level_bytes = true > - base level size 计算: > - 以 `max_level_size` 作为 last level 的大小,不断除以 `level_multiplier` 直到位于`max_bytes_for_level_base`的范围内 > - `level_max_bytes_`计算 > - 根据 base level size 不断乘以 ``level_multiplier`` ## EstimateCompactionBytesNeeded() - 各层累加计算 - L0: ``` If num_files(L0) > compaction_trigger or size(L0) > size(L1_target): increment by size(L0) + size(L1) ``` - `L1_target`: max_bytes_for_level_base - Ln,n > 0 时: ``` If size(Ln) > target_size(Ln): increment by [ size(Ln) - target_size(Ln) ] * [ (size(Ln+1) / size(Ln)) + 1 ] ``` - `target_size(Ln)`: `level_max_bytes_[n]` - 本层超出大小 加上 compaction可能涉及的下层大小(本层超出大小乘以一个比例) ## ComputeFilesMarkedForCompaction() - 在 `ComputeCompactionScore()` 中调用 - 计算 `files_marked_for_compaction_` - 跳过最后一层非空的 level [#11489](https://github.com/facebook/rocksdb/pull/11489) - 当 `allow_ingest_behind = true` 最后一层为 ingestion 预留,不 compaction - 检查其他各层文件是否有 `marked_for_compaction` 标记,且没有在被 compact,添加到 `files_marked_for_compaction_` - 计算 `standalone_range_tombstone_files_mark_threshold_` - 满足 `file->FileIsStandAloneRangeTombstone()`: - 文件只包含一个 range deletion 的 entrie - 通常由 ingestion 产生,用于删除老数据 [#13078](https://github.com/facebook/rocksdb/pull/1307) - `standalone_range_tombstone_files_mark_threshold_` 取这些文件的`smallest_seqno`中最小的 ## ComputeBottommostFilesMarkedForCompaction() - 计算 `bottommost_files_marked_for_compaction_`,在 `ComputeCompactionScore()` 和 `UpdateOldestSnapshot()` 中调用。 - 如果 `allow_ingest_behind = true`,直接返回跳过计算。 <br> - 按照多个条件挑选 `bottommost_files_` 放入 `bottommost_files_marked_for_compaction_` - 没有正在被 compact - SST 的 `largest_seqno` 不为 0 - 配合 **`CompactionIterator::PrepareOutput()`** 使用,里面会将 bottommost 中小于最小 snapshot seq 的 seq 设为 0 - SST 的 `largest_seqno` 小于 `oldest_snapshot_seqnum_` - 意味着 STT 内的所有数据都没有被快照引用了 - `create_time` 距今已经超过 `bottommost_file_compaction_delay_` 选项 - 避免 compact 刚生成的,需要清理的数据可能很少 [#11701](https://github.com/facebook/rocksdb/pull/11701) ## ComputeExpiredTtlFiles() - 计算 `ttl_expired_files_`,在 `ComputeCompactionScore()` 中调用。 - 仅在 配置了 `AdvancedColumnFamilyOptions::ttl` 时有效。 - 根据 `oldest_ancester_time` 选择所有文件中生命超过 TTL 的。 - ~~不选择 L6,因为已经有 bottommost compaction?~~ - 不选择 L6。设计上对于 leveled compaction style,只 compact 过期的 Non-bottom-level files - PR: [Level Compaction with TTL #3591](https://github.com/facebook/rocksdb/pull/3591 > [!NOTE] 与 DBWithTTL 的区别 > - DBWithTTL 是 key 粒度的,利用 compaction filter 删除 TTL 过期数据; > - 此处的 `ttl_expired_files_` 是文件粒度的,文件超期则触发 compaction,目的不是为了清理过期数据,而是为了防止文件长时间不 compaction。 ## ComputeFilesMarkedForPeriodicCompaction() - 计算 `files_marked_for_periodic_compaction_`,在 `ComputeCompactionScore()` 中调用。 - 仅在 `periodic_compaction_seconds` 选项不为 0 时计算 - 该选项默认 30 天 - 挑选文件的创建/修改时间 距今已经超过 `periodic_compaction_seconds` 的文件 - 与 TTL compaction 不同,当不启用`allow_ingest_behind` 时,它会处理最底层 L6 的文件 - 会根据 `daily_offpeak_time_utc` 选项,在低峰时提前过期 - [PR: Periodic Compactions](https://github.com/facebook/rocksdb/pull/5166) - [PR: Mark more files for periodic compaction during offpeak](https://github.com/facebook/rocksdb/pull/12031) ## GenerateBottommostFiles() - 计算各层 `bottommost_files_`,在 `PrepareForVersionAppend()`中调用 - 针对每层每个 SST,提取 `smallest_user_key` 和 `largest_user_key`,调用 [[version#^cdf817|RangeMightExistAfterSortedRun()]], 如果返回 false, 放入 `bottommost_files_` - 范围没有出现在更早的 L0 文件或者更大的 level 中 ## ComputeCompensatedSizes() - 计算每个 SST FileMetaData 的 `compensated_file_size` 值,在 `PrepareForVersionAppend()` 中调用。 - 如果某个 SST 已经计算过,则跳过。 - `compensated_file_size` 由三部分组成: - `file_size`:原始文件大小 - `compensated_deletion_size`:由 deletion enties 产生(不包含 range deletions) - 当 deletions 超过总 entries 的一半时触发计算 - 计算方式:`(deletions * 2 - entries) * avg_value_size * 2` - `compensated_range_deletion_size`:range deletions产生的 - 预估每个range deletion 的范围跟下层重叠的数据大小 - db/builder.cc `BuildTable()` 中计算 ## UpdateFilesByCompactionPri() - 计算各层 `files_by_compaction_pri_`, 在 `PrepareForVersionAppend()` 中调用。 - 不统计 L6 的,因为它不参与一般的 compaction。 - 根据配置的 `CompactionPri` 选项,对每层的 SST 进行排序。 - kMinOverlappingRatio 的 排序 key: `overlapping_bytes / compensated_file_size / ttl_boost_score` - `overlapping_bytes`: 跟自身范围重叠的下层文件们的大小总和 - `ttl_boost_score`:提升 TTL 快到期的文件的优先级。[#8749](https://github.com/facebook/rocksdb/pull/8749) - key 越小的排序越往前,compaction 优先级越高 ## PrepareForVersionAppend() - 将 Version 追加到 VersionSet 之前调用,计算相关成员数据 ## OverlapInLevel() - 输入参数: - **`level`**: `int` - **`smallest_user_key`**: `Slice *` - **`largest_user_key`**: `Slice *` - 检查某层中是否有文件跟 `[smallest_user_key, largest_user_key]` 范围重叠 - user_key 如果为空表示 负/正 ∞ - 如果 level 大于 `num_non_empty_levels_` 表示该层为空,返回 false。 ## GetOverlappingInputs() - 查找 `level` 层跟 `[begin, end]` 重叠的文件。 - 如果 `level` 大于 0,使用二分搜索 `begin` 和 `end`的位置。 - 如果 `level` 为 0,遍历查询 - `expand_range`:发现重叠文件后,根据它的范围尝试扩大 `[begin, end]`返回。 ## RangeMightExistAfterSortedRun() ^cdf817 - 判断 key 范围是否出现在更早的 L0 或者更大的 level 中 --------- # Version - db/version_set.h - 某个 cf 在某个时间点 的 table 和 blob files。 - 属于某一个 cf。 - 主要是其包含的 VersionStorageInfo,外加了一些辅助/管理数据。 - 核心成员: - **`cfd_`**: `ColumnFamilyData*`,所属的 cf - **`vset_`**:`VersionSet*`,所属的 `VesionSet` - **`next_`**, **`prev_`**:`Version *`,Version 之间构成双链表 - **`storage_info_`**:`VersionStorageInfo`,LSM-tree 磁盘结构、compaction 信息 - **`refs_`**:`int`,引用计数,为 0 时会 delete 该 version - **`max_file_size_for_l0_meta_pin_`**:`size_t`, - cf `write_buffer_size` 的 1.5 倍,超过此限制的 L0 不会 pin 它的 index/filter blocks - **`version_number_`**:`uint64_t`,唯一 ID,仅用作调试和打印日志 ## 析构 1. 从 version 链表中移除 2. 对 Version 中引用的每个 SST 文件进行 UnRef() - 如果文件引用计数变为 0, 则放入 `VersionSet::obsolete_files_` 中 ## Unref() - 减小引用计数,如果变为 0 则进行 delete。 ----------- # VersionEdit - db/version_edit.h - 代表对 Version 一次更改操作; - cf 级别成员: - **`column_family_`**: - 目标 cf,一个 VersionEdit 仅包含对==一个 cf== 的修改,即从属于某个 cf - **`deleted_files_`**: `DeletedFiles` - 删除了哪些文件,set 结构, 元素为 `(level, fileno)` - **`new_files_`**: `NewFiles ` - 添加了新文件,vector 结构,元素为 `(level, FileMetaData)` - **`compact_cursors_`**:`CompactCursors` - 存储 compaction 的位置,仅当 `CompactionPri` 为 `kRoundRobin` 时有效 - **`log_number_`**:`uint64_t` - WAL recover 的起始位置,来自 [[memory#^ce1bad|mem_next_logfile_number_]] - flush job 会进行设置,已实现 flush 后不用回放老的 wals,避免数据重复 - **`wal_deletion_`**:`WalDeletion` - 删除某个编号之前的 WAL - **`is_in_atomic_group_`**: `bool` - 表示 edit 是 atomic flush group 的一个 - **`remaining_entries_`**:`uint32_t` - edit 是 atomic group 中的倒数第几个 - *`prev_log_number_`*:不再使用 - DB 级别成员: - **`min_log_number_to_keep_`**:`uint64_t` - 含有 unflushed data 的最早 WAL,即大于等于它的 WAL 含有 unflushed 数据,需要保留 - **`next_file_number_`**:`uint64_t` - 分配文件编号 - **`last_sequence_`**:`uint64_t` - SST 中的最大 seq,更新`new_files_`时会同时读取 SST 的 `largest_seqno` 进行更新 - 用于 recover 时恢复 VersionSet 最后发布的序列号 - [x] 区分 cf 级别和 DB 级别的元数据 --- # VersionBuilder - 基于一个 base VersionStorageInfo,累积应用 VersionEdit(通过Apply方法),最终生成新的 VersionStorageInfo(通过 SaveTo 方法保存到新 Version 的 VersionStorageInfo 中) - 主要涉及文件增删和 compact cursor。 ## LoadTableHandlers() ^e0a39b - 提前打开本次变更所新增的 SSTs。 - 即创建 TableReader。 - 调用时机: - DB::Open 时预加载 SSTs(此时新增 SSTs 为 cf 的全量 SSTs) - 每次有新 Version 产生时(VersionSet::ProcessManifestWrites 中) - 参数: - **`max_threads`**:`int`,多线程并行打开 - DB::Open时,使用参数 `DBOptions::max_file_opening_threads`,默认 16 - 追加新Version 时,使用 1 - **`is_initial_load`**:`bool`,true 表示来自 DB::Open() - **`prefetch_index_and_filter_in_cache`**:`bool` - DB::Open() 为 false,ProcessManifestWrites() 时为 true - PR:[Non-initial file preloading should always prefetch index and filter #4852](https://github.com/facebook/rocksdb/pull/4852) - 打开的文件数量限制: - 如果 `max_open_files == 1`,则所有的新增 SSTs 都打开 - 否则,同时打开的总数量 `load_limit` 限制为 `max_open_files/4` - 如果是 `is_initial_load`,则`load_limit`进一步限制为 16 个 - PR:[Preload some files even if options.max_open_files #3340](https://github.com/facebook/rocksdb/pull/3340) - 本次可打开的数量为 `load_limit - table_cache_usage` - 如果当前已打开的数量即 `table_cache_usage` 大于 `load_limit`,则直接返回 - 执行流程: 1. 搜索所有 levels 的 **`added_files`**,查找还没有打开的 SSTs,直到达到数量限制 - 没打开过:`file_meta->table_reader_handle` 为 nullptr 的 2. 通过 `TableCache->FindTable()` 并行打开 SSTs,赋值给 `FileMetaData::table_reader_handle`,pin 住 cache handle > [!NOTE] Preload > - 当 `max_open_files == -1` 时,预加载所有新增 SSTs; > - 当 `max_open_files != -1` 时,预加载 SSTs 的前提是 table cache is not 1/4 full; - [ ] `prefetch_index_and_filter_in_cache`对打开 TableReader 的影响 ------------ # VersionEditHandler ## VersionEditHandlerBase - 遍历 MANIFEST 中的 VersionEdit,并调用子类提供的 ApplyVersionEdit() 处理 VersionEdit - Iterate() 流程 1. 调用 Initialize(),子类提供 2. 扫描 MANIFEST,解码 VersionEdit,调用 ApplyVersionEdit() 3. 调用 CheckIterationResult(),子类提供 - atomic group 处理: - 同一个 atomic group 内的 VersionEdits 先缓存到 `read_buffer_`中,等 group 读完整后应用: 1. 调用 OnAtomicGroupReplayBegin() 2. 对 group 内的每个 VersionEdit 调用 ApplyVersionEdit() 3. 调用 OnAtomicGroupReplayEnd() ## VersionEditHandler ^b3d06f - 继承了 VersionEditHandlerBase - 扫描 MANIFEST 文件,应用 VersionEdit 到 VersionSet 中。Recover 使用。 - 关键成员: - **`column_families_`**:`vector<ColumnFamilyDescriptor>`, - 来自DB::Open,需要打开的 cf 列表 - **`version_set_`**:`VersionSet *`,读取到的 VersionEdit 会更新到这里 - **`builders_`**:`map<uint32_t, VersionBuilderUPtr>`,各个 cf 的 VersionBuilder - 初始化时添加 default cf 的 - 后续应用 VersionEdit 时在 OnColumnFamilyAdd 中添加其他 cf 的 - **`name_to_options_`**:`map<std::string, ColumFamilyOptions>` - cf name 到 option 的映射,初始化时根据 `column_families_` 进行赋值 - **`column_families_not_found_`**:`map<uint32_t, string>` - 存在于 MANIFEST 中(有 cf add 记录),但没出现在用户指定的 `column_families_`中 - **`version_edit_params_`**:`VersionEdit`, 缓存最新的 DB 级别的属性,到最后再更新到`version_set_`中,如 - `next_file_number_` - `has_last_sequence_` - `min_log_number_to_keep_` - `max_column_family_`:最大的 cf id - `log_number_`,维持各个 cf 最大的 `log_number_` - 应用于 `VersionSet::MarkFileNumberUsed`,更新 `next_file_number_` ### ApplyVersionEdit() 1. 根据 VersionEdit 的类型,进行不同的处理 - IsColumnFamilyAdd -> OnColumnFamilyAdd() - IsColumnFamilyDrop -> OnColumnFamilyDrop() - 其他 -> OnNonCfOperation() 2. 调用 ExtractInfoFromVersionEdit,VersionEdit 解析其他 cf 级别 和 DB 级别的信息 ### ExtractInfoFromVersionEdit() - 解析 edit 的 `log_number_`,更新到对应 cfd - 解析 DB 级别的信息,更新到 `version_edit_params_` ### OnColumnFamilyAdd() - 主要逻辑:调用 `version_set_->CreateColumnFamily()` 创建和初始化 cf - cf memtable 的 `earliest_seqno_` 为 0 - 其他: - 检测有同一cf 重复的 add 记录并报错 - `persistent_stats` cf的 初始化(如果有启用) - 尝试更新 `column_families_not_found_` ### OnColumnFamilyDrop() - 主要逻辑: - 从 `version_set_`获取 cf 并设置为 Dropped - 再调用 `ColumnFamilyData::UnrefAndTryDelete()` 释放 cf ### OnNonCfOperation() - 从 `builder_` 中找出 cf 对应的 VersionBuilder - 调用 `VersionBuilder::Apply(edit)`应用 VerionEdit ### CheckIterationResult() - 遍历 MANIFEST 结束后调用: 1. 从 `version_edit_params_` 解析 DB 级别的信息,更新到 `verson_set_` 2. LoadTables() 加载 SSTs 到 TableCache - 调用 [[version#^e0a39b|LoadTableHandler()]] - `prefetch_index_and_filter_in_cache` 为 false - `is_initial_load` 为 true - 每个 cf 都加载 3. 完成各个 cf 的 VersionBuilder,生成 cf 的 Version > [!summary] VersionEditHandler > Recover 时 回放 MANIFEST 到 VersionSet,并 preload SSTs。 ---- # VersionSet - DB 所有 cf 的 versions 集合,一个 DB 只有一个 VersionSet。 ## 核心成员 - **`column_family_set_`**:`unique_ptr<ColumnFamilySet>`,[[column family#^09f276|ColumnFamilySet]] - 保管所有的 cf - **`wals_`**:`WalSet`,WAL文件集合, - **`next_file_number_`**:`atomic<uint64_t>`,分配文件序号 - **`last_sequence_`**:`atomic<uint64_t>`,读可见的最新 seq(通常来自 memtable) - **`descriptor_last_sequence_`**: `SequenceNumber`,提交到 Manifest 的最新 seq - 用于确保提交给 manifest 的`VersionEdit::last_sequence_`是递增的 - **`last_allocated_sequence_`**:`atomic<uint64_t>`,用于分配 seq - **`obsolete_files_`**:无效的文件,以及在 DB 关闭时没有被内存中 LSM 引用的文件 - **`descriptor_log_`**:`unique_ptr<log::Writer>`,MANIFEST writer - **`manifest_writers_`**:`deque<ManifestWriter*>`,ManifestWriter 的写入请求队列 - 一个 ManifestWriter 对应==一个 cf== 的写入,主要包含 - **`done`**:`bool`,写入完成标志 - **`cv`**:`CondVar`,写入同步,等待 group commit 时被别人完成写入,或者,自己成为队首进入写入流程 - **`cfd`**:`ColumnFamilyData *`,关联的 cf - **`mutable_cf_options`**:`const MutableCFOptions`,option 修改 - **`edit_list`**:`const autovector<VersionEdit*>&`,修改列表 - **`pending_manifest_file_number_`**:`uint64_t` - 当前正在写入中的 manifest 文件,用于阻止删除 切换 CURRENT 时生成的临时文件 - 该临时文件名字中包含 `pending_manifest_file_number_` - 关联逻辑:DBImpl::PurgeObsoleteFiles() - 每次写完 ProcessManifestWrites 完会改回 0 ## AppendVersion() - 为某个 cf 按照新的 Version - 关键操作: - ComputeCompactionScore - 当新 version 设置为 cf 的 current - 追加到 cf 的 versions 链表中 ## SetCurrentFile() - 更新 CURRENT 文件 - 参数: - **`db_name`**:DB 路径 - **`descriptor_number`**:新 manifest 文件的编号 - 流程: 1. 创建临时 CURRENT 文件,名为 `{descriptor_number}.dbtmp` 2. 将新 manifest 文件名写入临时文件,即 `MANIFEST-{descriptor_number}` 3. 将临时文件重命名为 CURRENT(原子操作) 4. sync 目录 ## ProcessManifestWrites() - 批量写入 Manifest,并尝试组提交队列中 pending 的写入。 - 一次调用写入多个 cf 上的修改 - 参数: - **`writers`**:`deque<ManifestWriter>`,当前的写入请求,可能内含多个 cf - **`new_descriptor_log`**:`bool`,强制创建新 manifest,仅测试使用 - 进入该函数的==前提==:`writers.front()`是 `manifest_writers_`的头部 - 即当前 writer 是队首,由 LogAndApply 调用时保证 - group commit 逻辑: - **`batch_edits`**:`vector<VersionEdit*>`,组提交的结果,最后一次性写入 - 从 `manifest_writers_` 队首开始不断挑选 writer,将 writer 的 VersionEdits 放入 `batch_edits` 中 - 停止条件: - 遇到 `IsColumnFamilyManipulation() == true` 的 VersionEdit,即 add/drop cf - 一个 writer 从属于 一个 cf,add/drop cf 对应的 writer 内只有一个 VersionEdit - 目的:add/drop cf 不参与 group commit,单独写入 - 如果 writer 所属的 cf 已经 dropped,则忽略,不写入 - 如果 writer 在一个 atomic group 内,需要改写 group 成员的 `remaining_entries_`,维护 group 关系 - PR:[Support group commits of version edits #3944](https://github.com/facebook/rocksdb/pull/3944) - 创建新 manifest : - 条件(两者任一满足): - `descriptor_log_`为空,即每次 Open 初始化时 - `manifest_file_size_ > max_manifest_file_size`,即当前文件过大(默认 1G) - 创建流程: 1. 新建文件,通过 fallocate 预分配 `manifest_preallocation_size`(默认 4MB) - 使用 FALLOC_FL_KEEP_SIZE 2. WriteCurrentStateToManifest:写入全量元数据(DB当前状态),包括 - 每个 cf 创建信息(add cf) - 每个 cf 每层的 SST 文件,blob 文件,compaction 位置,`log_number` - 整个DB 的 `min_log_number_to_keep_`(写到默认 cf)等 - 新 manifest 会消费 `next_file_number_`,需要体现到后续的 VersionEdit 中 - 错误处理: - 如果写 manifest 成功,但更新 CURRENT 失败,则==不删除==新创建的 manifest 文件 - 远程文件系统可能更新 CURRENT 成功了,但给客户端返回失败,此时删除则导致 CURRENT 指向了不存在的 MANIFEST - PR: [Handle rename() failure in non-local FS #8192](https://github.com/facebook/rocksdb/pull/8192) - 整体执行流程: 1. group commit:批量挑选 VersionEdits 到 `batch_edits` 2. 为每个 cf 生成新 Version(存储到 `versions`中) - 基于 cf 当前 version 创建新的 version,并应用针对该 cf 的 VersionEdits - 调用 [[version#^e0a39b|LoadTableHandler()]] ==预加载==新增的 SSTs 3. 按需创建新 manifest 4. 调用 `Writer::AddRecord()` 依次写入 `batch_edits`中的每个 VersionEdit - 一个 VersionEdit 一个 record 5. SyncManifest:flush + sync 6. 如果新建了 manifest,调用 SetCurrentFile() 更新 CURRENT 文件 7. 调用 `VersionSet::AppendVersion()`安装每个 cf 的新 Version - 如果是 add/drop cf 则特殊处理,调用 CreateColumnFamily 或者 SetDropped 8. 写入成功后,将已写入的从 `manifest_writers_`出队,并唤醒组提交中非本次请求的 writers 9. 唤醒新的队首,通知下轮写入 > [!summary] ProcessManifestWrites 要点 > - group commit > - 每个 VersionEdit 写入一个 record,只是整体只有一次 flush + sync > - 预先打开 new SSTs > - 按需切换 manifest 文件 > - 写完后将新 Version 更新到 cfd 中 ## LogAndApply() - 记录 VersionEdits 到 Manifest 、并生成和应用新 Versions。 - 参数: - **`column_family_datas`**:`vector<ColumnFamilyData*>` - 变更涉及到的 cf 列表 - **`mutable_cf_options_list`**:`vector<const MutableCFOptions *>` - 各个 cf 最新的 mutable options,最终会应用到 cf 的新 Version 中 - 数量跟 `column_family_datas`一样 - **`edit_lists`**:`vector<vector<VersionEdit*>>` - 两层 vector - 外层每个元素对应一个 cf,数量跟 `column_family_datas`一样 - 内层表示某一个 cf 的 修改列表(VersionEdits) - **`new_cf_options`**: `ColumnFamilyOptions *` - 创建新 cf 时需要,此时 `edit_lists` 只包含一个 cf,且该 cf 只有一个 VersionEdit - 流程: 1. 构建 `ManifestWriter`s,入队 `manifest_writers_` 2. 等待他人通过 group commit 代自己写入,或者,自己成为队首 3. 如果成为队首,调用 ProcessManifestWrites() 写入 manifest 并安装新 Versions - 如果 `column_family_datas` 所有的 cf 都被 drop 了即 `cfd->IsDropped() == true`, 则不写入,返回 `Status::ColumnFamilyDropped()` ## Recover ^6ca17c - 回放 MANIFEST 到 VerionSet 中 1. 读取 CURRENT -> MANIFEST 文件 2. 利用 [[version#^b3d06f|VersionEditHandler]] 遍历 MANIFEST,回放 VersionEdits 到 VersionSet - 各个 cf,cf 的 version(SSTs 文件等) - `next_file_number`等 DB 级别元数据 - 预打开每个 cf 的 SST 文件( VersionEditHandler::CheckIterationResult 中) - 如果`max_open_files != -1`,所有 cf 累积打开不超过 16 个 -------- # 总结 - 几个 Version 类的关系 - ![](https://img.jonahgao.com/oss/note/2025p2/rocksdb_version_builder.svg) --- # 实验 - [ ] 一个VersionEdit 编码后的 manifest record 极限下能有多大 - 比如 FileMetaData 的 `smallest` 或 `largest` 非常大 - [ ] `max_manifest_file_size`过小