#database/olap # 概述 ## 整体架构 ![](https://hudi.apache.org/assets/images/hudi-data-lake-platform_-_Copy_of_Page_1_3-2d54eeaee61f34b3146391bec58c11e5.png) **核心能力:** - Streaming + Batch API 支持流式写入(CDC、行级别的快速更新能力)和流式读取,实现 incremetant process - ACID 事务 - 存算分离 - 查询优化(索引和统计信息) - 开放性:多云支持 + 引擎中立 - time travel - table 管理:小文件智能处理、数据布局优化等 ## 表类型和查询模式 |表类型|支持的查询模式| |--------|--------------| |Copy On Write|Snapshot Query + Incremental Query| |Merge On Read|Snapshot Query + Incremental Query + Read Optimized Query| 两种表类型: - **Copy On Write** 只有parquet列存文件,写入时同步地跟老数据进行merge重写。 - **Merge On Read** parquet + delta log(行存),更新写入到 delta log,后续再进行合并。 不同表类型的优劣势对比: | Trade-off | CopyOnWrite|MergeOnRead| |----------|----------|-------| | 数据延迟 | 高 | 低 | | 查询延迟 | 低 | 高 | |更新开销(I/O)|高 (需要重写整个parquet)|低 (追加到delta log即可) | |写放大| 高 | 低 (取决于 compaction 策略) | 不同的查询模式: - **Snapshot Query**:获取最近的快照进行查询。对于 Merge-On-Read 表,包含了最新提交的 base file + delta logs,并在查询时现场合并(查询延迟比较高)。 - **Incremental Query**:持续地获取从某个时间点之后的新数据,适用于流式场景下。 - **Read Optimized Query**:适用于 Merge-On-Read 表类型,只读取 base parquet 文件(只有列存),忽略 delta logs。查询延迟比较低,但数据新鲜度不够。 COW Table示意图: ![](https://hudi.apache.org/assets/images/hudi_cow-9750b5f006646e2d1874ad18b355d200.png) MOR **Table示意图:** ![](https://hudi.apache.org/assets/images/hudi_mor-5f9da4e0c57c9ee20b74b31c035ba0e6.png) ## Data Model Hudi 的每个 table 需要配置主键。 每条写入的记录都有主键唯一标识,Hudi table 的主键由两部分构成: - record key - partition_path(可选),控制是全局唯一还是parition级别唯一。 主键的定义是通过指定不同的KeyGenerator来实现的。 Hudu内置了多种 KeyGenerator,也支持自定义的方式。 - **SimpleKeyGenerator** 指定一个字段为Record Key, partition字段也指定一个字段,大部分情况使用该配置 - **ComplexKeyGenerator** 使用多个列组合作为 record key 或 parition paths - **GlobalDeleteKeyGenerator** 全局索引场景下,主键不需要使用 parition value - **NonparitionKeyGenerator** 只有一个分区 - **CustomKeyGenerator** 如field_3:simple, field_5:timestamp - **TimestampBasedKeyGenerator** 字段值转换为timestamp而非string 基于主键的数据模型再配合映射到主键的索引就可以实现: - 快速的更新逻辑(upsert) - 去重(记录按主键唯一, parition或global级别) > 注:主键也可以配置为空,适合于不需要更新和去重的场景。 默认 record key 需要作为一个 meta column(_hoodie_record_key)来存储到数据中的,这样会带来一些额外的空间占用。后续([RFC-21](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+21+%3A+Allow+HoodieRecordKey+to+be+Virtual))Hudi 新增了 Virtual Key 的特性,可以不物理存储 record key(只存储生成方式)。 ## Timeline 即 transaction logs,表示不同时间点对一个 table 上执行的不同动作。 Hudi 可以保证每个动作都是原子的。 每个动作(Instant)都有三个属性: - action:操作类型 - time:一般是个递增的时间戳 - state:instant 当前的状态 关键的动作类型: - COMMITS:原子写入 - CLEANS:后台活动,清理不需要的旧版本文件 - DELTA_COMMIT:MOR table 原子写入 - COMPACTION:后台活动,将 log 合并到 base - ROLLBACK:回滚一次 commit/delta commit - SAVEPOINT:标记某些 file group 为 saved,cleaner不会删除它们。用于灾难恢复 每个动作都有三种状态: - REQUESTED:操作已经被调度,还没开始 - INFLIGHT:正在执行 - COMPLETED ![](https://hudi.apache.org/assets/images/hudi_timeline-bf5d8c5e59180434796d82af2b783e6c.png) Hudi 基于 Timeline 来实现 MVCC + Incremental Processing 的能力。 ## File Layouts 一个表的数据包含了数据文件和hudi内部的元数据文件(.hoodie文件夹)。 数据文件组织的级别: table -> parition -> file group -> file slice -> file 1. 整个 table 的数据存储在分布式文件系统中的一个 base path 下 2. table 又拆分为多个 partition(不同子目录) 3. 一个 parition 内又分为多个 file group,不同group之间的数据不重复 4. 一个 file group 内又有多个 file slice,一次提交/Compaction产生一个 file slice(MVCC下的一个新版本)。 5. 一个 flie slice 包含一个 base file(parquet) + 多个 log files。 ![](https://hudi.apache.org/assets/images/hudi-design-diagrams-table-format-3ba591d07f846d8a739366efdf6071ce.png) # Metadata Table Metadata Table主要意义:不需要执行昂贵的 list files 操作。 Metadata table 是一个 internally-managed hudi table,位于 base path 的 _.hoodie/metadata_ 目录下。 Metadata table 是一个 merge-on-read 类型的 table。 与用户 table 的区别是 Metadata table 的 base file 是 HFile 格式,索引以 Key value 的形式存储,来支持快速的点查和范围扫描。 Metadata table 包含了文件列表等元数据以及索引数据。 ![|900](https://hudi.apache.org/assets/images/hudi-design-diagrams_-_Page_5-5ca4af1d2d91e19dc4e9e9e5138bb2b7.png) 索引数据以单独 partition 的形式存储在 metadata table 中。 ![](https://apijoyspace.jd.com/v1/files/lwAegh4ROnSqQfvlXIf3/link) ## 主键索引 主键索引是映射 hoodie key(record key + partition_path)到 file id,来高效支持 upserts操作。 对于 MOR table,有了主键索引后,merge也可以变得更高效。 ![](https://hudi.apache.org/assets/images/with-and-without-index-81d481917e61e4cd1be2426c12994b8b.png) **主键索引类型** - Bloom Index(默认) 使用 bloom filter + record key ranges(可选,interval tree)来有效过滤文件 过滤后再从候选文件中读取确认(处理 upsert 时) - Simple Index 直接从文件里读 - Hbase Index 使用外部 Hbase 来存储索引(需要处理两部分数据的一致性) - 用户自定义 **Global vs Non-Global Index** • Global index 能保证 key 在所有分区都是唯一的 • Non Global index 默认实现,分区内的索引 ## Column Stats Index 包含了所有列的统计信息,可以根据列值作 file pruning,来提升查询效率。 列统计信息示例: ![](https://apijoyspace.jd.com/v1/files/ctMUg4I2ONdjL4WkxEme/link) # 并发控制 即处理读、写、内置异步table service(Hudi后台任务)之间的并发问题。 ## Read-Write Atomic Write (基于事务日志)+ MVCC 来处理读写之间的并发,提供了快照隔离的语义。 读写操作可以同时进行,并保证可以读到最新的完整数据(no partial-write)。 ## Single Writer + Inline Table Service 写入过程同步执行 table service (compaction、clustering等),该场景下完全没有并发。 ![](https://hudi.apache.org/assets/images/SingleWriterInline-d18346421aa3f1d11a3247164389e1ce.gif) ## Single Writer + Async Table Service 写入跟 table services 之间存在并发。 ![](https://hudi.apache.org/assets/images/SingleWriterAsync-3d7ddf7312381eab7fdb91a7f2746376.gif) 数据之间没有并发(比如compaction,处理都是已提交的数据),资源上存在并发。 根据不同的 table service 作简单的并发控制。例如 delta log 如果正在被 compaction,则不能继续写入,需要生成新 log 文件来服务写入。 ## Multi Writer ![](https://hudi.apache.org/assets/images/MultiWriter-fec6bf4269df78d4fa91e7a353144def.gif) 并发控制手段:File-Level(file group) Optimistic Concurrency Control 写入前期不加锁,commit 阶段加锁(表级别的锁,由 lock provider 提供)。 commit 时先检查写入期间涉及的文件是否发生更改,是的话就中止本次写入。 > 注:写文件时仍然需要加 file group 级别的悲观锁 如果不同的Writer写入的数据不相关,则性能会比较好。 开启OCC需要配置 lock provider(四种之一): - File system hoodie.base.path+/.hoodie/lock,并配置过期时间 - Zookeeper - HiveMetastore - Amazone DynamoDB **OCC优化**: - Writer增加心跳机制,定时跟踪写入哪些文件,及时检测冲突,尽早中止,避免造成资源浪费 - 优先终止开始时间晚的任务,对 long-running 任务更友好(long-running 任务的回滚代价高) - 如果 Multi Writers 来自同一个 JVM client,使用进程内的锁,不需要使用外部锁 **Multi Writer Gurantees:** - UPSERT:数据无重复 - INSERT:数据可能会重复,即使开启 dedup 选项。 - BULK_INSERT: 可能会重复,即使开启 dedup 选项。 # 读写流程 ## Read 大致流程: 1. 从 Timeline 上挑选时间点(最近成功 commit version 或者 查询中指定的 commit version) 2. 读取指定时间点的 metadata table,获取对应的数据文件列表,并根据索引和统计信息过滤文件(file pruning) 3. COW表类型:从目标parquet文件扫描记录获取数据 MOR表类型:将 delta logs 加载到 _spillable map ,_读取到 parquet 文件中的每条记录后,需要检查 map 中是否有更新的数据。 ## Write Hudi支持的写入方式: **增量写入** • insert 不查找索引来判断数据是否存在 • upsert 需要查找索引 • delete **批量写入** • insert_overwrite 重写input里涉及的所有 parition • insert_overwrite_table 重写整个表(只删除元数据,后续靠Cleaning来删除数据文件,速度更快) • delete_parition • bulk_insert 适用于初始化时导入数据,比 upsert 更高效(没有写入时File Sizing,对输入进行排序避免的同时写多个文件) **upsert 写入大致流程:** 1. 开启写事务 2. 输入数据先在内存中去重 3. tag:从当前数据中按主键查找,判断是 insert 还是 update,定位目标 file group,对旧数据的更新写入旧数据所在 file group;新数据写入新的file group(不考虑 filesizing) 4. 数据合并或者写入 delta log 5. 索引维护 6. 提交事务 # Table Service ## Compaction 适用于Merge-On-Read表类型,将 delta logs 与 base file 合并生成新的 base file,提升查询性能。 compaction默认是在后台异步执行。异步compaction整个过程分为两步: 1. 任务生成: 由 ingestion job 负责,扫描分区、挑选需要 compact 的文件,并将 compaction plan 写入 Hudi timeline 2. 任务执行:读取 compaction plan 并执行 compaction 不同的执行方式: - 异步执行:默认在 spark streaming job、 DeltaStreamer Continuous Mode(CDC同步)下自动异步执行。 - 同步执行:可配置为写入时同步执行(比如某批数据写入后,想马上提升查询它的性能) - 手动触发:手动向 Spark/Flink 提交任务,或者通过 Hudi-CLI 命令行工具进行离线的compaction。 ## FileSizing 大量的小文件将会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭。Hudi提供了 FileSizing Service 来解决小文件的问题,主要是采取了两个措施: - 写入时:避免小文件的生成 - 写入后:提供 Clustering 特性将小文件进行合并 ### 写入时(Auto-Size) 新数据写入时(数据不存在),不优先创建新的 file group,而是查找已有的较小的file group,追加到其后。 例如:小文件阈值为100Mb,则当前 File_1,File_2, File_3都为小文件: ![](https://apijoyspace.jd.com/v1/files/IAKRbtoD44Uzq1OaEuXT/link) 写入时新数据优先追加到小文件上: ![](https://apijoyspace.jd.com/v1/files/yyQhDzWqOKK5UWHTlsfM/link) 小文件填充完后,再创建新的 file group: ![](https://apijoyspace.jd.com/v1/files/bz6MQVMSGwRSY5mAkvJf/link) **缺点:** - 开销发生 ingestion 期间,影响写入速度 - 如果数据文件上有其他任务在进行(如compaction),则无法进行 ### 写入后(Clustering) [RFC-19](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance) 跟 ingestion job 脱离,不影响 ingestion的速度。 与ingestion job并行地执行,同步或在后台异步地将小文件合并为大文件(合并多个小 file group)。 ![](https://cwiki.apache.org/confluence/download/attachments/153816015/COW-Collapse-noupdates%20%283%29.jpg?version=1&modificationDate=1592439146000&api=v2) **缺点:** 同一个file group,clustering和更新操作无法同时执行(未来会支持)。 默认配置下,一个 file group 如果正在执行 clustering 动作 ,则无法对该 file group 进行更新操作。 也可以配置有更新时,回滚 clustering 操作。 ## Clustering clustering 除了包含异步 FileSizing 功能外,还支持对数据文件进行布局上的优化,基于 [Z-Order 和 Hilbert Space Filling Curves](https://hudi.apache.org/blog/2021/12/29/hudi-zorder-and-hilbert-space-filling-curves/) 实现多字段排序,达到更好的数据局部性以及更强的 data-skipping 能力(结合相关的z-order索引)。 ## Cleaning 清理没有被使用的旧版本文件来回收存储空间。 关键的配置: - **KEEP_LATEST_COMMITS** 保留最近N次 commit 的历史版本 - **KEEP_LATEST_FILE_VERSIONS** 为每个文件最多保留多少个版本 - **KEEP_LATEST_BY_HOURS** 基于时间做清理 cleaning会影响 time travel 功能,需要考虑与 storage cost 之间做一定的权衡。 ## Data Quality 写入时校验数据 ## Transforms 写入时对数据进行一层转换后再写入到 Hudi。 # 生态 ## 多云支持 Hudi 支持的云对象存储: - HDFS - AWS S3 - Google Cloud Storage - Alibaba Cloud OSS - Microsoft Azure - Tecent Cloud Object Storage - IBM Cloud Object Storage - Baidu Cloud Object Storage - JuiceFS - Orace Cloud Infrastructure ## 查询引擎支持 ### Read Support - Apache Flink - Apache Spark - Apache Hive - Apache Impala - Athena - Databricks Spark - Presto - RedShift - Trino - Doris/StarRocks ### Write Support - Apache Flink - Apache Spark - Apache Impala - Databricks Spark # Ongoing & RoadMap - fully lock free事务(基于日志、 crdt等) - Cache - [Consistent Hashing Index](https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md) ![](https://github.com/apache/hudi/raw/master/rfc/rfc-42/basic_bucket_hashing.png) - [Record Level Indexing](https://cwiki.apache.org/confluence/display/HUDI/RFC-08++Record+level+indexing+mechanisms+for+Hudi+datasets) ![](https://cwiki.apache.org/confluence/download/attachments/135860174/Screen%20Shot%202020-02-09%20at%205.01.17%20PM.png?version=1&modificationDate=1581296504000&api=v2) - Secondary Index - 生态:BigQuery集成、Kafka connect sink等 # 总结 ## Hudi特点 - 功能丰富,且高度可配置化(多种表类型、有无主键等)、可扩展(可插拔的索引模块、主键函数等),能够覆盖非常多的应用场景 (Hudi 有500多个不同的配置项)。 - 索引模块能够很好地支持更新操作 - file group 是一个比较核心的设计,可以实现按数据量作切分,分而治之,不同的 file groups 可以并行处理,相互不影响。 ## 与 LSM 模型对比 - **对有序数据集的支持** LSM 对有序数据支持地更好,因为数据在物理上是整体排序的。 - **场景一**:range scan操作 如查询主键的一段连续区间,LSM能快速定位到目标文件,且读取的数据在磁盘上是连续的。而 Hudi 可能需要读取多个 file group。 - **场景二**:查询一组完全随机的 uuid ,完全无序,LSM会命中较多的数据文件。而对于Hudi,由于数据写入和访问一般具有时间上的局部性,有可能只会命中最新的几个 file groups。 同理,当更新一组随机 uuid 时,LSM下该更新会与非常多的旧数据文件重叠,导致compaction的开销很大 - **写入开销** LSM写入和compaction时有排序的开销。Hudi没有,写入吞吐会更大,但merge的效率会低(内存占用也比较大)。 - **更新延迟** - LSM可以做到更低的更新延迟。 - Hudi需要先定位 file group(尤其 file group 较多时,极端情况下 key range 失效时需要检查每个 file group的索引,并且 bloom filter 的误判率也会变高,检查完索引后还需要文件里确认)。 - **数据规模** - Hudi基于 file group 按数据量做切分(一个 file group 在百兆级别) ,每个 file group 是自治的,可以独立 compaction,能支持更大的数据规模。 - LSM支持较小的数据规模。当数据量变大时,一次 compaction 的开销会很大。利用 bucket lsm 可以缓解,但需要用户定义分桶策略,以及可能会存在数据热点、分布不均匀的问题,每个bucket的数据量也比较难限制。 - 另外存储模型的选择,可能也与技术演进路线相关。比如Hudi 最初的起点是 HDFS + parquet files,再演进出事务性地COW(此时仍然只有parquet文件),以及后面率先提出 MOR table、索引等。 ## 与传统数仓对比 Hudi 只负责存储层,是引擎中立的,互操作性和生态更好,一份数据可以支持多个业务(可能使用不同引擎)的读写,能够节省成本。 Doris等传统数仓,查询引擎是定制的,底层存储可以暴露更多的细节给上层查询引擎,有更强的查询优化能力,生成更高效的查询计划,从而达到更好的查询性能。