# FlushScheduler
- 队列,存储 memtable 满了需要 flush 的 column families。
- 由 DBImpl 持有
- **`ScheduleWork()`**:将 cfd 加入队列
- **`TakeNextColumnFamuly()`**:出队,返回 cfd
-------------
# FlushJob
## 关键成员
- **`cfd_`**:`ColumnFamilyData*`,目标 cf
- **`versions_`**:`VersionSet*`,来自 DBImpl
- **`existing_snapshots_`**:`Vector<SequenceNumber>`,当前所有快照的序列号(去重,增序)
- **`earliest_snapshot_`**:`SequenceNumber`,最早的快照序列号,没有设为`kMaxSequenceNumber`
<br>
- **`sync_output_directory_`**: `bool`,是否需要 fsync 输出目录。
- 通常需要。如果 flush job 是 atomic flush 的一部分,则不需要
- **`write_manifest_`**:`bool`,flush 成功后是否需要写入 MANIFEST
- 通常需要。如果 flush job 是 atomic flush 的一部分,则不需要
### 由 PickMemtable() 设置的
- **`meta_`**: `FileMetaData`
- **`mems_`**: `vector<Memtable*>`
- **`edit_`**: `VersionEdit*`
- **`base_`**:`Version*`
## PickMemTable()
- 从 cf 中挑选需要 flush 的 memtables, 调用时需持有 DB mutex。
1. 初始化 `mems_`
- 调用 `cfd_->imm()->PickMemtablesToFlush()` 挑选需要 flush 的 memtables
- [[memory#^b51817|PickMemtablesToFlush()]]
2. 初始化 `edit_`
- SetLogNumber 为 `max_next_log_number` (选中的 memtables 中最大的),表示 recover 时不会再读取小于它 的 logs。
3. 初始化 `meta_`
- 分配 L0 的文件标号和 epoch number
4. 初始化 `base_`
- 设置为 `cfd_->current()`
## WriteLevel0Table()
^c27f6d
1. 在选中的 memtables 上创建 InternalIterator 和 FragmentedRangeTombstoneIterator,使用 MergingIterator 将所有 InternalIterator 合并到一起
2. 调用 BuildTable,遍历数据生成 SST
3. 将 SST 添加到 `edit_`
## Run()
- 调用 `WriteLevel0Table()` 生成 L0 文件
- 如果失败,则调用 `MemtableList::RollbackMemtableFlush()` 回滚 flush
- 如果需要写 manifest,调用`MemtaleList::TryInstallMemtableFlushResults()`
- [[memory#^d8e92e|TryInstallMemtableFlushResults]]
---------
# DBImpl
## FlushRequest
- struct 结构, 包含成员:
- **`flush_reason`**: `FlushReason`
- **`cfd_to_max_mem_id_to_persist`**:`map<ColumnFamilyData*, uint64_t>`
- value 是 memtable id,小于等于它的都是需要 flush 的,全部 flush 完成才能认定该 cf flush 完成了
- 所有的 cf 都 flush 完成了,才能认定该请求完成了。
- 只有启用了 `atomic_flush` 后,map 才可能有多个元素,包含多个 cf
- DBImpl flush 队列
- **`flush_queue_`**:`deque<FlushRequest>`
- 满足 `MemtableList::IsFlushPending()` 的 cf 会放入队列中
- 队列的消费者是后台 flush 线程,生产者是 WriteThread/Leader
- 队列长度计数: `DBImpl::unscheduled_flushes_`
## SchedulePendingFlush()
- 入参:`const FlushRequest& flush_req`
- 将 `flush_req` 放入 `flush_queue_`,增加 `unscheduled_flushes_`
- 设置 cfd 的 `queued_for_flush_` 标志
- 不入队列的情况:
- cfd 已经在队列里(通过 `queued_for_flush_` 标记)
- `MemtableList::IsFlushPending()` 为 false
- `min_write_buffer_number_to_merge_` 选项开始起作用
## SwitchMemtable()
- 切换某个 cf 的 memtable。
- 执行流程:
1. 调用`CreateWAL()` 创建/切换新的 wal 文件
2. `ConstructNewMemtable()` 创建新的 memtable
3. 将之前的 memtable 放入 immutable list (`MemtableList`)中
4. 设置新创建的 memtable 为 cf 的当前 memtable
5. 调用 `InstallSuperVersionAndScheduleWork()` 安装新的 sv ,调度 flush 和 compaction
## ScheduleFlushes()
- 持有 DB 锁,且在 WriteThread 内。在 `PreprocessWrite()` 中调用。
- 执行流程:
1. 遍历 `flush_scheduler_`,取出所有需要 flush 的 cfd
2. 对各个 cfd 进行 `SwitchMemtable()` 操作
3. 为每个 cfd 生成 `FlushRequest`(`flush_reason`为 `kWriteBufferFull`),调用 `SchedulePendingFlush()`,放入 `flush_queue_`
4. 调用 `MaybeScheduleFlushOrCompaction()` ,往后台线程调度 flush
## MaybeScheduleFlushOrCompaction()
- 调度后台线程去执行 `DBImpl::BGWorkFlush()`
- `flush_queue_`中每个元素调度一次。
- 最多同时允许 `bg_job_limits.max_flushes` 个 flush jobs
- 默认为 `max_background_jobs / 4`
- 如果 flush thread pool 为空(`Priority::HIGH`),则使用 `Priority::LOW` thread pool
## BGWorkFlush()
- 调用 `DBImpl::BackgroundCallFlush()`
## BackgroundCallFlush()
- 调用 `DBImpl::BackgroundFlush()` 执行 flush
- 任务完成后
- 清理工作:
- flush 失败,通过 `FindObsoleteFiles`扫描临时文件
- PurgeObsoleteFiles:后台去清理失效的文件
- 被删除的 SST、log、manfiest等
- 调用 `MaybeScheduleFlushOrCompaction()` 尝试调度新的 job
## BackgroundFlush()
- 取出 `flush_queue_` 中队首的 cfd,调用 `FlushMemTablesToOutputFiles` 生成和执行 flush 任务。
---
# Flush流程图示
```mermaid
---
title: Flush 流程
---
stateDiagram-v2
[*] --> Write
Write: Writer 写入 batch
Write --> UpdateFlushState
UpdateFlushState: MemTableInserter 每次 Put/Del 到 Memtable 时,调用 UpdateFlushState,检查是否需要 flush(concurrent write 情况下在 batch 插入完后检查)
UpdateFlushState --> CheckMemtableFull
CheckMemtableFull: MemTableInserter 每次 Put/Del 到 Memtable 时,检测当前 cfd 需要flush,将其放入 flush_scheduler_(CheckMemtableFull函数)
CheckMemtableFull --> ScheduleFlush
ScheduleFlush: Write Leader 在 PreprocessWrite 时读取 flush_scheduler_,切换 memtable,将 cfd 入队 flush_queue_, 调度后台线程(此时是新的一轮 write group 了)
ScheduleFlush --> RunJob
RunJob: 后台线程读取 flush_queue_ 生成 FlushJob,执行后更新 superversion
RunJob --> [*]
```
- [ ] CheckMemtableFull 为什么每次 Put/Del 都调用,可以不可以 batch 写完只调用一次
---
# TODO
- [ ] atomic flush
- [ ] flush 限速
- [x] merge 如何实现
- 只是控制个数小于 `min_write_buffer_number_to_merge` 不 flush
- 在 `MemTableList::IsFlushPending()` 里中判断
- [x] 并行 flush 如何触发
- 上一个 flush job 可能还没有完成,就有新的 memtable 满了进入 flush
- [ ] Flush触发条件(FlushReason)
- [ ] `allow_write_stall`
- [ ] [Prevent a case of WriteBufferManager flush thrashing #6364](https://github.com/facebook/rocksdb/pull/6364)
---
# 实验
- [x] flush 后的 L0 文件大小似乎无限制,验证 `min_write_buffer_number_to_merge` 的影响
- 无限制,一个 flush job 的所有 mems 都写入一个 L0 文件。
- 但 `min_write_buffer_number_to_merge` 不能超过 `max_write_buffer_number`,所以总体不会无限大
- 见 SanitizeOptions 函数
- [x] 验证:一个 huge batch,是否可能导致 memtable 超过 `write_buffer_size` 的限制
- 可能
- [x] 验证 Flush 写 L0 时 会不会写入无快照引用的 delete/覆盖数据
- 不会,无引用的不写入
- 有引用的 deletions / range tombstones 会写入