# MemoryPool
## Rustdoc
- 作用:跟踪和限制执行期间的内存使用
- 查询是流式执行的,大多数操作只需要固定数据的内存(取决于 schema 和 target batch size)。但某些操作如 sorting,grouping、joining,需要**缓存中间结果**,所需的内存会跟输入的行数成正比。
- **只跟踪**大的内存占用,小内存的如 streams 之间传输的不考虑。因此 MemoryPool 的大小需要为小的内存分配预留(比如 10%)。
- Datafusion 的执行计划在使用大内存之前需要先向 MemoryPool 进行申请,通过 `MemoryReservation` 和 `MemoryConsumer`.
- 申请失败后的处理:
- 先释放内存(比如置换到磁盘)然后再重试申请
- 或者报错
- 多个并发执行的计划可以共享同一个 MemoryPool,适用于多租户系统下限制内存使用。
- 三种 MemoryPool 实现
- **UnboundedMemoryPool**:没有内存限制(**默认**)
- **GreedyMemoryPool**:使用“先到先服务”策略将内存使用限制为固定大小
- **FairSpillPool**:将内存使用限制为固定大小,公平地分配内存给所有 spilling operators
## Trait
- **register** / **unregister**:注册 / 取消注册 consumer
- **grow** / **try_grow** / shrink**:**向 pool 申请和释放内存
---------
# MemoryConsumer
- 所有的分配都要绑定到一个命名的 consumer 中。
- 字段:
- **name**:`String`
- **can_spill**: `bool`
- register 方法:
- 将自己注册到 MemoryPool 中,返回一个 `MemoryReservation`
## SharedRegistration
- 辅助类,代表 consumer 在 MemoryPool 中的注册。
- 字段:
- **pool**: `Arc<dyn MemoryPool>`
- **consumer**: `MemoryConsumer`
- **Drop** 时会向 MemoryPool 进行 unregister。
---------
# MemoryReservation
- 操作类,直接负责内存的预分配/预留和释放
- 字段:
- **registration**: `Arc<SharedRegistration>`
- **size**: usize
- 一个 MemoryConsumer 可以对应多个 MemoryReservation,可以代表不同的用途,它们之间共享一个 SharedRegistration,每个 MemoryReservation 有**独立**的 size 计数,表示用了多少。
- 核心方法:
- **grow / try_grow**:预申请
- **shrink**:归还
- **new_empty**: 创建一个空的 MemoryReservation,共享同一个 SharedRegistration
- **Drop** 时会向 MemoryPool 进行 shrink 操作,归还通过它申请的内存。
- 几个类的关系图:
```mermaid
flowchart BT
SharedRegistration --> |注册 MemoryConsumer| MemoryPool
MemoryReservation1[MemoryReservation] --> SharedRegistration
MemoryReservation2[MemoryReservation] --> SharedRegistration
MemoryReservation3[MemoryReservation] --> SharedRegistration
Grow["grow()"] --> MemoryReservation1
TryGrow["try_grow()"] --> MemoryReservation2
Shrink["shrink()"] --> MemoryReservation3
```
---
# Proxy
- **VecAllocExt**:包装 std::vec::Vec,每次 push 时更新 vec 的 capacity 变化(当扩容发生时)
- **RawTableAllocExt**:同上,跟踪 hashbrown 的 RawTable 空间占用。
----------
# MemoryPool Impls
## UnboundedMemoryPool
- 字段
- **used**: `AtomicUsize`
- 只计数,grow 时增加 used,shrink 时减少,try_grow 始终成功
## GreedyMemoryPool
- 字段
- **pool_size**: usize
- **used**:`AtomicUsize`
- 相比 UnboundedMemoryPool,当 used 超出 pool_size 时, try_grow 会失败报错。
## FairSpillPool
```
┌───────────────────────z──────────────────────z───────────────┐
│ z z │
│ z z │
│ Spillable z Unspillable z Free │
│ Memory z Memory z Memory │
│ z z │
│ z z │
└───────────────────────z──────────────────────z───────────────┘
```
- 避免 spillable 的分配太多
- 每个 spillable consumer 的总配额均分 Unspillable Memory 之外的部分,即所谓的 Fair
- unspilable 的 consumer 申请不受上述限制,只要整个 pool 有空闲空间就可以。
- 使用该 Pool 可能导致即使有 Pool 内空闲内存也 spill(这些空闲内存预留给了其他 operators)。
- **FaireSpillPoolState**
- **num_spill**: usize // 支持 spill 的 consumer 个数
- **spillable**: usize // 可以 spill 的 consumer 申请的内存总量
- **unspilable**: usize // 不可以 spill 的 consumer 申请的内存总量
- **FairSpillPool**
- **pool_size**: usize
- **state**: `Mutex<FaireSpillPoolState>`
- `grow()` 方法:
- 始终成功
- 对于 spillable 的 consumer,增加 `FaireSpillPoolState.spillable`计数
- 对于 unspilable 的 consumer,增加 `FaireSpillPoolState.unspilable`计数
- `shrink()` 方法:类似 `grow` ,减少计数。
- `try_grow()`方法
- spillable 的 consumer
- 一个 consumer 可用的总空间为 `available = (pool_size - state.unspillable) / num_spill`
- 检查该 consumer 已分配的 加上 本次申请的 是否超过 available,不超出可分配成功。
- unspilable 的 consumer
- 检查 pool 的 free memory 是否足够,是的话返回成功,增加 `state.unspillable` 计数
- free memory:`pool_size - state.unspillable - state.spillable`
> [!NOTE] Note
> 每个 spillable consumer 的份额是动态变化的,每次申请时重新计算 `(pool_size - state.unspillable) / num_spill`。而 unspillable 和 num_spill 都是随时变化的, 因此 spillable consumer 之间并非绝对平均。