- Source: [A Practical Dive Into Late Materialization in arrow-rs Parquet Reads](https://arrow.apache.org/blog/2025/12/11/parquet-late-materialization-deep-dive/)
- Topic: how the arrow-rs Parquet reader implements late materialization
  - Evaluating predicates requires so much non-trivial logic that the reader has effectively become a tiny query engine.

---

## 1. Why Late Materialization

- Column reads are a constant battle between I/O bandwidth and CPU decode costs.
  - Skipping data is not free either: deciding what to skip has its own compute cost.
- For predicates that filter out many rows, late materialization minimizes both reads and decode work.
- Pipeline-style late materialization:
  1. evaluate the predicate
  2. access the projected columns
  - i.e. predicate evaluation and data column access are interleaved
- Paper: [Materialization Strategies in a Column-Oriented DBMS](https://www.cs.umd.edu/~abadi/papers/abadiicde2007.pdf)
- ![600](https://arrow.apache.org/img/late-materialization/fig1.jpg)
- Example query: `SELECT B, C FROM table WHERE A > 10 AND B < 5`
  1. Read column A, evaluate `A > 10`, and build a `RowSelection` (a sparse mask) that represents the initially valid rows.
  2. Using that `RowSelection`, read column B, evaluate `B < 5`, and update the `RowSelection` so that it becomes sparser.
  3. Using the updated `RowSelection`, read column C and decode only the rows that survived both filters.

---

## 2. Late Materialization in the Rust Parquet Reader

### 2.1 LM-pipelined

- Two strategies:
  1. **Pipelined** strategy
     ```mermaid
     graph LR
         A[Read Predicate Column] --> B[Generate Row Selection]
         B --> C[Read Data Column]
     ```
     - Predicate columns are read sequentially, one after another.
  2. **Parallel** strategy: all predicate columns are read in parallel.
     - Makes the most of multi-core CPUs.
  - Pipelined is ==usually better==, because each filter can substantially reduce the data that later steps need to read.
- Core roles in the code:
  - **[ReadPlan](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L302) / [ReadPlanBuilder](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L34)**
    - Organizes which columns and which subset of rows to read into a single plan.
  - **[RowSelection](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L139)**
    - Two representations:
      - RLE (via `RowSelector`), which skips or selects runs of N rows
      - an Arrow `BooleanBuffer` bitmask, which filters individual rows
    - The reader can switch between them dynamically:
      - bitmask: suited to tiny gaps, i.e. selections that flip between select and skip every few rows
      - RLE: suited to large, page-level skips
  - **[ArrayReader](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/array_reader/mod.rs#L85)**
    - Responsible for decoding.
    - Receives the `RowSelection` and decides which pages to read and which values to decode.
- Example query: `SELECT B, C FROM table WHERE A > 10 AND B < 5`
  1. Initialize: `selection = None` (select all rows)
  2. Read A:
     1. the `ArrayReader` reads column A;
     2. the predicate produces a boolean mask;
     3. [`RowSelection::from_filters`](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L149) converts the mask into a sparse selection.
  3. Tighten:
     - pass that selection to the `ReadPlan`;
     - [`ReadPlanBuilder::with_predicate`](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L143) chains the new mask via [`RowSelection::and_then`](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L345).
  4. Read B:
     - build the reader for column B using the current `selection`, then evaluate `B < 5` to obtain `selection_b`.
  5. Merge:
     - `selection = selection.and_then(&selection_b)`

```rust
// Close to the flow in read_plan.rs (simplified)
let mut builder = ReadPlanBuilder::new(batch_size);

// 1) Inject external pruning (e.g., Page Index):
builder = builder.with_selection(page_index_selection);

// 2) Append predicates serially:
for predicate in predicates {
    builder = builder.with_predicate(predicate); // internally uses RowSelection::and_then
}

// 3) Build readers; all ArrayReaders share the final selection strategy
let plan = builder.build();
let reader = ParquetRecordBatchReader::new(array_reader, plan);
```

- ![](https://arrow.apache.org/img/late-materialization/fig2.jpg)

### 2.2 Combining row selectors

- [`RowSelection::and_then`](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L345)
  - Chains the results of two filters: the second selection is expressed relative to the rows selected by the first, so the result keeps only rows that pass both (effectively an AND of the two filters).
  - Example:
    - Input selection A: `[Skip 100, Select 50, Skip 50]`
      - selects only rows 100..150 (rows 100 to 149)
    - Selection B (a further filter applied on top of A): `[Select 10, Skip 40]`
      - keeps only the first 10 rows selected by A, i.e. rows 100..110
    - Result: `[Skip 100, Select 10, Skip 90]`

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Example: Skip 100 rows, then take the next 10.
// Unlike the prose example above, A has no trailing `Skip 50`,
// so the result ends with `Skip 40` instead of `Skip 90`.
let a: RowSelection = vec![RowSelector::skip(100), RowSelector::select(50)].into();
let b: RowSelection = vec![RowSelector::select(10), RowSelector::skip(40)].into();
let result = a.and_then(&b);
// Result should be: Skip 100, Select 10, Skip 40
assert_eq!(
    Vec::<RowSelector>::from(result),
    vec![RowSelector::skip(100), RowSelector::select(10), RowSelector::skip(40)]
);
```

- ![](https://arrow.apache.org/img/late-materialization/fig3.jpg)

---

## 3. Engineering Challenges

### 3.1 Adaptive RowSelection Policy (Bitmask vs. RLE)

- Paper: [Filter Representation in Vectorized Query Execution](https://db.cs.cmu.edu/papers/2021/ngom-damon2021.pdf)
- There is no one-size-fits-all format for `RowSelection`.
  - The optimal internal format is a ==moving target== that shifts with the sparsity pattern.
- Ultra sparse:
  - e.g. 1 row selected out of every 10,000.
  - A bitmask is very wasteful here; RLE fits.
- Sparse but with tiny gaps:
  - e.g. a repeating "read 1, skip 1" pattern.
  - RLE becomes badly fragmented (one selector per row); a bitmask is more efficient.
- **Adaptive strategy**
  - PR: [Adaptive Parquet Predicate Pushdown #8733](https://github.com/apache/arrow-rs/pull/8733)
  - Compute the average run length of the selectors: if it is below the threshold of ==32==, use a bitmask; otherwise keep RLE.
  - Average computation: [resolve_selection_strategy](https://github.com/apache/arrow-rs/blob/2a77e106fd762692c30540a577a08a9825e22245/parquet/src/arrow/arrow_reader/read_plan.rs#L103)
    - `total_rows`: total number of rows selected or skipped
    - `effective_count`: number of selectors
    - avg: `total_rows / effective_count`
  - Evidence for 32: a [data-driven "face-off"](https://github.com/apache/arrow-rs/pull/8733#issuecomment-3468441165) in the PR discussion.
- **The Bitmask Trap: Missing Pages**
  - Problem: when a bitmask is combined with page pruning, an entire page may have been skipped (never fetched).
  - RLE:
    - ![500](https://arrow.apache.org/img/late-materialization/3.3.2-fig3.jpg)
  - bitmask:
    - ![](https://arrow.apache.org/img/late-materialization/3.3.2-fig2.jpg)
    - The bitmask reader ==first reads== the 6 rows and only then applies the mask, so it tries to decode rows that belong to the pruned page.
  - Fix: when this situation is detected, fall back to RLE.
    - [override_selector_strategy_if_needed](https://github.com/apache/arrow-rs/blob/2a77e106fd762692c30540a577a08a9825e22245/parquet/src/arrow/push_decoder/reader_builder/mod.rs#L700)

### 3.2 Page Pruning

> The ultimate performance win is **not doing I/O or decoding at all**.

- Use the PageIndex to skip whole pages, saving both I/O and CPU (decompression and decoding).
  - ⚠️ Even if the `RowSelection` selects only one row in a page, the whole page still has to be decompressed.
- Implementation:
  - [`RowSelection::scan_ranges`](https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/arrow_reader/selection.rs#L204) combines the selection with each page's metadata to compute which pages must be read.
  - Page metadata used:
    - `compressed_page_size`: the page's size
    - `first_row_index`: the page's first row number
  - Output: **byte ranges**, i.e. a list of `(offset, length)` pairs
- ![](https://arrow.apache.org/img/late-materialization/fig4.jpg)

> [!summary] Page pruning
> - The `RowSelection` describes which rows need to be read.
> - `RowSelection::scan_ranges` uses it together with the page index to compute where the needed pages are located.
>   - i.e. selected rows -> selected pages (byte ranges in the file)
>   - logical row filtering -> physical byte fetching

### 3.3 Smart Caching

- The "double tax"
  - To skip data efficiently, late materialization must first read the filter columns.
  - Problem: a column that is both filtered on and projected (such as `B` in the example query) is read once while filtering and again during output / projection.
  - [[parquet-efficient-filter-pushdown-blog]]
- Solution: [`CachedArrayReader`](https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/array_reader/cached_array_reader.rs#L40-L68), [#arrow-rs/7850](https://github.com/apache/arrow-rs/pull/7850)
  - Caches the batch decoded the first time and reuses it at projection time.
- **Dual-Layer** Cache:
  - **The Shared Cache**:
    - A global cache shared by all columns and readers; it has a capacity limit, so lookups ==may miss==.
    - After a predicate page is decoded it is inserted into this cache, and projection reads it back from there.
  - **The Local Cache**
    - Private to a column reader and held for the whole current pipeline stage, so lookups always hit.
    - ==Checked first==.

### 3.4 Minimizing Copies and Allocations

- [`PrimitiveArrayReader`](https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/array_reader/primitive_array.rs#L102) is zero-copy:
  - it takes ownership of the decoded `Vec<T>` directly instead of `memcpy`-ing it into an Arrow `Buffer`.
  - [PrimitiveArray: From a Vec](https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveArray.html#example-from-a-vec)

### 3.5 The Alignment Gauntlet

- Chained filtering uses relative offsets.
  - "Row 1" in filter N might actually be "Row 10,001" in the file due to prior filters.
- A [fuzz test](https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/arrow_reader/selection.rs#L1309) checks that these offset translations are correct.
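A small standalone sketch can illustrate the relative-offset bookkeeping behind `RowSelection::and_then` (section 2.2) and the alignment problem above. It does not use the `parquet` crate. `Run`, `skip`, `select`, `and_then` and `absolute_row` are hypothetical names. The logic follows only the semantics described in these notes, where the second selection is interpreted relative to the rows the first one selects. It is not the actual arrow-rs code.

```rust
/// A run of rows to skip or select (hypothetical stand-in for `RowSelector`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Run {
    pub skip: bool,
    pub len: usize,
}

pub fn skip(len: usize) -> Run {
    Run { skip: true, len }
}

pub fn select(len: usize) -> Run {
    Run { skip: false, len }
}

/// Appends a run, merging it into the previous run when both have the same kind.
fn push(out: &mut Vec<Run>, run: Run) {
    if run.len == 0 {
        return;
    }
    if let Some(last) = out.last_mut() {
        if last.skip == run.skip {
            last.len += run.len;
            return;
        }
    }
    out.push(run);
}

/// Composes `second` (positions relative to rows selected by `first`)
/// into a selection over absolute rows.
pub fn and_then(first: &[Run], second: &[Run]) -> Vec<Run> {
    let mut out = Vec::new();
    let mut inner = second.iter().copied();
    // Partially consumed run of `second`, carried across runs of `first`.
    let mut pending: Option<Run> = None;
    for outer in first {
        if outer.skip {
            // Rows skipped by `first` were never seen by `second`: keep them skipped.
            push(&mut out, *outer);
            continue;
        }
        // Split this selected run of `first` according to the runs of `second`.
        let mut remaining = outer.len;
        while remaining > 0 {
            let mut cur = pending
                .take()
                .or_else(|| inner.next())
                .expect("second selection covers fewer rows than first selects");
            let take = remaining.min(cur.len);
            push(&mut out, Run { skip: cur.skip, len: take });
            remaining -= take;
            cur.len -= take;
            if cur.len > 0 {
                pending = Some(cur);
            }
        }
    }
    assert!(
        pending.is_none() && inner.all(|r| r.len == 0),
        "second selection covers more rows than first selects"
    );
    out
}

/// Maps the `relative`-th selected row (0-based) to its absolute row number.
pub fn absolute_row(sel: &[Run], relative: usize) -> Option<usize> {
    let mut abs = 0;
    let mut rel = relative;
    for run in sel {
        if !run.skip {
            if rel < run.len {
                return Some(abs + rel);
            }
            rel -= run.len;
        }
        abs += run.len;
    }
    None
}

fn main() {
    let a = [skip(100), select(50), skip(50)];
    let b = [select(10), skip(40)];
    println!("{:?}", and_then(&a, &b));
    println!("{:?}", absolute_row(&[skip(10_000), select(5)], 1));
}
```

With A and B from section 2.2, `and_then` yields `[Skip 100, Select 10, Skip 90]`. `absolute_row` reproduces the "Row 1" vs "Row 10,001" case: once earlier filters have skipped 10,000 rows, relative row 1 (0-based) is absolute row 10,001.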
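The pipelined flow for `SELECT B, C FROM table WHERE A > 10 AND B < 5` (sections 1 and 2.1) is easier to follow with a minimal in-memory sketch. It makes several simplifying assumptions:

- Columns are plain `Vec<i64>`s rather than Parquet column chunks.
- The selection is a sorted list of row ids, not a real `RowSelection`.
- "Decoding" just counts value accesses.

`Table`, `DecodeStats` and `pipelined_query` are hypothetical names.

```rust
/// Hypothetical in-memory table: each column is a plain vector.
pub struct Table {
    pub a: Vec<i64>,
    pub b: Vec<i64>,
    pub c: Vec<i64>,
}

/// How many values were touched ("decoded") per column.
#[derive(Debug, Default, PartialEq)]
pub struct DecodeStats {
    pub a: usize,
    pub b: usize,
    pub c: usize,
}

/// Pipelined late materialization for `SELECT B, C FROM table WHERE A > 10 AND B < 5`.
pub fn pipelined_query(t: &Table) -> (Vec<(i64, i64)>, DecodeStats) {
    let mut stats = DecodeStats::default();

    // Step 1: no selection yet (select all), so every value of A is decoded.
    let mut selection: Vec<usize> = Vec::new();
    for (row, &v) in t.a.iter().enumerate() {
        stats.a += 1;
        if v > 10 {
            selection.push(row);
        }
    }

    // Step 2: decode B only at the selected rows and tighten the selection with `B < 5`.
    selection.retain(|&row| {
        stats.b += 1;
        t.b[row] < 5
    });

    // Step 3: decode the projection only for surviving rows. Re-reading B here is the
    // "double tax" from section 3.3; this sketch does not count it.
    let out: Vec<(i64, i64)> = selection
        .iter()
        .map(|&row| {
            stats.c += 1;
            (t.b[row], t.c[row])
        })
        .collect();

    (out, stats)
}

fn main() {
    let t = Table {
        a: vec![5, 20, 15, 30, 1, 12],
        b: vec![1, 7, 3, 2, 0, 9],
        c: vec![100, 200, 300, 400, 500, 600],
    };
    let (rows, stats) = pipelined_query(&t);
    println!("rows = {rows:?}, stats = {stats:?}");
}
```

With the sample table in `main`, A is decoded for all 6 rows. B is decoded only for the 4 rows that pass `A > 10`, and C only for the 2 rows that pass both filters. Each filter shrinks the work of the next step, which is why the notes prefer the pipelined strategy.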
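Two steps can be sketched together: turning a predicate's boolean mask into a run-length selection (what `RowSelection::from_filters` does in section 2.1), and the average-run-length rule from section 3.1. `mask_to_runs`, `choose_strategy` and `Strategy` are hypothetical names. Runs are `(selected, len)` pairs, and the average uses integer division. Only the threshold of 32 comes from the notes; the real `resolve_selection_strategy` may differ in detail.

```rust
/// Which representation the reader would use for a selection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
    Bitmask,
    Rle,
}

/// Threshold on the average run length, as reported for PR #8733.
pub const THRESHOLD: usize = 32;

/// Collapses a per-row predicate mask into RLE runs of `(selected, len)`.
pub fn mask_to_runs(mask: &[bool]) -> Vec<(bool, usize)> {
    let mut runs: Vec<(bool, usize)> = Vec::new();
    for &keep in mask {
        if let Some(last) = runs.last_mut() {
            if last.0 == keep {
                last.1 += 1;
                continue;
            }
        }
        runs.push((keep, 1));
    }
    runs
}

/// avg = total_rows / effective_count; short average runs favor a bitmask.
pub fn choose_strategy(runs: &[(bool, usize)], threshold: usize) -> Strategy {
    let total_rows: usize = runs.iter().map(|&(_, len)| len).sum();
    let effective_count = runs.iter().filter(|&&(_, len)| len > 0).count();
    if effective_count == 0 {
        // Nothing to choose; defaulting to RLE is a choice of this sketch.
        return Strategy::Rle;
    }
    if total_rows / effective_count < threshold {
        Strategy::Bitmask
    } else {
        Strategy::Rle
    }
}

fn main() {
    // "read 1, skip 1": every run has length 1.
    let alternating: Vec<bool> = (0..1_000).map(|i| i % 2 == 0).collect();
    // 1 selected row out of every 10,000.
    let ultra_sparse: Vec<bool> = (0..100_000).map(|i| i % 10_000 == 0).collect();
    for (name, mask) in [("alternating", &alternating), ("ultra sparse", &ultra_sparse)] {
        let runs = mask_to_runs(mask);
        println!("{name}: {} runs -> {:?}", runs.len(), choose_strategy(&runs, THRESHOLD));
    }
}
```

The two patterns from section 3.1 fall on opposite sides of the threshold. "Read 1, skip 1" yields runs of length 1, so the sketch picks a bitmask. One selected row per 10,000 yields runs averaging 5,000 rows, so it stays with RLE.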
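The mapping in section 3.2 from selected rows to byte ranges can be sketched with a simple overlap test. `PageLoc` is a simplified, hypothetical stand-in for a page-index entry that keeps only `offset`, `compressed_page_size` and `first_row_index`. `scan_ranges` here is a free function, not the real method. A page is fetched if any selected row falls inside it; otherwise it is skipped entirely. The real implementation has to handle details this sketch ignores.

```rust
/// Simplified, hypothetical page-index entry.
pub struct PageLoc {
    pub offset: u64,
    pub compressed_page_size: u64,
    pub first_row_index: usize,
}

/// Returns `(offset, length)` byte ranges of the pages that contain at least one
/// selected row. `selection` is RLE as `(selected, len)` runs over the row group.
pub fn scan_ranges(selection: &[(bool, usize)], pages: &[PageLoc], total_rows: usize) -> Vec<(u64, u64)> {
    // Absolute [start, end) row ranges that are selected.
    let mut selected = Vec::new();
    let mut row = 0;
    for &(is_selected, len) in selection {
        if is_selected && len > 0 {
            selected.push((row, row + len));
        }
        row += len;
    }

    let mut ranges = Vec::new();
    for (i, page) in pages.iter().enumerate() {
        // A page covers rows up to the next page's first row (or the end of the row group).
        let start = page.first_row_index;
        let end = pages.get(i + 1).map_or(total_rows, |next| next.first_row_index);
        if selected.iter().any(|&(s, e)| s < end && start < e) {
            ranges.push((page.offset, page.compressed_page_size));
        }
        // Otherwise the page is never fetched: no I/O, no decompression, no decoding.
    }
    ranges
}

fn main() {
    let pages = [
        PageLoc { offset: 4, compressed_page_size: 1000, first_row_index: 0 },
        PageLoc { offset: 1004, compressed_page_size: 900, first_row_index: 100 },
        PageLoc { offset: 1904, compressed_page_size: 800, first_row_index: 200 },
    ];
    // Select rows 150..160 only: just the middle page is needed.
    let selection = [(false, 150), (true, 10), (false, 140)];
    println!("{:?}", scan_ranges(&selection, &pages, 300));
}
```

A single selected row is enough to pull in its whole page, as the ⚠️ note in section 3.2 points out. Selecting only rows 0 and 299 in this layout therefore still fetches the first and last pages in full.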
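The dual-layer lookup order from section 3.3 can be sketched with plain `HashMap`s: local cache first, then the bounded shared cache, then decode. Everything here is hypothetical, including `SharedCache`, `CachedReader`, the `(column, batch)` key and FIFO eviction. These are choices of this sketch, not how `CachedArrayReader` is actually implemented.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical cache key: (column index, batch index).
pub type Key = (usize, usize);

/// Bounded cache shared by all readers; entries can be evicted, so lookups may miss.
pub struct SharedCache {
    capacity: usize,
    entries: HashMap<Key, Vec<i64>>,
    order: VecDeque<Key>, // FIFO eviction order (a choice of this sketch)
}

impl SharedCache {
    pub fn new(capacity: usize) -> Self {
        SharedCache { capacity, entries: HashMap::new(), order: VecDeque::new() }
    }

    pub fn insert(&mut self, key: Key, batch: Vec<i64>) {
        if self.capacity == 0 {
            return;
        }
        if self.entries.insert(key, batch).is_none() {
            self.order.push_back(key);
            if self.order.len() > self.capacity {
                if let Some(oldest) = self.order.pop_front() {
                    self.entries.remove(&oldest);
                }
            }
        }
    }

    pub fn get(&self, key: &Key) -> Option<&Vec<i64>> {
        self.entries.get(key)
    }
}

/// Per-reader state: a private local cache that is always consulted first.
pub struct CachedReader {
    local: HashMap<Key, Vec<i64>>,
    pub decodes: usize,
}

impl CachedReader {
    pub fn new() -> Self {
        CachedReader { local: HashMap::new(), decodes: 0 }
    }

    /// Local cache, then shared cache, then decode (populating both caches).
    pub fn get_or_decode(
        &mut self,
        key: Key,
        shared: &mut SharedCache,
        decode: impl Fn(Key) -> Vec<i64>,
    ) -> Vec<i64> {
        if let Some(batch) = self.local.get(&key) {
            return batch.clone();
        }
        if let Some(batch) = shared.get(&key) {
            let batch = batch.clone();
            self.local.insert(key, batch.clone());
            return batch;
        }
        self.decodes += 1;
        let batch = decode(key);
        shared.insert(key, batch.clone());
        self.local.insert(key, batch.clone());
        batch
    }
}

fn main() {
    let mut shared = SharedCache::new(1);
    let decode = |(col, batch): Key| vec![(col * 10 + batch) as i64];
    let mut predicate_reader = CachedReader::new();
    let mut projection_reader = CachedReader::new();
    // The predicate pass decodes (column 0, batch 0) and populates the shared cache.
    predicate_reader.get_or_decode((0, 0), &mut shared, decode);
    // The projection pass finds it in the shared cache: no second decode.
    projection_reader.get_or_decode((0, 0), &mut shared, decode);
    println!("decodes: predicate={}, projection={}", predicate_reader.decodes, projection_reader.decodes);
}
```

The bounded shared cache is what makes a miss possible. With capacity 1, inserting a second key evicts the first, and a later reader has to decode that batch again. The local layer never evicts within this sketch, which mirrors the "always hits" property described in the notes.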