# ABSTRACT
Impala:Hadoop环境下的 MPP SQL引擎(SQL-on-Hadoop)。
低延迟 + 高并发,分析型查询。
----------------------
# 1. INTRODUCTION
- multi-user performance
- 语言:C++ 和 Java
- 利用了标准的 Hadoop 组件:HDFS、HBase、Metastore、YARN、Sentry
- 可读取的格式:[[parquet-doc | Parquet]]、Avro、RCFile
- 分布式架构、运行在跟 Hadoop 设施相同的机器上(daemon 进程的形式)
> 总结:高性能的 SQL-on-Hadoop 系统,特别是在 multi-user workloads 下。
--------------
# 2. USER VIEW OF IMPALA
- 借助 Hadoop 组件提供 RDBMS 级别的体验。
- OBDB/JDBC 连接
- Kerberos/LDAP 认证、SQL roles 和 privileges(利用 [Sentry](http://sentry.incubator.apache.org/) )
- 使用 CREATE TABLE 来连接 HDFS 数据
- 提供数据的 logical schema
- 制定数据在 HDFS 上的位置
## 2.1 Physical schema design
- create table 时,可以指定 partition columns
```sql
CREATE TABLE T (...) PARTITIONED BY (day int, month
int) LOCATION ’<hdfs-path>’ STORED AS PARQUET;
```
非分区表:数据文件默认直接存储在 root 目录下。
分区表:存储在子目录下,路径取决于 partition columns 的 value,如 `<root>/day=17/month=2`
- 文件格式灵活
支持压缩/不压缩的文本文件、sequence file(文本文件切分)
RCFile、Avro、Parquet
## 2.2 SQL Support
- 查询
- 支持大多数的 SQL-92 SELECT 语句
- SQL-2003 分析函数
- 大多数标准的数据类型
- UDF(java/c++)
- UDA(c++) 用户自定义聚合函数
- 不支持 UPDATE 和 DELETE(HDFS带来的限制)
- 只支持 bulk insert(`INSERT INTO .. SELECT ...`)
- LOAD DATA 语句
- DROP PARTITION
- COMPUTE STATS `<table>` 手动触发统计
-----------
# 3. ARCHITECTURE
与底层存储引擎是解耦的。
<img src="https://img.jonahgao.com/oss/note/2025p1/impala_paper_arch.jpeg" width="800" />
部署包括三个 services:
- **Impala daemon(impalad)**
负责接受查询、编排查询在集群内的执行、为其他 daemon 执行单独的 query fragments
coordinator:管理某个查询执行的 daemon,就称为是该查询的 coordinator。
所有 impala daemons 都是对称的,因此谁都可以称为 coordinator,有助于容错和负载均衡。
data locality:daemon进程跟 HDFS datanode 部署在一个机器上,同时不跨网络。
- **Statestore daemon(statestored)**
impala 的 metadata publish-subscribe service
负责向所有 Impala processes 传播集群级别的 metadata。
只有一个实例。
- **Catalog daemon(catalogd)**
作为 Impala 的 catalog 仓库、metadata 访问网关。
通过 catalogd ,Impala daemons 可以执行 DDL 命令(反映在 HMS 等外部存储中)
对 system catalog 的更改会通过 statestore 进行广播。
## 3.1 State distribution
- 元数据同步的必要性:
Impala 的 symmetric-node 架构需要所有 nodes 都可以接收和执行查询。
因此所有 nodes 必须有最新版本的 system catalog 和最近的 cluster membership 来调度查询。
- pub-sub 设计的初衷:
为什么靠 Statestored 来传播(pub-sub)而非直接去询问:
在核心路径上尽可能地避免同步RPC
**订阅流程:**
- statestore 上维护了 topics 集合,以 (key, value, version) 三元组的形式。topic 由应用定义,对 statestore 透明。
- subscribers:启动时向 statestore 订阅 topics
- statestore 收到订阅后返回 topic 当前所有的 entries,之后定期发送两类消息给 subscribers:
- topic update
topic之上的增删改变更
每个 subscriber 维护 topic 级别的最新 version 信息,只接收其后的增量更新
- keepalive
跟 subscriber 之间保活。失败后停止发送更新。
并删除标记为 transient 的 topic entries。
- weak semantics
推送可能不及时。
但是 Impala **只使用它来做本地决策**,不进行集群级别的协调。
比如在单节点上进行 query plan,然后执行 plan 所需要的信息也发送给执行节点。
(不要求执行节点上有相同版本的 metadata)
- 故障恢复
statestore 不持久化任何 metadata。
- [ ] 所有当前的 metadata 都是通过 live subscribers 推送给 statestore 的
因此重启后可以通过 initial subscriber registration 阶段恢复它的状态。
## 3.2 Catalog service
- **作用:**
- 通过 statestore 的广播机制给 Impala daemons 提供 catalog metadata 服务。
- 执行 DDL 操作(替 Impala daemons 执行)
- **元数据类型**
- catalog service 从第三方的 metadata store 拉去信息(如 Hive Metastore 或 HDFS Namenode),并聚合信息到 Impala 兼容的 catalog 结构。 这种设计方便添加新的 metadata store。
- 也负责 Impala 特有的信息,如 UDF(不会传播给 HMS)。
- **Lazy load**
catalogs 通常比较大,但是通常只访问一部分。
因此启动时只加载 table 的粗略信息,详细信息在后台异步加载。
---------
# 4. FRONTEND
- FE 负责将 SQL 转换为 query plan,再由 BE 进行执行。
- 由 Java 编写,从头实现了 fully-featured SQL parser、CBO
- 一个可执行的查询计划由两个阶段组成:
1. single node planning
2. plan parallelization and fragmentation
- **第一阶段**
parse tree 转换为不可执行的 single-node plan tree。
这一步骤负责:谓词下推、根据等价类推断谓词、裁剪 partitions、设置 limits/offset、应用 column projections
同时执行一些 CBO,比如 ordering 和 coalescing window function、join reordering。
成本预估依据:table/partition 的基数加上每列的基数。
使用简单的启发式方法来避免穷举。
- **第二阶段**
将 single-node plan 转换为分布式执行计划。
总体目标:减少数据挪动,最大化 scan locality。
分布式计划的生成:
- plan nodes 之间按需添加 exchange nodes
- 添加额外的 non-exchange plan nodes,尽可能减少数据挪动(如 local aggregation nodes)
期间会决定每个 join 的 策略(支持 broadcast 和 partitioned 两种)。
- **两阶段聚合**
所有聚合执行时都是先本地预聚合 local pre-aggregation,再跟一个 merge aggregation 操作。
grouping aggregation:按 grouping 表达式对预聚合结果分区,并行 merge
non-grouping:单节点 merge
sort 和 top-n 也采取类型的并行逻辑(local 执行+ 单节点merge)
- **Plan fragment**
最后将分布式 plan tree 按照 exchange 作为边界拆分。
每部分放到一个 plan fragment 中,作为 BE 执行的单元。
每个 plan fragment 在一台机器上操作相同的数据分区。
-----------
# 5 BACKEND
- BE 从 FE 接收 query fragments ,并负责最终的执行。
- 设计上充分利用了现代硬件的优势。
- C++ 编写以及运行时使用代码生成,来产生高效的代码路径、更小的内存占用
- 执行模型:Volcano-style(带 Exchange operators)
GetNext() 返回一批 rows
fully pipeline(除了 sort 等算子),可以较少存储中间结果的内存开销
在内存中处理时,使用传统的行存格式(roadmap中将支持列存)
- spillable to disk
如 hash join、aggregation、sorting等算子需要将部分工作集置换到磁盘上。
- partitioning of hash join & aggregation operators
每个元组 hash 值的一些 bits 用来确定目标分区,剩下的 bits 用于 hash table probe。
构建 hash table 时,同时构建一个 Bloom filter。
## 5.1 Runtime Code Generation
- 使用 LLVM 来进行 runtime code generation。
- Impala 使用 runtime code generation 来生成 query-specific 版本的函数(性能关键的)。特别是应用于 inner loop 里的函数(可能会执行很多次)
- 代码生成的优势
- 为特定类型使用特定的使用,如只处理整型
- 避免 virtual functions 的开销,甚至可以 inline,避免函数调用开销
## 5.2 I/O Management
- 使用 HDFS 的 short-circuit local reads 特性
读本地磁盘时,跳过 DataNode protocol
- HDFS caching
- 每个物理磁盘使用固定数量的 worker threads,对外提供异步接口。
## 5.3 Storage Formats
- 支持大多数流行的数据格式,可以结合不同的压缩算法(snappy、gzip、bz2)
- 推荐使用 [[parquet-doc | Parquet]]
-----------
# 6. Resource/Workload Management
两个机制:
- 增加 Admission Control,可以让用户控制自己的 workload,不需要通过 YARN 集中决策
- 在 Impala 和 YARN 之间增加中间层优化 YARN 的使用