# Format - **Block format** - 一个文件由多个 block 所组成 ``` * +-----+-------------+--+----+----------+------+-- ... ----+ * | r0 | r1 |P | r2 | r3 | r4 | | * +-----+-------------+--+----+----------+------+-- ... ----+ * <--- kBlockSize ------>|<-- kBlockSize ------>| ``` - 每个 block 内有多个 records - block 末尾不够写入一个 record,则写入 `\0` paddings - block_size: 32 K - **Record format** ``` * +---------+-----------+-----------+--- ... ---+ * |CRC (4B) | Size (2B) | Type (1B) | Payload | * +---------+-----------+-----------+--- ... ---+ ``` - **CRC**: 包含 Type 和 Payload,type 的 crc 是预计算好的(`type_crc_`) - **Size**: payload 长度 n - `Size[0] = n & 0xff` // 低字节 - `Size[1] = n >> 8` // 高字节 - **Type**:RecordType - **RecordType** - kFullType:一个 record 对应一个完整的 entry - 用于 fragment 的:一个 entry 拆分为多个 record - kFirstType,kMiddleType, kLastType - kSetCompressionType: - 需要配置了压缩,则在**文件首**写入该记录`CompressionTypeRecord` - 注意包含了文件的压缩方法 - `recycle_log_files` - DB 选项配置 `recycle_log_file_num` 大于 0 时才启用 - 相关PR:[#746](https://github.com/facebook/rocksdb/pull/746) - 大意是 fallocate 并不是物理分配,实际写入时仍然需要更新 inode,不能有效节省 IO。 ------- # Reader - 读取 wal 里的 records,大致流程: 1. 先读取 block,固定大小 kBlockSize 2. 从 block buffer 中读取 physical record(fragment) 3. 重复读取一个或多个 physical record,组装成一个 logical record - Reader 成员变量: - **`buffer_`**: `Slice` - block buffer,读取时先将 block 读取到这个 buffer 内,再从它中解析 record - 解析完 record 后,通过 `remove_prefix()` 更新 buffer 内的数据 - **`end_of_buffer_offset_`**:下一次读 block 的文件偏移,也就是 buffer_ 末尾的下一个位置 - **`last_record_offset_`**:上一次读到的 record 的文件偏移 - **`eof_offset_`**:遇到 eof 时,读到的 block 大小(该 block 不完整,即小于 block_size) - UnmarkEOF 时会使用 - **`checksum_`**:读取时是否检验 record 的 CRC - **`uncompress_`**: - 流式解压器,当读取到文件首的 compression record 后进行初始化 - InitCompression 函数 - 没读到就是 nullptr 表示没启用压缩 - **ReadRecord()** 函数(virtual) - 读取下一条 user/logical record,会合并一个 user record 的多个 framgent - 参数: - **`Slice *record`**: - 将下一条 record 读入到该参数 ,指向 scratch 或者 内部的 block buffer。 - 再次调用 ReadRecord 或者修改 scatch 都有可能导致它==失效== - **`string *scratch`**: - 外部提供的 record 临时存储(缓存一条 record 的多个 fragments) - **`uint64_t* record_checksum`**: - 如果非空,则计算 record 的 checksum - 使用 xxhash,只计算 payload(与 header 中的 crc 不同) - 返回值:读到文件末尾返回 false,否则返回 true - **ReadPhysicalRecord()** - 读取下一条 physical record(logical record的一个 fragment ) - 读完会更新 buffer_ 的偏移(remove_prefix) - 如果有配置压缩,流式解压时如果会流式累计计算 payload 的 checksum,解压完后再整体计算一遍,对比两者是否相等,以防止拷贝过程中的内存损坏。 - [#10339](https://github.com/facebook/rocksdb/pull/10339) - **ReadMore():** 读一个 block 到 buffer_ - **UnmarkEOF()** - 已经知道了有新的数据写入了文件,取消 eof 标记允许继续读 - 根据当前 `buffer_` 的大小、`eof_offset_`,从文件末读取数据补齐最后一个不完整的 block ------- # FragmentBufferReader - 继承了 Reader - 添加 PR: [#4899](https://github.com/facebook/rocksdb/pull/4899) - Add `FragmentBufferedReader` to handle partially-read, trailing record at the end of a log from where future read can continue. - 读取时如果 `eof_` 为 true(之前读时 eof 了)会清理 `eof_` 标记并自动重试 - 类似 tail 命令?可以持续读 - **TryReadMore()** - 如果 `read_error_` 为 false,`eof_` 为 true,会调用 `UnmarkEOF` ---- # DBImpl - WAL 创建的条件: - DB open 时 - 每次有 cf flush memtable 时都会新建 WAL(在 SwitchMemtable 中),后续所有的 cf 都写入到新 WAL 中 - WAL 可删除的条件: - 核心:不包含任何 cf 的 unflushed data - `min_log_number_to_keep_`:所有 cf 的 [[column family#^21d38a|log_file_number]] 中最小的,此 WAL 中可能有着未 flush 的数据 - VersionSet 中维护,flush memtable 后更新(TryInstallMemtableFlushResults 中) - 只有早于 `min_log_number_to_keep_` 的才能被删除,保证里面的数据都已经 flush 到 SSTs 中 - **`DBOptions::max_total_wal_size`** - 达到限制会强制 flush(memtable) 引用最老 WAL 的 cf,从而可以删除掉最老的 WAL - **struct LogWriterNumber** - **`number`**:`uint64_t`,WAL 文件编号 - **`writer`**:`log::Writer *`,log writer - **`getting_synced`**:`bool`,标记正在 sync 中,sync 之前标记,避免重复 sync - PrepareForSync() 设置,FinishSync() 取消 - **`pre_sync_size`**:`uint64_t`,当前已 flush 的文件大小,也是 sync 的目标位置,用于 manifest 中跟踪 WAL sync 进度 - PrepareForSync() 时更新 - DBImpl 成员 - **`logs_`**:`deque<LogWriterNumber>`,用于 WAL 的 sync 管理 - 包括需要 sync 的 WAL: - 没有 fully synced 的旧 WAL - 当前正在写入的 WAL 文件 即 `logs_.back()` - 每次新 WAL 创建后追加进去,sync 完或者FindObsoleteFiles 时从中进行移除,挪到 `logs_to_free_` - **`alive_log_files_`**:`deque<LogFileNumberSize>` ,存活的 WALs - 包括: - 还未被识别为 Obsolete (WAL 内有某个 cf 还没有 flush 的数据) - 当前可写入的 WAL(最后一个) - 每次新 WAL 创建后追加进去,FindObsoleteFiles 从前面消费,触发删除文件或者归档 - **`logs_to_free_`**:`vector<log::Writer *>`,可回收,等待后台 delete 的 WAL 文件 - 只是调用 Writer 的析构,==不删除==文件 - 在 FindObsoleteFiles 中触发调度到后台执行 - **`logfile_number_`**:`uint64_t`,当前正在写入的 WAL 文件编号 - **`log_empty_`**:`bool`,当前 WAL 是否写入过 record - **`log_dir_synced_`**:`bool`,WAL 的目录是否需要 sync,每次新建 WAL 后置为 false ## SwitchMemtable() - 每次有 cf 切换 memtable 时会创建新的 WAL。 - 分支:如果 `log_empty_` 为 true 即当前 WAL 为空,则不创建新的。 - 创建 WAL 流程: 1. 调用 CreateWAL 创建新的 WAL 2. flush 老的 WAL 3. 将新 WAL 追加到 `logs_` 和 `alive_log_files_` 中 - cf 切换的新 memtable 会绑定新的 WAL,表示该 memtable 对应的数据只会出现在新 WAL 或之后的 WAL 中。(Memtable::SetNextLogNumber) - PR: [Track WAL obsoletion when updating empty CF's log number #7781](https://github.com/facebook/rocksdb/pull/7781) - 针对 empty cf 的优化,直接 advance cf 的 `log_number` ## SwitchWAL() - 让存活的 WAL 中最早的有资格成为 obsoleted,后续可以删除 - 触发 flush 所有引用 `alive_log_files_.front()` 的 cf,保证该 WAL 不再包含未 flush 的数据 - ==只是触发 flush 某些 cf==,等后台 memtable flush 完成后通过更新 `min_log_number_to_keep_`,最早的 WAL 就可以识别为可以删除了(FindObsoleteFiles 中) - 调用时间: - 存活的 WAL 总大小超过限制即 `max_total_wal_size` - 由 group leader 在 PreprocessWrite() 中调用 ## SyncWalImpl() - 对 `logs_` 即需要 sync 的 WALs 进行 sync 操作。 - `include_current_wal` 表示是否 sync 当前最新可写入的 WAL - 执行流程: 1. 对需要 sync 的 WALs 依次调用 Sync 或 SyncWithoutFlush - 操作前对目标文件设置 syncing 标志 2. sync 完 Close 老的 WAL 3. 如果 `log_dir_synced_` 为 false,则对 WAL 目录进行 sync 4. 调用 MarkLogsSynced 去除 syncing 标志,已经 close 的 WAL 从 `logs_` 中删除并加入 `logs_to_free_` 延迟释放 - 调用时机: - 用户手动调用 SyncWAL() 或 FlushWAL(sync=true) - Write 时指定 `WriteOption.sync` 为 true - SyncClosedWals() - PR: [Fix a bug in FlushJob picking more memtables beyond synced WALs #9142](https://github.com/facebook/rocksdb/pull/9142/) - Flush Memtable 时 cf 数量大于 1 个,sync 已经关闭写入的 WAL ## FindObsoleteFiles() - 从 `alive_log_files_` 中查找小于 `min_log_number_to_keep_` 的 WAL,移入 `JobContext->log_delete_files`,后续通过 PurgeObsoleteFiles 进行删除 - 从 `logs_` 中查找小于 `min_log_number_to_keep_` 的 WAL,关闭并移入 `logs_to_free_` 析构队列,已经不需要保留因此也不没必要 sync 了 - full scan 也会将所有 WAL 加入扫描目标,在 PurgeObsoleteFiles 进行扫描检查 ## 总结 > [!summary] WAL 操作总结 > - **创建:** > - DB Open 时 > - SwitchMemtable() > > - **Flush:** > - 默认每次 group leader 写完 WAL 就会 flush > - 如果 `manual_wal_flush` 如果为 true(默认情况),则需要手动调用 FlushWAL() 接口 > - 新建 WAL 时会 flush 之前的 WAL > - Sync() 操作隐含 flush 动作 > - **Sync:** > - `wal_bytes_per_sync`:`WritableFileWriter::Flush()` 时达到阈值时 sync > - SyncWalImpl(),在 log Writer 外部调用 > - 用户手动 > - Write 指定 WriteOption.sync > - Flush memtable 时 cf > 1触发 SyncClosedWals() > - **文件删除**: > - FindObsoleteFiles() 找出大于 MinWalNumberToKeep 的 WAL,PurgeObsoleteFiles() 在前台或调度到后台删除 > - `max_total_wal_size`阈值会通过 flush memtable 触发删除 > - 如果配置了 `WAL_ttl_seconds` 和 `WAL_size_limit_MB` 则对在阈值范围内的 WAL 进行归档而不删除 > - WalManager::ArchiveWALFile() 中移动到 archive 文件夹中 --- # TODO - [ ] wal_compresssion - [Add option for WAL compression algorithm #9432](https://github.com/facebook/rocksdb/pull/9432) - [Streaming Compression API for WAL compression. #9619](https://github.com/facebook/rocksdb/pull/9619) - [Integrate WAL compression into log reader/writer. #9642](https://github.com/facebook/rocksdb/pull/9642) - ❓ 流式压缩如果数据有损坏,是否就无法恢复 - [ ] `track_and_verify_wals_in_manifest` - [ ] [Track WAL in MANIFEST: add method to check WAL consistency #7236](https://github.com/facebook/rocksdb/pull/7236) - [ ] [Track WAL in MANIFEST: persist WALs to and recover WALs from MANIFEST #7256](https://github.com/facebook/rocksdb/pull/7256/) - [ ] [Track WAL in MANIFEST: update WalMetadata for WAL syncing #7414](https://github.com/facebook/rocksdb/pull/7414) - [ ] [Track WAL in MANIFEST: Track deleted WALs in MANIFEST after recovering from the WALs #7649](https://github.com/facebook/rocksdb/pull/7649) - [ ] [Stop tracking syncing live WAL for performance #10330](https://github.com/facebook/rocksdb/pull/10330/) - 不跟踪最新的 wal 即`logs_.back()`,相等于一整个 wal 才跟踪一次写一次 MANIFEST,不影响性能 - [ ] 单 cf 不主动 SyncWAL,MANIFEST 中没有WalAddition,只有 WalDeletion,是否有必要 - [ ] WalManager - [x] PrecomputeMinLogNumberToKeepNon2PC - [ ] `wal_filter`