# CrossJoinExec - left 是 build side,right 是 probe side - 由优化器来保证 left 数据量更小 - 基本流程: 1. 将 left input ==所有数据==加载到内存 - 合并成一个大的 RecordBatch,存放到 JoinLeftData 结构中 - 要求 left input 必须是单分区 2. right input 多分区并行跟 left input 逐行拼接两侧所有列 - 所有分区共享同一份 left input data,只加载到内存中==一次== - 基于 [Shared Future](https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.shared)实现 - 先从 right input fetch 一个 batch(N rows),跟 left 进行 join - 每次只处理 left input 的一行,重复 N 份,跟当前 right batch 拼接 - 等 left 所有 rows 都 join 完,再 fetch right 的下一个 batch - 分区数量即 output_partitioning 沿用 right input 的。 -------- # NestedLoopJoinExec - 基本流程: 1. 类似 CrossJoinExec 将 left 所有数据收集到内存 JoinLeftData 结构中 - JoinLeftData 多了一个 bitmap,用于记录 left rows 有没有被匹配过,实现 Left/Full joins。 2. 按照 right input 的分区数量,并行进行 probe。 - 所有分区共享同一个 JoinLeftData ```mermaid --- title: NestedLoopJoinStream 状态转换 --- stateDiagram-v2 [*] --> BufferingLeft state "BufferingLeft: 将 left input 全部读取加载到内存(所有分区共享)" as BufferingLeft state "FetchingRight:从 right input 读取一个 batch" as FetchingRight BufferingLeft --> FetchingRight state IfExhausted <<choice>> FetchingRight --> IfExhausted IfExhausted --> ProbeRight: 读到数据 IfExhausted --> EmitLeftUnmatched: EOF state "ProbeRight: 每次处理一个 left row,缓存结果,达到 batch size 后输出,循环进行直至到 left 结尾" as ProbeRight state "EmitLeftUnmatched:可选(left outer join等),处理 left inputs 中未被匹配过的数据" as EmitLeftUnmatched state "EmitRightUnmatched:可选地(right outer join等),处理当前 batch 中未匹配到的 right rows" as EmitRightUnmatched ProbeRight --> EmitRightUnmatched EmitRightUnmatched --> FetchingRight: 读取 right 的下一个 batch EmitLeftUnmatched --> [*] ``` - ProbeRight: - 优化:如果当前 right batch 很小(小于 10 rows),一次 join 多个 left rows 而非一个,使其能批量进行 join 后的 filter evaluation - PR: [Rewrite Nested Loop Join executor for 5× speed and 1% memory usage](https://github.com/apache/datafusion/pull/16996) - 递增式:每个 right batch 每次只跟 left data 中的==一行==进行 join,放入缓存区,攒够 batch_size 大小就输出 ----