# MemoryPool ## Rustdoc - 作用:跟踪和限制执行期间的内存使用 - 查询是流式执行的,大多数操作只需要固定数据的内存(取决于 schema 和 target batch size)。但某些操作如 sorting,grouping、joining,需要**缓存中间结果**,所需的内存会跟输入的行数成正比。 - **只跟踪**大的内存占用,小内存的如 streams 之间传输的不考虑。因此 MemoryPool 的大小需要为小的内存分配预留(比如 10%)。 - Datafusion 的执行计划在使用大内存之前需要先向 MemoryPool 进行申请,通过 `MemoryReservation` 和 `MemoryConsumer`. - 申请失败后的处理: - 先释放内存(比如置换到磁盘)然后再重试申请 - 或者报错 - 多个并发执行的计划可以共享同一个 MemoryPool,适用于多租户系统下限制内存使用。 - 三种 MemoryPool 实现 - **UnboundedMemoryPool**:没有内存限制(**默认**) - **GreedyMemoryPool**:使用“先到先服务”策略将内存使用限制为固定大小 - **FairSpillPool**:将内存使用限制为固定大小,公平地分配内存给所有 spilling operators ## Trait - **register** / **unregister**:注册 / 取消注册 consumer - **grow** / **try_grow** / shrink**:**向 pool 申请和释放内存 --------- # MemoryConsumer - 所有的分配都要绑定到一个命名的 consumer 中。 - 字段: - **name**:`String` - **can_spill**: `bool` - register 方法: - 将自己注册到 MemoryPool 中,返回一个 `MemoryReservation` ## SharedRegistration - 辅助类,代表 consumer 在 MemoryPool 中的注册。 - 字段: - **pool**: `Arc<dyn MemoryPool>` - **consumer**: `MemoryConsumer` - **Drop** 时会向 MemoryPool 进行 unregister。 --------- # MemoryReservation - 操作类,直接负责内存的预分配/预留和释放 - 字段: - **registration**: `Arc<SharedRegistration>` - **size**: usize - 一个 MemoryConsumer 可以对应多个 MemoryReservation,可以代表不同的用途,它们之间共享一个 SharedRegistration,每个 MemoryReservation 有**独立**的 size 计数,表示用了多少。 - 核心方法: - **grow / try_grow**:预申请 - **shrink**:归还 - **new_empty**: 创建一个空的 MemoryReservation,共享同一个 SharedRegistration - **Drop** 时会向 MemoryPool 进行 shrink 操作,归还通过它申请的内存。 - 几个类的关系图: ```mermaid flowchart BT SharedRegistration --> |注册 MemoryConsumer| MemoryPool MemoryReservation1[MemoryReservation] --> SharedRegistration MemoryReservation2[MemoryReservation] --> SharedRegistration MemoryReservation3[MemoryReservation] --> SharedRegistration Grow["grow()"] --> MemoryReservation1 TryGrow["try_grow()"] --> MemoryReservation2 Shrink["shrink()"] --> MemoryReservation3 ``` --- # Proxy - **VecAllocExt**:包装 std::vec::Vec,每次 push 时更新 vec 的 capacity 变化(当扩容发生时) - **RawTableAllocExt**:同上,跟踪 hashbrown 的 RawTable 空间占用。 ---------- # MemoryPool Impls ## UnboundedMemoryPool - 字段 - **used**: `AtomicUsize` - 只计数,grow 时增加 used,shrink 时减少,try_grow 始终成功 ## GreedyMemoryPool - 字段 - **pool_size**: usize - **used**:`AtomicUsize` - 相比 UnboundedMemoryPool,当 used 超出 pool_size 时, try_grow 会失败报错。 ## FairSpillPool ``` ┌───────────────────────z──────────────────────z───────────────┐ │ z z │ │ z z │ │ Spillable z Unspillable z Free │ │ Memory z Memory z Memory │ │ z z │ │ z z │ └───────────────────────z──────────────────────z───────────────┘ ``` - 避免 spillable 的分配太多 - 每个 spillable consumer 的总配额均分 Unspillable Memory 之外的部分,即所谓的 Fair - unspilable 的 consumer 申请不受上述限制,只要整个 pool 有空闲空间就可以。 - 使用该 Pool 可能导致即使有 Pool 内空闲内存也 spill(这些空闲内存预留给了其他 operators)。 - **FaireSpillPoolState** - **num_spill**: usize // 支持 spill 的 consumer 个数 - **spillable**: usize // 可以 spill 的 consumer 申请的内存总量 - **unspilable**: usize // 不可以 spill 的 consumer 申请的内存总量 - **FairSpillPool** - **pool_size**: usize - **state**: `Mutex<FaireSpillPoolState>` - `grow()` 方法: - 始终成功 - 对于 spillable 的 consumer,增加 `FaireSpillPoolState.spillable`计数 - 对于 unspilable 的 consumer,增加 `FaireSpillPoolState.unspilable`计数 - `shrink()` 方法:类似 `grow` ,减少计数。 - `try_grow()`方法 - spillable 的 consumer - 一个 consumer 可用的总空间为 `available = (pool_size - state.unspillable) / num_spill` - 检查该 consumer 已分配的 加上 本次申请的 是否超过 available,不超出可分配成功。 - unspilable 的 consumer - 检查 pool 的 free memory 是否足够,是的话返回成功,增加 `state.unspillable` 计数 - free memory:`pool_size - state.unspillable - state.spillable` > [!NOTE] Note > 每个 spillable consumer 的份额是动态变化的,每次申请时重新计算 `(pool_size - state.unspillable) / num_spill`。而 unspillable 和 num_spill 都是随时变化的, 因此 spillable consumer 之间并非绝对平均。