#database #datafusion - [Aggregating Millions of Groups Fast in Apache Arrow DataFusion 28.0.0](https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/) # Baisc approach ```python # read file hits = pd.read_parquet('hits.parquet', engine='pyarrow') # build groups counts = defaultdict(int) for index, row in hits.iterrows(): group = (row['UserID'], row['SearchPhrase']); # update the dict entry for the corresponding key counts[group] += 1 ``` - 优化方向: - 所有的 CPU 核心都参与聚合计算(并行计算) - 更快地更新 aggregate values,使用 vectorizable loop(方便 compilers 转换为 SIMD) ----------- ## Two phase parallel partioned grouping - 两阶段 hash partitioned grouping,类似 [DuckDB’s Parallel Grouped Aggregates](https://duckdb.org/2022/03/07/aggregate-hashtable.html). - **Step 1-2:** 每个 core 将数据读取到一个 core-specific hash table,计算中间聚合结果,不需要跨核心协调 - **Step 3-4:** 按照 group value 拆分数据(repartition)为不重叠的子集,每个子集发往一个特定的核心,完成最终的聚合计算。 ``` ▲ ▲ │ │ │ │ │ │ ┌───────────────────────┐ ┌───────────────────┐ │ GroupBy │ │ GroupBy │ Step 4 │ (Final) │ │ (Final) │ └───────────────────────┘ └───────────────────┘ ▲ ▲ │ │ └────────────┬───────────┘ │ │ ┌─────────────────────────┐ │ Repartition │ Step 3 │ HASH(x) │ └─────────────────────────┘ ▲ │ ┌────────────┴──────────┐ │ │ │ │ ┌────────────────────┐ ┌─────────────────────┐ │ GroupyBy │ │ GroupBy │ Step 2 │ (Partial) │ │ (Partial) │ └────────────────────┘ └─────────────────────┘ ▲ ▲ ┌──┘ └─┐ │ │ .─────────. .─────────. ,─' '─. ,─' '─. ; Input : ; Input : Step 1 : Stream 1 ; : Stream 2 ; ╲ ╱ ╲ ╱ '─. ,─' '─. ,─' `───────' `───────' ``` - 两个阶段都使用了 hash table 的方式 - 一些未提及的细节: - 何时从第一阶段的 hash table 中 emit data - 处理每个聚合特定的 filter - 中间值的数据类型(例如 AVG,最终输出的类型可能会不同) - 内存超出预算后的处理 ----------- # Hash grouping - 每个聚合函数的状态,称为一个 `accumulator` , 使用了 hash table 来跟踪。 - DataFusion 使用了 [HashBrown RawTale API](https://docs.rs/hashbrown/latest/hashbrown/raw/struct.RawTable.html), 存储 index 标识特定的 group value。 ## Hashing grouping in 27.0.0 - 在一个 GroupState 结构中跟踪每个 group 的 state。 - 每个 group 的 state 包括 1. group columns 的值(Arrow Row 格式) 2. 该 group 正在进行的 accumulations(例如 count 聚合当前的 counts),采取两种格式之一:`Accumulator` 或者 `RowAccumulator` 3. Scratch space for tracking which rows match each aggregate in each batch. ``` ┌──────────────────────────────────────┐ │ │ │ ... │ │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │ │ ┃ ┃ │ ┌─────────┐ │ ┃ ┌──────────────────────────────┐ ┃ │ │ │ │ ┃ │group values: OwnedRow │ ┃ │ │ ┌─────┐ │ │ ┃ └──────────────────────────────┘ ┃ │ │ │ 5 │ │ │ ┃ ┌──────────────────────────────┐ ┃ │ │ ├─────┤ │ │ ┃ │Row accumulator: │ ┃ │ │ │ 9 │─┼────┐ │ ┃ │Vec<u8> │ ┃ │ │ ├─────┤ │ │ │ ┃ └──────────────────────────────┘ ┃ │ │ │ ... │ │ │ │ ┃ ┌──────────────────────┐ ┃ │ │ ├─────┤ │ │ │ ┃ │┌──────────────┐ │ ┃ │ │ │ 1 │ │ │ │ ┃ ││Accumulator 1 │ │ ┃ │ │ ├─────┤ │ │ │ ┃ │└──────────────┘ │ ┃ │ │ │ ... │ │ │ │ ┃ │┌──────────────┐ │ ┃ │ │ └─────┘ │ │ │ ┃ ││Accumulator 2 │ │ ┃ │ │ │ │ │ ┃ │└──────────────┘ │ ┃ │ └─────────┘ │ │ ┃ │ Box<dyn Accumulator> │ ┃ │ Hash Table │ │ ┃ └──────────────────────┘ ┃ │ │ │ ┃ ┌─────────────────────────┐ ┃ │ │ │ ┃ │scratch indices: Vec<u32>│ ┃ │ │ │ ┃ └─────────────────────────┘ ┃ │ │ │ ┃ GroupState ┃ │ └─────▶ │ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │ │ │ Hash table tracks an │ ... │ index into group_states │ │ └──────────────────────────────────────┘ group_states: Vec<GroupState> There is one GroupState PER GROUP ``` - 为了计算聚合,DataFusion 针对每个 input batch 执行如下步骤: 1. 向量化计算 hash 值(为每个数据类型定制实现) 2. 使用 hash table 为每个 input row 决定 group indexes 3. [Update Accumulators for each group that had input rows,](https://github.com/apache/arrow-datafusion/blob/4ab8be57dee3bfa72dd105fbd7b8901b873a4878/datafusion/core/src/physical_plan/aggregates/row_hash.rs#L562-L602) assembling the rows into a contiguous range for vectorized accumulator if there are a sufficient number of them. - DataFusion 在 table 中也存储了 hash values,避免 rehash 时重新计算 hash。 - 这种模式在低基数 groups 时工作地很好,all accumulators are efficiently updated with large contiguous batches of rows. - 高基数下不理想: - **Multiple allocations per group** for the group value row format, as well as for the `RowAccumulator`s and each `Accumulator`. The `Accumulator` may have additional allocations within it as well. - **Non-vectorized updates**:每个 input batch 更新的 groups 很大,每 group 的 values 很小。 ## Hashing grouping in 28.0.0 - 优化原理: - fewer allocations - type specialization - aggressive vectorization - 仍然使用相同的 RawTable,存储 group indexes。 - 主要的区别在于: 1. Group values 的存储有以下两种 1. 内联在 RawTable 中(single columns of primitive types),此时转换成 Row format cost 会更高 2. 存储在一个单独的 [Rows](https://docs.rs/arrow-row/latest/arrow_row/struct.Row.html) 结构中,一次为所有的 groups values 分配单个连续的内存,而非每 group 一次分配。Accumulators 内部管理所有 groups 的 state,更新 intermediate values 时是一个 tight type specialized loop。 ``` ┌───────────────────────────────────┐ ┌───────────────────────┐ │ ┌ ─ ─ ─ ─ ─ ┐ ┌─────────────────┐│ │ ┏━━━━━━━━━━━━━━━━━━━┓ │ │ │ ││ │ ┃ ┌──────────────┐ ┃ │ │ │ │ │ ┌ ─ ─ ┐┌─────┐ ││ │ ┃ │┌───────────┐ │ ┃ │ │ │ X │ 5 │ ││ │ ┃ ││ value1 │ │ ┃ │ │ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │└───────────┘ │ ┃ │ │ │ Q │ 9 │──┼┼──┐ │ ┃ │ ... │ ┃ │ │ │ │ │ ├ ─ ─ ┤├─────┤ ││ └──┼─╋─▶│ │ ┃ │ │ │ ... │ ... │ ││ │ ┃ │┌───────────┐ │ ┃ │ │ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ ││ valueN │ │ ┃ │ │ │ H │ 1 │ ││ │ ┃ │└───────────┘ │ ┃ │ │ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │values: Vec<T>│ ┃ │ │ Rows │ ... │ ... │ ││ │ ┃ └──────────────┘ ┃ │ │ │ │ │ └ ─ ─ ┘└─────┘ ││ │ ┃ ┃ │ │ ─ ─ ─ ─ ─ ─ │ ││ │ ┃ GroupsAccumulator ┃ │ │ └─────────────────┘│ │ ┗━━━━━━━━━━━━━━━━━━━┛ │ │ Hash Table │ │ │ │ │ │ ... │ └───────────────────────────────────┘ └───────────────────────┘ GroupState Accumulators Hash table value stores group_indexes One GroupsAccumulator and group values. per aggregate. Each stores the state for Group values are stored either inline *ALL* groups, typically in the hash table or in a single using a native Vec<T> allocation using the arrow Row format ``` - A single `GroupsAccumulator` stores the per-aggregate state for _all_ groups. --------