#database/olap
[FLIP-188](https://cwiki.apache.org/confluence/display/Flink/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage)
# Motivation
streaming analytics:

用户如果需要关心中间数据,问题是 kafka 不可查询:双写

问题:
- 高理解成本
- 架构复杂
# Proposal
Fink dynamic table:
是一个逻辑概念,有两种不同的物理表示:changelog 和 table,但**只能定义其中的一种**。
引入FTS:支持 dynamic table 的存储,统一 changelog 和 table 两个形式。
特点:
- Flink SQL 内置存存
- subsecond 级别的流式写入和消费
- scan 吞吐能力强(列存格式)
- 支持不同的输入(Insert/Update/Delete)和 table 定义(有无主键)
# Public Interfaces
## INSERT
batch insert 是否需要向下游产生变更
## Configuration
**table-storage.log.consistency**
- **transactional**: checkpoint后数据才可读
- **eventual**:最终是正确的,表需要有主键
**change-tracking**
如果不需要消费changes,可以关闭 Change Tracking。
## Bucket
按照**主键**(没有主键使用 whole row)来计算 hash。
bucket 和并行能力相关:
- 写入:一个 bucket 同时只能支持一个写入并发。
- 读取:通常一个bucket只使用一个并发读取(除非bucket过大)
## Concurrent Write
写冲突使用分布式乐观锁机制。
对于 HDFS,使用 rename 来支持并发写入。
其他不支持 rename 的,需要 catalog lock。
## Retention
log 中过期的数据,在 FileStore 仍然可以找到。
# Design
包含两部分:LogStore 和 FileStore。
FileStore 是列存,LSM结构(支持按主键更新)。

## LogStore
默认使用 Kafka。
LogStore中的bucket就是 Kafka 分区。
使用 Open format:
- Key:json格式。
- Value:debezium-json
### Consistency & Visibility
默认 checkpoint 后可见。
设置立即可见:
- 定义主键
- log.consistency = eventual
- log.changelog-mode = upsert
当使用 upsert 模式时,下游消费任务会生成一个 **normalized node**,基于主键生成 update_before 消息。这个 node 会存储所有的 key-value。
## FileStore
### Overview
特点:
- LSM + 列存
- 快速更新和 data skipping
- 高压缩率 + 高性能分析
- Partition + Bucket
- data warehouse支持
- Consistency
- file management
- version control
**目录结构:**

- part:分区,DDL中 PARTITIONED BY 定义。
- bucket
- data file:抽象格式,支持 orc、parquet 和 avro。
record 格式:
- SequenceNumber
- ValueKind(add/delete)
- RowData: Key
- RowData: Value
**Meta file:**
- Manifest file
代表对table的一次change,record(DataFile) schema:
- data file name
- FileKind: add/delete
- partition
- bucket
- min/max key
- min/max sequence number
- statistics: data file size, row count
- Snapshot file
manifest文件的集合,代表 table 的一个快照(version)。
Record schema:
- manifest name
- low/upper partition:用于 partition pruning
- statistic: manifest file size, add file count, delete file count
### 写入流程
1. LSM Process
1. 写Memtable(内存中),每条数据有一个序列号
2. 当Memtable满了或者PrepareCommit,flush memtable(排序+去重),写入到远端文件
3. 异步线程执行 LSM compaction
2. Prepare Commit
1. Flush Memtable
2. Commit message:Delete Files 和 AddFiles
3. Global Commit
1. 获取 Old Snapshots,如果这个 checkpoint 已经被提交,则返回
2. 读取前一个 snapshot-`${i}`,写新manifest,生成新 snapshot-`${i+1}`
### Compaction
在 streaming sink(writer)模式下才会开启自动 compaction。
目前不会有单独的服务来负责compaction。
每一个LSM,只能有**一个** streaming writer,由它负责 compaction。
### 查询流程
1. Planner
1. 读取当前 snapshot,根据过滤条件裁剪 paritions,获取需要读的 manifests
2. 合并 manifest 中的 deleteFiles 和 addFiles,生成每个分区每个bucket的 file list
2. SplitEnumerator
1. 遍历需要读的 partitions,生成每个 bucket 的 SourceSplit
2. 根据过滤条件,裁剪 bucket 中的文件,输出每个 SourceSplit中 每层的 files。
3. Runtime Task
1. 获取需要读的 SourceSplit,生成 LSM 的 MergeIterator,读取数据