# CrossJoinExec
- left 是 build side,right 是 probe side
- 由优化器来保证 left 数据量更小
- 基本流程:
1. 将 left input ==所有数据==加载到内存
- 合并成一个大的 RecordBatch,存放到 JoinLeftData 结构中
- 要求 left input 必须是单分区
2. right input 多分区并行跟 left input 逐行拼接两侧所有列
- 所有分区共享同一份 left input data,只加载到内存中==一次==
- 基于 [Shared Future](https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.shared)实现
- 先从 right input fetch 一个 batch(N rows),跟 left 进行 join
- 每次只处理 left input 的一行,重复 N 份,跟当前 right batch 拼接
- 等 left 所有 rows 都 join 完,再 fetch right 的下一个 batch
- 分区数量即 output_partitioning 沿用 right input 的。
--------
# NestedLoopJoinExec
- 基本流程:
1. 类似 CrossJoinExec 将 left 所有数据收集到内存 JoinLeftData 结构中
- JoinLeftData 多了一个 bitmap,用于记录 left rows 有没有被匹配过,实现 Left/Full joins。
2. 按照 right input 的分区数量,并行进行 probe。
- 所有分区共享同一个 JoinLeftData
```mermaid
---
title: NestedLoopJoinStream 状态转换
---
stateDiagram-v2
[*] --> BufferingLeft
state "BufferingLeft: 将 left input 全部读取加载到内存(所有分区共享)" as BufferingLeft
state "FetchingRight:从 right input 读取一个 batch" as FetchingRight
BufferingLeft --> FetchingRight
state IfExhausted <<choice>>
FetchingRight --> IfExhausted
IfExhausted --> ProbeRight: 读到数据
IfExhausted --> EmitLeftUnmatched: EOF
state "ProbeRight: 每次处理一个 left row,缓存结果,达到 batch size 后输出,循环进行直至到 left 结尾" as ProbeRight
state "EmitLeftUnmatched:可选(left outer join等),处理 left inputs 中未被匹配过的数据" as EmitLeftUnmatched
state "EmitRightUnmatched:可选地(right outer join等),处理当前 batch 中未匹配到的 right rows" as EmitRightUnmatched
ProbeRight --> EmitRightUnmatched
EmitRightUnmatched --> FetchingRight: 读取 right 的下一个 batch
EmitLeftUnmatched --> [*]
```
- ProbeRight:
- 优化:如果当前 right batch 很小(小于 10 rows),一次 join 多个 left rows 而非一个,使其能批量进行 join 后的 filter evaluation
- PR: [Rewrite Nested Loop Join executor for 5× speed and 1% memory usage](https://github.com/apache/datafusion/pull/16996)
- 递增式:每个 right batch 每次只跟 left data 中的==一行==进行 join,放入缓存区,攒够 batch_size 大小就输出
----