- https://www.influxdata.com/blog/querying-parquet-millisecond-latency/ # Optimizing queries - 优化查询性能的通用技术(也适用于 Parquet): 1. reduce IO:减少从存储传输的数据 2. reduce CPU:减少 decode data 的计算开销 3. improve parallelism:interleave/pipeline reading & decoding ---- # Decode optimization - decoding:CPU-bound,主导了查询延迟。 ## Vectorized decode - vectorized/colunmnar processing - 一次将==多个 values== 解码成 columnar memory format(如 Arrow),而非逐行处理: - 好处: - 均摊 switch on types of column 的开销(类型分支) - cache locality - SIMD - 避免 small heap allocation - 尤其是变长类型如 strings 和 byte arrays ## Streaming decode - 背景知识: - 同一个 row,在不同的 ColumnChunks 中可能位于==不同的 pages==。 - 例如:第 10000 行位于 column A 的第一个 page,在 Column B 中位于第三个 page - 最简单的 decode 方式:一次 decode 整个 RowGroup 或者 ColumnChunk - 问题: - 内存占用大:需要大量 internediate RAM - 增加查询延迟:后续的处理(如 filter / aggregation)只有当整个 RowGroup - streaming decode: - 每次按需输出 batches of rows - batch size 可配置 - 必须足够大来均摊 decode 的开销 - 也要足够小来减少内存占用和 pipeline 延迟 - ![500](https://images.ctfassets.net/o7xu9whrs0u9/2qCu93BPhnyTqeNGzVq1Q5/878e133452698c4341817fc581a37c96/Parquet_File_Streaming_Decode_Diagram_12.05.2022v1.png) - 实现复杂性: - across multiple columns and [arbitrarily nested data](https://arrow.apache.org/blog/2022/10/05/arrow-parquet-encoding-part-1/) - rows 和 values 的关系不固定 ## Dictionary preservation - Dictionary Encoding - ColumnChunk 的 first page 是一个 dictionary page - 包含了 column values 列表 - 后续的 pages 编码了到 dictionary 的 indices,而非 column values - dictionary preservation: - decode 时不还原 column values,否则会低效地重复相同的 values - 使用 Arrow  [DictionaryArray](https://docs.rs/arrow/27.0.0/arrow/array/struct.DictionaryArray.html),解码后的内存格式仍然使用 dictionary encoding - 可以极大提升性能(一些 cases 60x) - 复杂性: - 处理 dictionary 可能跨多个 RowGroups 的情况(dictionary 是 per-rowgroup 独立的) - 并且要针对 RowGroup 是 batch size 整数倍的情况,进行特例优化 - [partly dictionary encoded](https://github.com/apache/parquet-format/blob/111dbdcf8eff2e9f8e0d4e958cecbc7e00028aca/README.md?plain=1#L194-L199) - 某个 column 可能部分使用 dictionary encoding,部分使用 raw values - ![](https://arrow.apache.org/img/20190903-parquet-dictionary-column-chunk.png) --- # Projection pushdown - Parquet reader 只读取 query 需要的 columns。 ---- # Predicate pushdown - 利用 filter expressions 来 skip data,需要和查询引擎结合(scan 时对 predicates 进行求值) - 使用 [RowSelection](https://docs.rs/parquet/27.0.0/parquet/arrow/arrow_reader/struct.RowSelector.html) API 解耦:上层引擎来指定 读取/skip 的范围 ## RowGroup pruning - 使用 footer 中存储的 statistics 来 skip ==整个 RowGroup==s。 - 例如:查询条件是 A > 35 - ![500](https://images.ctfassets.net/o7xu9whrs0u9/5W1ZN8oCkbZAMcqRAwrLJz/1d19e94d3ad8a60a08fa848eea2b4fcb/Parquet_File_RowGroup_Pruning_Diagram_12.05.2022v1.png) - RowGroup 1 的 列 A 最大值为 15,因此可以整个跳过。 - 支持 per ColumnChunk [Bloom Filters](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) ## Page pruning - 利用 footer metadata 中的 page index 来跳过某些 data pages。 - 实现的复杂性: - 不同 ColumnChunks 的 page 经常包含不同数量的 rows - 导致: pruning 某个 column 的一个 page 后并不会马上排除其他 column 的 page。 - page pruning 的流程: 1. 使用 predicates 结合 page index 来辨别哪些 page 需要 skip 2. 根据 offset index 来计算出需要读取的 row ranges(未被 skipped 的) 3. 计算 non-skipped pages 对应 rows ranges 的交集(多个predicate计算出来的),只解码这些 rows - 不易实现,特别是对于 nested lists(一行对应多个 values) ## Late materialization - Only fields that are referenced by predicates and rows that are not filtered out by predicates are fully materialized. - 示例 predicate:`A > 35 AND B = "F"` 1. 使用列 A 的 page index 确定 RowSelection 为 `[100-244]`(50 rows) 2. 解码 A 列的这 50 个 values 3. 只有 5 rows 匹配,生成新的 RowSelection - `[205-206]` - `[238-240]` 4. 只解码 B 列的 5 rows > [!summary] > - Projection pushdown 是 skip columns; > - Predicate pushdown 是 skip rows。 --- # I/O pushdown - Parquet 为 HDFS 而设计,但它也能很好地适配其他 blob storages 如 S3,因为它们拥有相同的特性: - slow random read - **首字节延迟**很大 - 每请求的成本高 - 通常按请求计费 - 为了从此类系统中实现最优的方式读取,Parquet reader 必须: - 最小化 IO 请求数量,同时也要利用 pushdown 技术来避免读取==无关==数据 - 合理的 task scheduling 机制:可以穿插执行 IO 和 计算(数据处理)。 - 读取==整个==文件然后再处理它们并不理想: - 高延迟 - 在整个文件被读取完之前,无法解码(metadata 位于 footer) - 无效工作 - 读取了很多不必要的数据 - 浪费本地磁盘/内存 - 许多云环境对于计算实例不提供 locally attached storage,需要依赖 network block storage( 如 AWS EBS) 或者将 local storage 限制在某些类别的 VM 中。 - 避免 buffer 整个文件,就需要一个复杂的 Parquet decoder: - fetch & decode metadata、ragned fetch 相关 data blocks、穿插数据解码 - ![](https://images.ctfassets.net/o7xu9whrs0u9/6uRMH3yBp7B9xJ6R3BWtGC/3971a116b1ce47186705c0aab5e199b4/Parquet_File_IO_Pushdown_Diagram_12.05.2022v1.png) - 需要仔细设计,从对象存储中获取足够大的 data bloock,使每次请求的开销不会超过减少传输字节所带来的收益。 - [Spark: Decouple CPU with IO work in vectorized Parquet reader](https://issues.apache.org/jira/browse/SPARK-36529) - Rust Parquet [AsyncFileReader](https://docs.rs/parquet/latest/parquet/arrow/async_reader/trait.AsyncFileReader.html): - 高效地读取任何支持 range requests 的存储介质 - 使用 [Async Tokio](https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/) 避免阻塞等网络 IO,可以 interleave CPU 和 network。 - 并行请求多个 ranges,合并相邻 ranges - 使用 pushdown 尽可能消除无关数据 - 方便与 Arrow object_store create 集成 - timeline: - ![](https://images.ctfassets.net/o7xu9whrs0u9/23iUnmSLDBw5yyM291OhRm/d912f29c1d0dbba75d1f79bfd22e1b80/Parquet_File_IO_Pushdown_Diagram_2_12.05.2022v1.png)