# FlushScheduler - 队列,存储 memtable 满了需要 flush 的 column families。 - 由 DBImpl 持有 - **`ScheduleWork()`**:将 cfd 加入队列 - **`TakeNextColumnFamuly()`**:出队,返回 cfd ------------- # FlushJob ## 关键成员 - **`cfd_`**:`ColumnFamilyData*`,目标 cf - **`versions_`**:`VersionSet*`,来自 DBImpl - **`existing_snapshots_`**:`Vector<SequenceNumber>`,当前所有快照的序列号(去重,增序) - **`earliest_snapshot_`**:`SequenceNumber`,最早的快照序列号,没有设为`kMaxSequenceNumber` <br> - **`sync_output_directory_`**: `bool`,是否需要 fsync 输出目录。 - 通常需要。如果 flush job 是 atomic flush 的一部分,则不需要 - **`write_manifest_`**:`bool`,flush 成功后是否需要写入 MANIFEST - 通常需要。如果 flush job 是 atomic flush 的一部分,则不需要 ### 由 PickMemtable() 设置的 - **`meta_`**: `FileMetaData` - **`mems_`**: `vector<Memtable*>` - **`edit_`**: `VersionEdit*` - **`base_`**:`Version*` ## PickMemTable() - 从 cf 中挑选需要 flush 的 memtables, 调用时需持有 DB mutex。 1. 初始化 `mems_` - 调用 `cfd_->imm()->PickMemtablesToFlush()` 挑选需要 flush 的 memtables - [[memory#^b51817|PickMemtablesToFlush()]] 2. 初始化 `edit_` - SetLogNumber 为 `max_next_log_number` (选中的 memtables 中最大的),表示 recover 时不会再读取小于它 的 logs。 3. 初始化 `meta_` - 分配 L0 的文件标号和 epoch number 4. 初始化 `base_` - 设置为 `cfd_->current()` ## WriteLevel0Table() ^c27f6d 1. 在选中的 memtables 上创建 InternalIterator 和 FragmentedRangeTombstoneIterator,使用 MergingIterator 将所有 InternalIterator 合并到一起 2. 调用 BuildTable,遍历数据生成 SST 3. 将 SST 添加到 `edit_` ## Run() - 调用 `WriteLevel0Table()` 生成 L0 文件 - 如果失败,则调用 `MemtableList::RollbackMemtableFlush()` 回滚 flush - 如果需要写 manifest,调用`MemtaleList::TryInstallMemtableFlushResults()` - [[memory#^d8e92e|TryInstallMemtableFlushResults]] --------- # DBImpl ## FlushRequest - struct 结构, 包含成员: - **`flush_reason`**: `FlushReason` - **`cfd_to_max_mem_id_to_persist`**:`map<ColumnFamilyData*, uint64_t>` - value 是 memtable id,小于等于它的都是需要 flush 的,全部 flush 完成才能认定该 cf flush 完成了 - 所有的 cf 都 flush 完成了,才能认定该请求完成了。 - 只有启用了 `atomic_flush` 后,map 才可能有多个元素,包含多个 cf - DBImpl flush 队列 - **`flush_queue_`**:`deque<FlushRequest>` - 满足 `MemtableList::IsFlushPending()` 的 cf 会放入队列中 - 队列的消费者是后台 flush 线程,生产者是 WriteThread/Leader - 队列长度计数: `DBImpl::unscheduled_flushes_` ## SchedulePendingFlush() - 入参:`const FlushRequest& flush_req` - 将 `flush_req` 放入 `flush_queue_`,增加 `unscheduled_flushes_` - 设置 cfd 的 `queued_for_flush_` 标志 - 不入队列的情况: - cfd 已经在队列里(通过 `queued_for_flush_` 标记) - `MemtableList::IsFlushPending()` 为 false - `min_write_buffer_number_to_merge_` 选项开始起作用 ## SwitchMemtable() - 切换某个 cf 的 memtable。 - 执行流程: 1. 调用`CreateWAL()` 创建/切换新的 wal 文件 2. `ConstructNewMemtable()` 创建新的 memtable 3. 将之前的 memtable 放入 immutable list (`MemtableList`)中 4. 设置新创建的 memtable 为 cf 的当前 memtable 5. 调用 `InstallSuperVersionAndScheduleWork()` 安装新的 sv ,调度 flush 和 compaction ## ScheduleFlushes() - 持有 DB 锁,且在 WriteThread 内。在 `PreprocessWrite()` 中调用。 - 执行流程: 1. 遍历 `flush_scheduler_`,取出所有需要 flush 的 cfd 2. 对各个 cfd 进行 `SwitchMemtable()` 操作 3. 为每个 cfd 生成 `FlushRequest`(`flush_reason`为 `kWriteBufferFull`),调用 `SchedulePendingFlush()`,放入 `flush_queue_` 4. 调用 `MaybeScheduleFlushOrCompaction()` ,往后台线程调度 flush ## MaybeScheduleFlushOrCompaction() - 调度后台线程去执行 `DBImpl::BGWorkFlush()` - `flush_queue_`中每个元素调度一次。 - 最多同时允许 `bg_job_limits.max_flushes` 个 flush jobs - 默认为 `max_background_jobs / 4` - 如果 flush thread pool 为空(`Priority::HIGH`),则使用 `Priority::LOW` thread pool ## BGWorkFlush() - 调用 `DBImpl::BackgroundCallFlush()` ## BackgroundCallFlush() - 调用 `DBImpl::BackgroundFlush()` 执行 flush - 任务完成后 - 清理工作: - flush 失败,通过 `FindObsoleteFiles`扫描临时文件 - PurgeObsoleteFiles:后台去清理失效的文件 - 被删除的 SST、log、manfiest等 - 调用 `MaybeScheduleFlushOrCompaction()` 尝试调度新的 job ## BackgroundFlush() - 取出 `flush_queue_` 中队首的 cfd,调用 `FlushMemTablesToOutputFiles` 生成和执行 flush 任务。 --- # Flush流程图示 ```mermaid --- title: Flush 流程 --- stateDiagram-v2 [*] --> Write Write: Writer 写入 batch Write --> UpdateFlushState UpdateFlushState: MemTableInserter 每次 Put/Del 到 Memtable 时,调用 UpdateFlushState,检查是否需要 flush(concurrent write 情况下在 batch 插入完后检查) UpdateFlushState --> CheckMemtableFull CheckMemtableFull: MemTableInserter 每次 Put/Del 到 Memtable 时,检测当前 cfd 需要flush,将其放入 flush_scheduler_(CheckMemtableFull函数) CheckMemtableFull --> ScheduleFlush ScheduleFlush: Write Leader 在 PreprocessWrite 时读取 flush_scheduler_,切换 memtable,将 cfd 入队 flush_queue_, 调度后台线程(此时是新的一轮 write group 了) ScheduleFlush --> RunJob RunJob: 后台线程读取 flush_queue_ 生成 FlushJob,执行后更新 superversion RunJob --> [*] ``` - [ ] CheckMemtableFull 为什么每次 Put/Del 都调用,可以不可以 batch 写完只调用一次 --- # TODO - [ ] atomic flush - [ ] flush 限速 - [x] merge 如何实现 - 只是控制个数小于 `min_write_buffer_number_to_merge` 不 flush - 在 `MemTableList::IsFlushPending()` 里中判断 - [x] 并行 flush 如何触发 - 上一个 flush job 可能还没有完成,就有新的 memtable 满了进入 flush - [ ] Flush触发条件(FlushReason) - [ ] `allow_write_stall` - [ ] [Prevent a case of WriteBufferManager flush thrashing #6364](https://github.com/facebook/rocksdb/pull/6364) --- # 实验 - [x] flush 后的 L0 文件大小似乎无限制,验证 `min_write_buffer_number_to_merge` 的影响 - 无限制,一个 flush job 的所有 mems 都写入一个 L0 文件。 - 但 `min_write_buffer_number_to_merge` 不能超过 `max_write_buffer_number`,所以总体不会无限大 - 见 SanitizeOptions 函数 - [x] 验证:一个 huge batch,是否可能导致 memtable 超过 `write_buffer_size` 的限制 - 可能 - [x] 验证 Flush 写 L0 时 会不会写入无快照引用的 delete/覆盖数据 - 不会,无引用的不写入 - 有引用的 deletions / range tombstones 会写入