#database #datafusion
- [Aggregating Millions of Groups Fast in Apache Arrow DataFusion 28.0.0](https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/)
# Baisc approach
```python
# read file
hits = pd.read_parquet('hits.parquet', engine='pyarrow')
# build groups
counts = defaultdict(int)
for index, row in hits.iterrows():
group = (row['UserID'], row['SearchPhrase']);
# update the dict entry for the corresponding key
counts[group] += 1
```
- 优化方向:
- 所有的 CPU 核心都参与聚合计算(并行计算)
- 更快地更新 aggregate values,使用 vectorizable loop(方便 compilers 转换为 SIMD)
-----------
## Two phase parallel partioned grouping
- 两阶段 hash partitioned grouping,类似 [DuckDB’s Parallel Grouped Aggregates](https://duckdb.org/2022/03/07/aggregate-hashtable.html).
- **Step 1-2:** 每个 core 将数据读取到一个 core-specific hash table,计算中间聚合结果,不需要跨核心协调
- **Step 3-4:** 按照 group value 拆分数据(repartition)为不重叠的子集,每个子集发往一个特定的核心,完成最终的聚合计算。
```
▲ ▲
│ │
│ │
│ │
┌───────────────────────┐ ┌───────────────────┐
│ GroupBy │ │ GroupBy │ Step 4
│ (Final) │ │ (Final) │
└───────────────────────┘ └───────────────────┘
▲ ▲
│ │
└────────────┬───────────┘
│
│
┌─────────────────────────┐
│ Repartition │ Step 3
│ HASH(x) │
└─────────────────────────┘
▲
│
┌────────────┴──────────┐
│ │
│ │
┌────────────────────┐ ┌─────────────────────┐
│ GroupyBy │ │ GroupBy │ Step 2
│ (Partial) │ │ (Partial) │
└────────────────────┘ └─────────────────────┘
▲ ▲
┌──┘ └─┐
│ │
.─────────. .─────────.
,─' '─. ,─' '─.
; Input : ; Input : Step 1
: Stream 1 ; : Stream 2 ;
╲ ╱ ╲ ╱
'─. ,─' '─. ,─'
`───────' `───────'
```
- 两个阶段都使用了 hash table 的方式
- 一些未提及的细节:
- 何时从第一阶段的 hash table 中 emit data
- 处理每个聚合特定的 filter
- 中间值的数据类型(例如 AVG,最终输出的类型可能会不同)
- 内存超出预算后的处理
-----------
# Hash grouping
- 每个聚合函数的状态,称为一个 `accumulator` , 使用了 hash table 来跟踪。
- DataFusion 使用了 [HashBrown RawTale API](https://docs.rs/hashbrown/latest/hashbrown/raw/struct.RawTable.html), 存储 index 标识特定的 group value。
## Hashing grouping in 27.0.0
- 在一个 GroupState 结构中跟踪每个 group 的 state。
- 每个 group 的 state 包括
1. group columns 的值(Arrow Row 格式)
2. 该 group 正在进行的 accumulations(例如 count 聚合当前的 counts),采取两种格式之一:`Accumulator` 或者 `RowAccumulator`
3. Scratch space for tracking which rows match each aggregate in each batch.
```
┌──────────────────────────────────────┐
│ │
│ ... │
│ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ ┃ ┃ │
┌─────────┐ │ ┃ ┌──────────────────────────────┐ ┃ │
│ │ │ ┃ │group values: OwnedRow │ ┃ │
│ ┌─────┐ │ │ ┃ └──────────────────────────────┘ ┃ │
│ │ 5 │ │ │ ┃ ┌──────────────────────────────┐ ┃ │
│ ├─────┤ │ │ ┃ │Row accumulator: │ ┃ │
│ │ 9 │─┼────┐ │ ┃ │Vec<u8> │ ┃ │
│ ├─────┤ │ │ │ ┃ └──────────────────────────────┘ ┃ │
│ │ ... │ │ │ │ ┃ ┌──────────────────────┐ ┃ │
│ ├─────┤ │ │ │ ┃ │┌──────────────┐ │ ┃ │
│ │ 1 │ │ │ │ ┃ ││Accumulator 1 │ │ ┃ │
│ ├─────┤ │ │ │ ┃ │└──────────────┘ │ ┃ │
│ │ ... │ │ │ │ ┃ │┌──────────────┐ │ ┃ │
│ └─────┘ │ │ │ ┃ ││Accumulator 2 │ │ ┃ │
│ │ │ │ ┃ │└──────────────┘ │ ┃ │
└─────────┘ │ │ ┃ │ Box<dyn Accumulator> │ ┃ │
Hash Table │ │ ┃ └──────────────────────┘ ┃ │
│ │ ┃ ┌─────────────────────────┐ ┃ │
│ │ ┃ │scratch indices: Vec<u32>│ ┃ │
│ │ ┃ └─────────────────────────┘ ┃ │
│ │ ┃ GroupState ┃ │
└─────▶ │ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │
│ │
Hash table tracks an │ ... │
index into group_states │ │
└──────────────────────────────────────┘
group_states: Vec<GroupState>
There is one GroupState PER GROUP
```
- 为了计算聚合,DataFusion 针对每个 input batch 执行如下步骤:
1. 向量化计算 hash 值(为每个数据类型定制实现)
2. 使用 hash table 为每个 input row 决定 group indexes
3. [Update Accumulators for each group that had input rows,](https://github.com/apache/arrow-datafusion/blob/4ab8be57dee3bfa72dd105fbd7b8901b873a4878/datafusion/core/src/physical_plan/aggregates/row_hash.rs#L562-L602) assembling the rows into a contiguous range for vectorized accumulator if there are a sufficient number of them.
- DataFusion 在 table 中也存储了 hash values,避免 rehash 时重新计算 hash。
- 这种模式在低基数 groups 时工作地很好,all accumulators are efficiently updated with large contiguous batches of rows.
- 高基数下不理想:
- **Multiple allocations per group** for the group value row format, as well as for the `RowAccumulator`s and each `Accumulator`. The `Accumulator` may have additional allocations within it as well.
- **Non-vectorized updates**:每个 input batch 更新的 groups 很大,每 group 的 values 很小。
## Hashing grouping in 28.0.0
- 优化原理:
- fewer allocations
- type specialization
- aggressive vectorization
- 仍然使用相同的 RawTable,存储 group indexes。
- 主要的区别在于:
1. Group values 的存储有以下两种
1. 内联在 RawTable 中(single columns of primitive types),此时转换成 Row format cost 会更高
2. 存储在一个单独的 [Rows](https://docs.rs/arrow-row/latest/arrow_row/struct.Row.html) 结构中,一次为所有的 groups values 分配单个连续的内存,而非每 group 一次分配。Accumulators 内部管理所有 groups 的 state,更新 intermediate values 时是一个 tight type specialized loop。
```
┌───────────────────────────────────┐ ┌───────────────────────┐
│ ┌ ─ ─ ─ ─ ─ ┐ ┌─────────────────┐│ │ ┏━━━━━━━━━━━━━━━━━━━┓ │
│ │ ││ │ ┃ ┌──────────────┐ ┃ │
│ │ │ │ ┌ ─ ─ ┐┌─────┐ ││ │ ┃ │┌───────────┐ │ ┃ │
│ │ X │ 5 │ ││ │ ┃ ││ value1 │ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │└───────────┘ │ ┃ │
│ │ Q │ 9 │──┼┼──┐ │ ┃ │ ... │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ └──┼─╋─▶│ │ ┃ │
│ │ ... │ ... │ ││ │ ┃ │┌───────────┐ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ ││ valueN │ │ ┃ │
│ │ H │ 1 │ ││ │ ┃ │└───────────┘ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │values: Vec<T>│ ┃ │
│ Rows │ ... │ ... │ ││ │ ┃ └──────────────┘ ┃ │
│ │ │ │ └ ─ ─ ┘└─────┘ ││ │ ┃ ┃ │
│ ─ ─ ─ ─ ─ ─ │ ││ │ ┃ GroupsAccumulator ┃ │
│ └─────────────────┘│ │ ┗━━━━━━━━━━━━━━━━━━━┛ │
│ Hash Table │ │ │
│ │ │ ... │
└───────────────────────────────────┘ └───────────────────────┘
GroupState Accumulators
Hash table value stores group_indexes One GroupsAccumulator
and group values. per aggregate. Each
stores the state for
Group values are stored either inline *ALL* groups, typically
in the hash table or in a single using a native Vec<T>
allocation using the arrow Row format
```
- A single `GroupsAccumulator` stores the per-aggregate state for _all_ groups.
--------