# ABSTRACT
- C++ database acceleration **library**
- 可复用、可扩展、高性能、SQL方言中立的数据处理组件
- 集成到多个 data systems 如 Presto、Spark、机器学习等。
-----------
# 1. INTRODUCTION
问题背景:每个场景都使用专门/特定的查询和计算引擎,引发数据生态孤岛。
不同引擎的主要区别在于SQL前端、优化器、查询任务的分发和 IO 层。
执行的核心是非常类似的,比如都具有:
- type system(表示 scalar 和 复杂 数据类型)
- 数据集内存中的形式(通常是列存)
- 表达式求值系统
- operators
- 存储和网络序列化、编码格式
- 资源管理
Velox**不提供** SQL parser、dataframe layer、global query optimizer,通常不是直接面向用户的。
Velox 的价值是三个层面的:
- **Efficiency**
- SIMD
- lazy evaluation
- adaptive predicate re-ordering and pushdown
- common subexpression eliminatation
- execution over encoded data
- data generation
- etc
- **Consistency**:数据类型、函数等方面提供一致体验
- **Engineering Efficiency**:特性和优化都有 Velox 提供,可以重用,避免重复开发
目标:让数据系统更加模块化和可互操作。
-------
# 2. LIBRARY OVERVIEW
Velox 期望的输入是一个 **fully optimized query plan**。
另外,Velox 不提供 global query optimizer,而是在执行期间利用大量自适应的技术,如 filter/conjunct reordering、dynamic filter pushdown 和 adaptive column prefetching。
Velox 提供的组件:
- **Type**:
- scalar
- complex
- nested data types(如 structs、maps、arrays、tensors 等)
- **Vector**:
- 和 arrow 兼容的列式内存布局,支持多种编码(Flat、Doct、RLE 等)
- lazy materialization
- out-of-order result buffer population
- **Expression Eval**
- 完全向量化的表达式计算引擎(基于 vector-encoded data)
- 共同表达式消除、常量折叠、==efficient null propagation==、encoding-aware evalutation、dictinary memoization 等优化
- **Functions**
- 支持自定义函数
- 为标量函数提供 row-by-row 和 vectorized(batch-by-batch)两种接口。
- **Operators**: 实现了常见的算子
- **I/O**:
- 一个通用的 connector 接口,允许可插拔的文件格式/存储
- 支持 ORC/Parquet,S3 和 HDFS
- **Serializers**:序列化接口支持网络通信,可以实现不同的 wire protocols,如 PrestoPage,Spark UnsafeRow 格式等。
- **Resource Management**:
一组原语集合来处理计算资源。
比如 memory arenas、buffer management、tasks、drivers、thread pool、spilling 和 caching 等
可以选择使用不同的组件(全部使用,或者只使用部分组件)。
Velox 也提供了扩展 API 用于定制化 Library。
**策略**:足够通用就是现在 Velox 内,否则就用扩展的形式实现。
-----
# 3. USE CASES
## 3.1 Presto
- Presto 的两层架构:
- **coordinator node**:负责接收用户请求、解析SQL、元数据解析、全局查询优化和资源管理
- **worker node**:负责实际执行具体的 query plan fragment。
两者共享相同的 java codebase,通过 HTTP REST 接口通信。
- **Prestissimo:**
使用 Velox 替换 presto 的 java workers。
实现了 HTTP REST 接口,包含 workers 之间的 exchange 协议等。
**主要步骤**:接收来自 coordinator 的 query plan fragment,翻译为 Velox 的 query plan,传递给 Velox 来执行。
## 3.2 Spark
- Spark 由一个 driver process 和 一组 executor processes 组成:
- **driver process**: 负责 task planning、scheduling、与外部资源管理器的通信
- **executor**:负责执行实际的计算、与外部存储系统通信。
- Spruce 是 Velox版 Spark 的 codename
利用了 Spark script transform 接口(允许用户在 Spark 中执行任意二进制程序),将执行 offload 到外部的 C++ 进程(Velox)。
流程:
1. Spark executor 接受到 query plan fragment,序列化后转发给外部的 C++ 进程(SparkCpp)
2. SparkCpp 反序列化,转换成一个 Velox plan,然后使用 Velox 执行。
## 3.3 Realtime Data Infrastructure
使用在 Meta 的三个不同但相关的案例:
- stream processing
- distributed messaging infrastructure
- data ingestion
### 3.3.1 Streaming Processing
尽管流处理提供的抽象是一次操作一行数据,但在实践中,批量处理可以优化 IO,并因此也可以受益于 Velox 的向量化执行模型。
> 生产环境中,数据通常每批最大到 500 KB,buffered 最多 20 秒。
大多数的 data processing operations **直接映射**到 Velox 的 operators。
聚合实现为 Velox 的一个扩展(涉及时间窗口等,非通用场景)。
### 3.3.2 Messaging Bus
**Scribe** 是一个分布式的消息系统,用来收集、聚合、分发,作为 data ingestion 的主要入口。
数据由 web 层生产,写入到 Scribe,然后由 streaming processing 应用消费 或者 分发给数仓。
Scribe 的写入和读取传统上都是 row-by-row 的形式,目前读取可以支持编码为列存。
另外 Velox 用在 Scribe 的读取服务,在消费时可以执行 **pushdown** 操作(如 projection、filters等)。
### 3.3.3 Data Ingestion
FBETL 是 Meta 的 data ingestion engine,负责两个主要的用途:
- data warehouse ingestion
- database ingestion
在 FBETL 中使用 Velox 可以**允许用户指定数据转换规则**,在 ingestion 时执行表达式、UDF、filters等。
## 3.4 Machine Learning
用于 Data Preprocessing 等。
----
# 4. DEEP DIVE
## 4.1 Type System
- 支持的数据类型
- primitive types:integer/float/strings/dates/timestamps/functions
- complex types:arrays/maps/rows/structs,可以任意嵌套
- opaque data type:可以包装任意 C++ 数据类型
- 可扩展,允许添加引擎特定的类型,而不需修改主库。
## 4.2 Vectors
- Velex Vectors 可以在内存中表示列存数据(利用各种编码格式)
- 扩展了 Arrow 格式:
- size 变量(表示 Vector 中的行数)
- data type
- 可选的 nullability bitmap(表示 null values)
- Vectors 可以表示 fixed-size/ variable-size 元素,也可以任意嵌套。利用了不同的编码格式(如 flat、dictionary,constant,sequence/RLE,bias)。
- **Vector 的存储**:
- Vector 的数据存储在 Velox buffers 中(由内存池分配出来的连续内存)。
- 支持不同的 ownership mode(owned/buffer view)
- Vectors 和 Buffers 都有引用计数,单引用计数的是可修改的,但也可以通过 COW 修改。
- **Lazy Vectors**
- 首次使用时才会被填充
- 支持在加载数据时应用一个 callback,用于计算下推(如聚合),不需要物化 intermediate vector
- **Decoded Vector 抽象**
将 encoded Vector 转换为一个 flat vector 和 全部/部分元素的索引。
对于 flat、constant、单层词典编码的输入是 zero-copy 的。
但对于多字典嵌套/RLE 需要物化一个字典索引的新数组。
### 4.2.1 Arrow Comparison
1. **Strings**
采取 StringView 的设计,string vectors 由两个 buffer 组成:
- 一个是 metadata,每元素16字节,称为 StringView
- 一个存储 string 的数据
```cpp
struct StringView {
uint32_t size_;
char prefix_[4];
union {
char inlined[8];
const char* data;
} value_;
}
```
StringView inline 了字符串的前4字节,用于加速 filter 和 orderig 操作。
trim、substr 等操作的执行是 zero-copy 的,只需要更新 metadata pointers。
2. **Out-of-order Write Support**
目的:高效支持**条件语句**的执行(如 IF 和 SWITCH 操作)
条件执行:
先根据条件生成一个 bitmask,表示每行数据执行哪个分支,然后每个分支独立地进行向量化执行,将计算完的值写入一个 output vector。
- [ ] ==为啥不分配多个 output vector再合并==
实现:
- 基础类型天然支持(每个元素大小是固定的)
- string也可以(metadata object 大小是固定的)
- 变长类型(如 arrays 和 maps):同时维护 lengths 和 offsets buffers。另外 slice 和 rearrange elements 操作不需要拷贝。
3. **More Encodings**
支持 RLE 和 constant encoding(某列的所有值都一样)。
## 4.3 Expression Eval
向量化的表达式求值引擎。
- **表达式树**
表达式求值以表达式树作为输入。每个 tree node 是以下形式之一:
- reference to an input column
- constant/literal
- function call
- CAST
- lambda function
function call 也用于表示 AND/OR、条件(IF/SWITCH)和 try 表达式。
tree node 还包含了元数据:
- determinism(输入相同是否输出也总是相同)
- null progagation(输入有 null 输出是否也为 null)
- 每个表达式求值分为两步:compilation 和 evauation。
### 4.3.1 Compilation
- 输入一个或多个表达式树,输出一个 compiled(executable)表达式。
- 期间执行的主要优化:
- **Common subexpression eliminatation**
- 如 `strpos(upper(a), ‘FOO’) > 0 OR strpos(upper(a), ‘BAR’) > 0`,其中 `upper(a)` 是一个共同字表达式,可以只计算一次。
- FilterProject 算子可以在 filter 和 projection 的表达式之间共享子表达式
- **Constant Folding**:计算不依赖 input columns 的 **deterministic** subexpressions
- **Adaptive Conjunct Reordering**
- 计算 AND/OR 表达式时,动态跟踪单个 conjunts 的性能,优先计算最高效的 conjunt,例如以最小时间 drop 掉最多 values 的。
- 为了最大化 reordering的效果,会展开相邻的 AND/OR 表达式,如 `AND(AND(AND(a, b), c), AND(d, e))` 展开为 `AND(a,b,c,d,e)`
### 4.3.2 Evaluation
- **过程**
- 输入是一个 complied expression 和 一个 input dataset(Vectors),输出一个结果 dataset。
- 这一过程由表达式树的递归下降组成,向下传统一个 row mask 标识活跃的元素(非空且未被条件语句屏蔽掉)。
- **计算避免**
每一步,有两种情况可以避免计算:
- 当前的节点是一个共同子表达式,且已经被计算过
- 表达式标记为 progagating nulls,并且其任何输入为 null。
后者可以高效地实现为简单合并所有输入的 nullability bitmasks,然后使用 SIMD 更新 active row masks。
- **Peeling**
如果输入是 dictionary-encoded 的,deterministic expressions 可以考虑只计算不同的值。
- **Memoization**
记住表达式计算的结果,后续可以重用。
### 4.3.3 Code Generation
- **Codegen**
1. 在执行时整个表达式树被重写为 C++ 函数源码,写入到一个源文件
2. 编译成共享库,链接到主进程,用于计算。
> 需要额外的编译时间,因此不适合短暂的查询和交互式场景。
- 仍然是体验特性
- 降低了开发效率
- 调试困难
- 与传统 JIT LLVM-based compilation 之间的权衡
## 4.4 Functions
允许开发者构建自定义的 scalar 和 aggregate 函数。
### 4.4.1 Scalar Functions
- 标量函数:参数是一行,输出也是一行数据。
- 作为一个向量化引擎,Velox 标量函数的 **API 也是向量化的**
- 输入参数是 Vectors(携带 nullabilty buffers 和 active rows bitmap)
- **Simple Functions**
- 容易使用,尽可能地隐藏底层引擎和数据布局,同时提供跟向量化函数同一级别的性能
- 开发者提供一个 C++ 函数(输入是一次 a single row of values)
- 经过优化、给C++编译器提示,保证大多数 执行 loop 是 inlined(减少函数调用和 cache miss)、允许编译器 auto-vectorization
- 基础类型直接映射为 C++ 类型;非基础类型使用 proxy objects(操作底层 Velox Vectors,比如对外呈现 `std::string` 一样的API)
- 可以指定 determinism 和 null 行为
大多数优化智能应用于 dterministic & 有 default null behavior (任一输入为null,输出也为null)的函数
- **Advanced String Processing**
- ==**Ascii fast path**==
- 提供 `callAscii()` 特化函数,避免 UTF-8 的开销,字符串是 Ascii-only 时自动调用
- 可以标识如果输入是 Ascii,输出则也是,避免计算引擎检查是否为 Ascii
- **zero-copy result**
如 `substr`、`trim` 等,结果可以引用 input strings。
### 4.4.2 Aggregate Functions
- 聚合函数:把一个特定 group 内的多行 rows,总结成一个 output row。
- Velox 典型地两步聚合:
1. partial aggregation:以原始数据为输入,产生中间结果
2. final aggregation:将中间结果处理为最终结果
也允许开发者指定另外两个步骤:
- single aggregation:数据已经按 grouping keys 分区好,不需要 shuffle/中间结果
- intermediate aggregation:用于合并 partial aggregation 的结果。比如多线程计算后
- 聚合函数根据中间结果(accumulators)的特点分为两类:
- fixed-size:如 `count()`、`sum()`、`avg()`、`min()` 和 `max()`,使用 fixed-sized accumulators
- variable-size:如 `distinct()`、`pct()`
- [ ] fixed-sized accumulators 是 inline 在 row 内的, variable-size 需要额外的 buffer 和 pointer。
## 4.5 Operators
- PlanNode -> Operator
- Velox query plans 是一个由 PlanNodes 组成的 tree
- 执行时,首先需要将 plan node 转化为 Operators。大多数是一对一,除了:
- Filter node 后面跟着 Project Node,转化为一个 FilterProject operator
- 有两个及以上 children 的 plan node,转化为多个 Operators,如 HashJoin 转化为一对 HashProbe 和 HashBuild。
- **Task、Pipeline、Driver**
- **task** 是 top level Velox execution concept,是分布式执行的功能单元,对应一个 query plan fragment(及它的 operator tree)
- Task 起始是一个 TableScan 或者 Exchange 作为它的 input,以另外一个 Exchange 作为结束
- 一个 Task 的 operator tree 被分割成一个或多个 linear sub-trees(成为 **Pipelines**)
- 每个 Pipleline 拥有一个或多个执行线程,称为 **Drivers**
- Driver 可以在一个线程上运行,也可以 off-thread(比如下游还没有消费,上游没有数据到达等)。state 是 resumable 的,不需要在栈上构建控制流。
- 所有的 operator 都实现了相同的 base API,如
- 添加一组 vector 作为输入
- 将一批 vectors 作为输出
- 检查 operator 是否准备好接收更多的 input daat
- 或者通知不会再添加更多的数据(比如用于聚合 flush 内部状态、开始输出结果)
### 4.5.1 Table Scans、Filter 、Project
- Table scans 是逐列进行的,并且带 filter pushdown。
- 带有 filters 的列先处理,filters 在运行时排序。
- 简单的 filters 使用 SIMD 一次计算多个 values;dicitionary-encode 数据的 filter 结果会进行 cache,然后再使用 SIMD 检查 cache hits。
### 4.5.2 Aggregate and Hash Joins
- Velox 提供了一个精心设计的哈希表实现
- Hashing keys 以列存的方式进行,使用了 VectorHasher 的抽象,并考虑了 key 的范围和基数。
- hash table布局类似 Meta‘s F14.
- 查找不同keys的内存访问是 interleaved
## 4.6 Memory Management
- 使用内存池跟踪内存的使用。
- 小对象直接从 C++ heap 上分配,大对象使用自定义的分配器来实现zero fragmentations。
- **enable spilling**
### 4.6.1 Caching
- memory + SSD caching
- prefetch(for hot columns)
------