# ABSTRACT - C++ database acceleration **library** - 可复用、可扩展、高性能、SQL方言中立的数据处理组件 - 集成到多个 data systems 如 Presto、Spark、机器学习等。 ----------- # 1. INTRODUCTION 问题背景:每个场景都使用专门/特定的查询和计算引擎,引发数据生态孤岛。 不同引擎的主要区别在于SQL前端、优化器、查询任务的分发和 IO 层。 执行的核心是非常类似的,比如都具有: - type system(表示 scalar 和 复杂 数据类型) - 数据集内存中的形式(通常是列存) - 表达式求值系统 - operators - 存储和网络序列化、编码格式 - 资源管理 Velox**不提供** SQL parser、dataframe layer、global query optimizer,通常不是直接面向用户的。 Velox 的价值是三个层面的: - **Efficiency** - SIMD - lazy evaluation - adaptive predicate re-ordering and pushdown - common subexpression eliminatation - execution over encoded data - data generation - etc - **Consistency**:数据类型、函数等方面提供一致体验 - **Engineering Efficiency**:特性和优化都有 Velox 提供,可以重用,避免重复开发 目标:让数据系统更加模块化和可互操作。 ------- # 2. LIBRARY OVERVIEW Velox 期望的输入是一个 **fully optimized query plan**。 另外,Velox 不提供 global query optimizer,而是在执行期间利用大量自适应的技术,如 filter/conjunct reordering、dynamic filter pushdown 和 adaptive column prefetching。 Velox 提供的组件: - **Type**: - scalar - complex - nested data types(如 structs、maps、arrays、tensors 等) - **Vector**: - 和 arrow 兼容的列式内存布局,支持多种编码(Flat、Doct、RLE 等) - lazy materialization - out-of-order result buffer population - **Expression Eval** - 完全向量化的表达式计算引擎(基于 vector-encoded data) - 共同表达式消除、常量折叠、==efficient null propagation==、encoding-aware evalutation、dictinary memoization 等优化 - **Functions** - 支持自定义函数 - 为标量函数提供 row-by-row 和 vectorized(batch-by-batch)两种接口。 - **Operators**: 实现了常见的算子 - **I/O**: - 一个通用的 connector 接口,允许可插拔的文件格式/存储 - 支持 ORC/Parquet,S3 和 HDFS - **Serializers**:序列化接口支持网络通信,可以实现不同的 wire protocols,如 PrestoPage,Spark UnsafeRow 格式等。 - **Resource Management**: 一组原语集合来处理计算资源。 比如 memory arenas、buffer management、tasks、drivers、thread pool、spilling 和 caching 等 可以选择使用不同的组件(全部使用,或者只使用部分组件)。 Velox 也提供了扩展 API 用于定制化 Library。 **策略**:足够通用就是现在 Velox 内,否则就用扩展的形式实现。 ----- # 3. USE CASES ## 3.1 Presto - Presto 的两层架构: - **coordinator node**:负责接收用户请求、解析SQL、元数据解析、全局查询优化和资源管理 - **worker node**:负责实际执行具体的 query plan fragment。 两者共享相同的 java codebase,通过 HTTP REST 接口通信。 - **Prestissimo:** 使用 Velox 替换 presto 的 java workers。 实现了 HTTP REST 接口,包含 workers 之间的 exchange 协议等。 **主要步骤**:接收来自 coordinator 的 query plan fragment,翻译为 Velox 的 query plan,传递给 Velox 来执行。 ## 3.2 Spark - Spark 由一个 driver process 和 一组 executor processes 组成: - **driver process**: 负责 task planning、scheduling、与外部资源管理器的通信 - **executor**:负责执行实际的计算、与外部存储系统通信。 - Spruce 是 Velox版 Spark 的 codename 利用了 Spark script transform 接口(允许用户在 Spark 中执行任意二进制程序),将执行 offload 到外部的 C++ 进程(Velox)。 流程: 1. Spark executor 接受到 query plan fragment,序列化后转发给外部的 C++ 进程(SparkCpp) 2. SparkCpp 反序列化,转换成一个 Velox plan,然后使用 Velox 执行。 ## 3.3 Realtime Data Infrastructure 使用在 Meta 的三个不同但相关的案例: - stream processing - distributed messaging infrastructure - data ingestion ### 3.3.1 Streaming Processing 尽管流处理提供的抽象是一次操作一行数据,但在实践中,批量处理可以优化 IO,并因此也可以受益于 Velox 的向量化执行模型。 > 生产环境中,数据通常每批最大到 500 KB,buffered 最多 20 秒。 大多数的 data processing operations **直接映射**到 Velox 的 operators。 聚合实现为 Velox 的一个扩展(涉及时间窗口等,非通用场景)。 ### 3.3.2 Messaging Bus **Scribe** 是一个分布式的消息系统,用来收集、聚合、分发,作为 data ingestion 的主要入口。 数据由 web 层生产,写入到 Scribe,然后由 streaming processing 应用消费 或者 分发给数仓。 Scribe 的写入和读取传统上都是 row-by-row 的形式,目前读取可以支持编码为列存。 另外 Velox 用在 Scribe 的读取服务,在消费时可以执行 **pushdown** 操作(如 projection、filters等)。 ### 3.3.3 Data Ingestion FBETL 是 Meta 的 data ingestion engine,负责两个主要的用途: - data warehouse ingestion - database ingestion 在 FBETL 中使用 Velox 可以**允许用户指定数据转换规则**,在 ingestion 时执行表达式、UDF、filters等。 ## 3.4 Machine Learning 用于 Data Preprocessing 等。 ---- # 4. DEEP DIVE ## 4.1 Type System - 支持的数据类型 - primitive types:integer/float/strings/dates/timestamps/functions - complex types:arrays/maps/rows/structs,可以任意嵌套 - opaque data type:可以包装任意 C++ 数据类型 - 可扩展,允许添加引擎特定的类型,而不需修改主库。 ## 4.2 Vectors - Velex Vectors 可以在内存中表示列存数据(利用各种编码格式) - 扩展了 Arrow 格式: - size 变量(表示 Vector 中的行数) - data type - 可选的 nullability bitmap(表示 null values) - Vectors 可以表示 fixed-size/ variable-size 元素,也可以任意嵌套。利用了不同的编码格式(如 flat、dictionary,constant,sequence/RLE,bias)。 - **Vector 的存储**: - Vector 的数据存储在 Velox buffers 中(由内存池分配出来的连续内存)。 - 支持不同的 ownership mode(owned/buffer view) - Vectors 和 Buffers 都有引用计数,单引用计数的是可修改的,但也可以通过 COW 修改。 - **Lazy Vectors** - 首次使用时才会被填充 - 支持在加载数据时应用一个 callback,用于计算下推(如聚合),不需要物化 intermediate vector - **Decoded Vector 抽象** 将 encoded Vector 转换为一个 flat vector 和 全部/部分元素的索引。 对于 flat、constant、单层词典编码的输入是 zero-copy 的。 但对于多字典嵌套/RLE 需要物化一个字典索引的新数组。 ### 4.2.1 Arrow Comparison 1. **Strings** 采取 StringView 的设计,string vectors 由两个 buffer 组成: - 一个是 metadata,每元素16字节,称为 StringView - 一个存储 string 的数据 ```cpp struct StringView { uint32_t size_; char prefix_[4]; union { char inlined[8]; const char* data; } value_; } ``` StringView inline 了字符串的前4字节,用于加速 filter 和 orderig 操作。 trim、substr 等操作的执行是 zero-copy 的,只需要更新 metadata pointers。 2. **Out-of-order Write Support** 目的:高效支持**条件语句**的执行(如 IF 和 SWITCH 操作) 条件执行: 先根据条件生成一个 bitmask,表示每行数据执行哪个分支,然后每个分支独立地进行向量化执行,将计算完的值写入一个 output vector。 - [ ] ==为啥不分配多个 output vector再合并== 实现: - 基础类型天然支持(每个元素大小是固定的) - string也可以(metadata object 大小是固定的) - 变长类型(如 arrays 和 maps):同时维护 lengths 和 offsets buffers。另外 slice 和 rearrange elements 操作不需要拷贝。 3. **More Encodings** 支持 RLE 和 constant encoding(某列的所有值都一样)。 ## 4.3 Expression Eval 向量化的表达式求值引擎。 - **表达式树** 表达式求值以表达式树作为输入。每个 tree node 是以下形式之一: - reference to an input column - constant/literal - function call - CAST - lambda function function call 也用于表示 AND/OR、条件(IF/SWITCH)和 try 表达式。 tree node 还包含了元数据: - determinism(输入相同是否输出也总是相同) - null progagation(输入有 null 输出是否也为 null) - 每个表达式求值分为两步:compilation 和 evauation。 ### 4.3.1 Compilation - 输入一个或多个表达式树,输出一个 compiled(executable)表达式。 - 期间执行的主要优化: - **Common subexpression eliminatation** - 如 `strpos(upper(a), ‘FOO’) > 0 OR strpos(upper(a), ‘BAR’) > 0`,其中 `upper(a)` 是一个共同字表达式,可以只计算一次。 - FilterProject 算子可以在 filter 和 projection 的表达式之间共享子表达式 - **Constant Folding**:计算不依赖 input columns 的 **deterministic** subexpressions - **Adaptive Conjunct Reordering** - 计算 AND/OR 表达式时,动态跟踪单个 conjunts 的性能,优先计算最高效的 conjunt,例如以最小时间 drop 掉最多 values 的。 - 为了最大化 reordering的效果,会展开相邻的 AND/OR 表达式,如 `AND(AND(AND(a, b), c), AND(d, e))` 展开为 `AND(a,b,c,d,e)` ### 4.3.2 Evaluation - **过程** - 输入是一个 complied expression 和 一个 input dataset(Vectors),输出一个结果 dataset。 - 这一过程由表达式树的递归下降组成,向下传统一个 row mask 标识活跃的元素(非空且未被条件语句屏蔽掉)。 - **计算避免** 每一步,有两种情况可以避免计算: - 当前的节点是一个共同子表达式,且已经被计算过 - 表达式标记为 progagating nulls,并且其任何输入为 null。 后者可以高效地实现为简单合并所有输入的 nullability bitmasks,然后使用 SIMD 更新 active row masks。 - **Peeling** 如果输入是 dictionary-encoded 的,deterministic expressions 可以考虑只计算不同的值。 - **Memoization** 记住表达式计算的结果,后续可以重用。 ### 4.3.3 Code Generation - **Codegen** 1. 在执行时整个表达式树被重写为 C++ 函数源码,写入到一个源文件 2. 编译成共享库,链接到主进程,用于计算。 > 需要额外的编译时间,因此不适合短暂的查询和交互式场景。 - 仍然是体验特性 - 降低了开发效率 - 调试困难 - 与传统 JIT LLVM-based compilation 之间的权衡 ## 4.4 Functions 允许开发者构建自定义的 scalar 和 aggregate 函数。 ### 4.4.1 Scalar Functions - 标量函数:参数是一行,输出也是一行数据。 - 作为一个向量化引擎,Velox 标量函数的 **API 也是向量化的** - 输入参数是 Vectors(携带 nullabilty buffers 和 active rows bitmap) - **Simple Functions** - 容易使用,尽可能地隐藏底层引擎和数据布局,同时提供跟向量化函数同一级别的性能 - 开发者提供一个 C++ 函数(输入是一次 a single row of values) - 经过优化、给C++编译器提示,保证大多数 执行 loop 是 inlined(减少函数调用和 cache miss)、允许编译器 auto-vectorization - 基础类型直接映射为 C++ 类型;非基础类型使用 proxy objects(操作底层 Velox Vectors,比如对外呈现 `std::string` 一样的API) - 可以指定 determinism 和 null 行为 大多数优化智能应用于 dterministic & 有 default null behavior (任一输入为null,输出也为null)的函数 - **Advanced String Processing** - ==**Ascii fast path**== - 提供 `callAscii()` 特化函数,避免 UTF-8 的开销,字符串是 Ascii-only 时自动调用 - 可以标识如果输入是 Ascii,输出则也是,避免计算引擎检查是否为 Ascii - **zero-copy result** 如 `substr`、`trim` 等,结果可以引用 input strings。 ### 4.4.2 Aggregate Functions - 聚合函数:把一个特定 group 内的多行 rows,总结成一个 output row。 - Velox 典型地两步聚合: 1. partial aggregation:以原始数据为输入,产生中间结果 2. final aggregation:将中间结果处理为最终结果 也允许开发者指定另外两个步骤: - single aggregation:数据已经按 grouping keys 分区好,不需要 shuffle/中间结果 - intermediate aggregation:用于合并 partial aggregation 的结果。比如多线程计算后 - 聚合函数根据中间结果(accumulators)的特点分为两类: - fixed-size:如 `count()`、`sum()`、`avg()`、`min()` 和 `max()`,使用 fixed-sized accumulators - variable-size:如 `distinct()`、`pct()` - [ ] fixed-sized accumulators 是 inline 在 row 内的, variable-size 需要额外的 buffer 和 pointer。 ## 4.5 Operators - PlanNode -> Operator - Velox query plans 是一个由 PlanNodes 组成的 tree - 执行时,首先需要将 plan node 转化为 Operators。大多数是一对一,除了: - Filter node 后面跟着 Project Node,转化为一个 FilterProject operator - 有两个及以上 children 的 plan node,转化为多个 Operators,如 HashJoin 转化为一对 HashProbe 和 HashBuild。 - **Task、Pipeline、Driver** - **task** 是 top level Velox execution concept,是分布式执行的功能单元,对应一个 query plan fragment(及它的 operator tree) - Task 起始是一个 TableScan 或者 Exchange 作为它的 input,以另外一个 Exchange 作为结束 - 一个 Task 的 operator tree 被分割成一个或多个 linear sub-trees(成为 **Pipelines**) - 每个 Pipleline 拥有一个或多个执行线程,称为 **Drivers** - Driver 可以在一个线程上运行,也可以 off-thread(比如下游还没有消费,上游没有数据到达等)。state 是 resumable 的,不需要在栈上构建控制流。 - 所有的 operator 都实现了相同的 base API,如 - 添加一组 vector 作为输入 - 将一批 vectors 作为输出 - 检查 operator 是否准备好接收更多的 input daat - 或者通知不会再添加更多的数据(比如用于聚合 flush 内部状态、开始输出结果) ### 4.5.1 Table Scans、Filter 、Project - Table scans 是逐列进行的,并且带 filter pushdown。 - 带有 filters 的列先处理,filters 在运行时排序。 - 简单的 filters 使用 SIMD 一次计算多个 values;dicitionary-encode 数据的 filter 结果会进行 cache,然后再使用 SIMD 检查 cache hits。 ### 4.5.2 Aggregate and Hash Joins - Velox 提供了一个精心设计的哈希表实现 - Hashing keys 以列存的方式进行,使用了 VectorHasher 的抽象,并考虑了 key 的范围和基数。 - hash table布局类似 Meta‘s F14. - 查找不同keys的内存访问是 interleaved ## 4.6 Memory Management - 使用内存池跟踪内存的使用。 - 小对象直接从 C++ heap 上分配,大对象使用自定义的分配器来实现zero fragmentations。 - **enable spilling** ### 4.6.1 Caching - memory + SSD caching - prefetch(for hot columns) ------