Flink State 状态原理解析 | 京东物流技术团队
一、Flink State 概念
State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。
Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。在Flink应用运行过程中,通过 checkpoint 快照定期地保存状态数据。并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。
1.1 Apache Flink 中 State 的存储实现 StateBackend 分类
分为以下3类:
- 基于内存的 HeapStateBackend。状态存储在内存中。
- 基于 HDFS 或 OSS 的 FsStateBackend。状态存储在内存,并在做 cp(checkpoint)时存到远端。
- 基于 RocksDB 的 RocksDBStateBackend。将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。
HeapStateBackend 和 RocksDBStateBackend 分别对应在 TaskManager 内存模型中的位置:
RocksDBStateBackend 中存储结构:
namespace: 在不同的 namespace 下存在相同名称的状态。
1.1.1 State 状态持久化
通过 Chandy-Lamport 分布式快照算法进行 checkpoint 完成状态数据的持久化。然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。
chekcpoint 分类:
- 基于内存的全量 checkpoint
- HDFS 全量 checkpoint
- RocksDB 全量 checkpoint/增量 checkpoint
1.2 State 基于算子和数据分组的分类
State 可分为 Operator State 和 Keyed State 两类。
- Operator State(称为 non-keyed state)
常常存在于Source, Sink中。具体实现类例如:
- BroadcastState
例:Kafka Source 中用 OperatorState 记录 offset。
- Keyed State
任何类型的 keyed state 都可以有有效期(TTL),所有状态类型都支持单元素的 TTL。 这意味着 List 元素和 Map 映射元素将独立到期。
例:SQL GroupBy/PartitionBy 后的窗口中的数据,每个 key 都有对应的 State。key 与 key 之间的 State 数据不可见。
keyed state 的具体实现类:
- ValueState
- MapState
- ListState
- AggregatingState
- ReducingState
- 。。。。。
Flink State思维导图:
| Keyed State | Operator State |
---|---|---|
适用算子类型 | 只适用于KeyedStream上的算子 | 可用于所有算子 |
状态分配 | 每个Key对应一个状态 | 一个算子子任务对应一个状态 |
横向扩展 | 状态随着keyBy的分组KeyGroup自动在多个算子子任务上迁移 | 有多种状态重新分配的方式 |
创建和访问方式 | 自定义算子(重写RichFunction,通过State 名称从 getRuntimeContext方法创建或获得 State ) | 实现 CheckpointedFunction 等接口 |
支持数据结构 | ValueState、ListState、MapState等 | ListState、BroadcastState等 |
二、常见状态相关处理流程
2.1 Flink 应用中状态是如何存储的?
1. Kafka Source 如何存储 OperatorState?
class FlinkKafkaConsumerBase { private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // state名称:"topic-partition-offset-states" // 特殊的State类型:Union State }
unionOffsetStates这个变量就是 OperatorState类型的。
2. Map算子如何存储需要累计的数据?
- ValueState/MapState/ListState/......
思考:keyby 后的数据分发与多并行度 subtask 之间的关系是怎样的?
首先,datastream 中数据经过 keyby 之后,会划分到各个 KeyedStream 中。每个 KeyedStream 有自己的 KeyedState(如ValueState/ListState/MapState)。
其次,KeyedStream 中的数据会以 KeyGroup 方式组织在一起。KeyGroup 是 Flink 重新分发 key state 的最小单元。
最后,KeyGroup 中的数据会通过取模最大并行度的方式分散到各个 subtask 中。以下是关键源码:
KeyGroupStreamPartitioner#selectChannel(record) { K key; key = keySelector.getKey(record.getInstance().getValue()); return KeyGroupRangeAssignment.assignKeyToParallelOperator( key, maxParallelism, numberOfChannels); } --KeyGroupRangeAssignment#assignKeyToParallelOperator() { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } --KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup() 公式:OperatorIndex = keyGroupId * parallelism / maxParallelism --KeyGroupRangeAssignment#assignToKeyGroup() { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); }
2.2 修改并行度场景时 State 状态存储的变化
2.3 State 与 Checkpoint 关系
分布式快照 Checkpoint 的概念,定期将 State 持久化到 外部存储系统(HDFS/OSS) 上。用户可以通过实现 CheckpointedFunction 接口来使用 operator state。通过 barrier 来对齐 checkpoint,等待 State 持久化完成(此过程参数不同也可能是异步的)。
常见 State 与 CP 相关的问题:
- State 状态过大。现象为多个算子或单个算子多个 subtask 做 checkpoint 慢,可导致 CP 对齐时间长,严重时会导致 CP 超时。
- 数据倾斜导致某个 subtask 处理不及时。现象为单个算子少数几个 subtask 做 checkpoint 慢,导致 CP 对齐时间长。严重时会导致 CP 超时。
- 大作业(并行度搞)频繁做 CP,会频繁上传小文件,导致 HDFS 集群小文件过多。
常用解决措施:调大托管内存大小。
三、参考文档:
- Flink State 官方文档:Flink 状态与容错
- https://cloud.tencent.com/developer/article/1403939
- https://www.modb.pro/db/81206
作者:京东物流 吴云涛
来源:京东云开发者社区 自猿其说Tech 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
鸿蒙开发丨设备内UIAbility的几种交互方式
本文分享自华为云社区《设备内UIAbility交互:无缝体验与高级技巧》,作者: 柠檬味拥抱。 UIAbility组件间交互(设备内) 在设备内,UIAbility(用户界面能力)是系统调度的最小单元,它们负责展示用户界面和执行相关的业务逻辑。设备内的不同功能模块之间的交互是应用程序开发中的重要部分。本文将探讨设备内UIAbility之间的交互方式,包括启动应用内的UIAbility、启动其他应用的UIAbility以及通过Call调用实现UIAbility交互。 启动应用内的UIAbility 在一个应用内部存在多个UIAbility时,可能需要从一个UIAbility启动另一个UIAbility。例如,在支付应用中,从主界面UIAbility启动收款UIAbility。下面是一个示例代码,演示了如何在EntryAbility中启动FuncAbility: let wantInfo = { deviceId: '', // deviceId为空表示本设备 bundleName: 'com.example.myapplication', abilityName: 'FuncAbi...
- 下一篇
SLS支持高精度时间戳和全局排序
引言 随着数字化浪潮下企业数字化转型进程的不断加速,以及云原生趋势下可观测性理念的逐渐普及,企业的日志数据来源越来越丰富, 数据规模也正在快速增长,为了高效处理分析这些数据, 日志的集中管理越来越有必要, 数据集中收集之后, 在需要的时候再查询和分析以充分挖掘这些数据的价值。 现在许多业务场景,对于高精度时间戳的需求越来越强烈,很多已经从秒、毫秒,变成了微秒和纳秒。 因为除了日志内容本身是一种重要信息之外,日志之间的相对顺序也是因果关系的一种反映,某些场景下如果日志内容完全相同,但是日志间的顺序错乱了反映出来的结果可能和真实世界里面的事件完全相反。接下来我们简要介绍下对日志顺序一致性有强烈需求的业务场景。 有序业务场景 随着云原生时代的到来,越来越多的业务对全局有序有强烈依赖,下面是几种常见的业务场景。 Trace 在现代IT系统中,尤其是云原生、微服务系统,一次外部请求往往需要多个内部服务、多个中间件和多台机器的相互调用才能完成。在这一系列的调用中,任何阶段出现的问题都可能导致外部服务失败或延迟升高,影响用户体验。如果您想要精确定位及分析问题,需要使用分布式链路追踪技术。 分布式链路...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境