#database/olap [FLIP-188](https://cwiki.apache.org/confluence/display/Flink/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage) # Motivation streaming analytics: ![|600](https://cwiki.apache.org/confluence/download/attachments/191335084/image2021-10-20_15-22-9.png?version=1&modificationDate=1634714531000&api=v2) 用户如果需要关心中间数据,问题是 kafka 不可查询:双写 ![|600](https://cwiki.apache.org/confluence/download/attachments/191335084/image2021-10-20_15-22-26.png?version=1&modificationDate=1634714548000&api=v2) 问题: - 高理解成本 - 架构复杂 # Proposal Fink dynamic table: 是一个逻辑概念,有两种不同的物理表示:changelog 和 table,但**只能定义其中的一种**。 引入FTS:支持 dynamic table 的存储,统一 changelog 和 table 两个形式。 特点: - Flink SQL 内置存存 - subsecond 级别的流式写入和消费 - scan 吞吐能力强(列存格式) - 支持不同的输入(Insert/Update/Delete)和 table 定义(有无主键) # Public Interfaces ## INSERT batch insert 是否需要向下游产生变更 ## Configuration **table-storage.log.consistency** - **transactional**: checkpoint后数据才可读 - **eventual**:最终是正确的,表需要有主键 **change-tracking** 如果不需要消费changes,可以关闭 Change Tracking。 ## Bucket 按照**主键**(没有主键使用 whole row)来计算 hash。 bucket 和并行能力相关: - 写入:一个 bucket 同时只能支持一个写入并发。 - 读取:通常一个bucket只使用一个并发读取(除非bucket过大) ## Concurrent Write 写冲突使用分布式乐观锁机制。 对于 HDFS,使用 rename 来支持并发写入。 其他不支持 rename 的,需要 catalog lock。 ## Retention log 中过期的数据,在 FileStore 仍然可以找到。 # Design 包含两部分:LogStore 和 FileStore。 FileStore 是列存,LSM结构(支持按主键更新)。 ![|600](https://cwiki.apache.org/confluence/download/attachments/191335084/image2021-10-20_15-33-6.png?version=1&modificationDate=1634715187000&api=v2) ## LogStore 默认使用 Kafka。 LogStore中的bucket就是 Kafka 分区。 使用 Open format: - Key:json格式。 - Value:debezium-json ### Consistency & Visibility 默认 checkpoint 后可见。 设置立即可见: - 定义主键 - log.consistency = eventual - log.changelog-mode = upsert 当使用 upsert 模式时,下游消费任务会生成一个 **normalized node**,基于主键生成 update_before 消息。这个 node 会存储所有的 key-value。 ## FileStore ### Overview 特点: - LSM + 列存 - 快速更新和 data skipping - 高压缩率 + 高性能分析 - Partition + Bucket - data warehouse支持 - Consistency - file management - version control **目录结构:** ![|400](https://cwiki.apache.org/confluence/download/attachments/191335084/image2021-10-20_15-33-39.png?version=1&modificationDate=1634715220000&api=v2) - part:分区,DDL中 PARTITIONED BY 定义。 - bucket - data file:抽象格式,支持 orc、parquet 和 avro。 record 格式: - SequenceNumber - ValueKind(add/delete) - RowData: Key - RowData: Value **Meta file:** - Manifest file 代表对table的一次change,record(DataFile) schema: - data file name - FileKind: add/delete - partition - bucket - min/max key - min/max sequence number - statistics: data file size, row count - Snapshot file manifest文件的集合,代表 table 的一个快照(version)。 Record schema: - manifest name - low/upper partition:用于 partition pruning - statistic: manifest file size, add file count, delete file count ### 写入流程 1. LSM Process 1. 写Memtable(内存中),每条数据有一个序列号 2. 当Memtable满了或者PrepareCommit,flush memtable(排序+去重),写入到远端文件 3. 异步线程执行 LSM compaction 2. Prepare Commit 1. Flush Memtable 2. Commit message:Delete Files 和 AddFiles 3. Global Commit 1. 获取 Old Snapshots,如果这个 checkpoint 已经被提交,则返回 2. 读取前一个 snapshot-`${i}`,写新manifest,生成新 snapshot-`${i+1}` ### Compaction 在 streaming sink(writer)模式下才会开启自动 compaction。 目前不会有单独的服务来负责compaction。 每一个LSM,只能有**一个** streaming writer,由它负责 compaction。 ### 查询流程 1. Planner 1. 读取当前 snapshot,根据过滤条件裁剪 paritions,获取需要读的 manifests 2. 合并 manifest 中的 deleteFiles 和 addFiles,生成每个分区每个bucket的 file list 2. SplitEnumerator 1. 遍历需要读的 partitions,生成每个 bucket 的 SourceSplit 2. 根据过滤条件,裁剪 bucket 中的文件,输出每个 SourceSplit中 每层的 files。 3. Runtime Task 1. 获取需要读的 SourceSplit,生成 LSM 的 MergeIterator,读取数据