ES 数据写入方式:直连 VS Flink 集成系统
ES 作为一个分布式搜索引擎,从扩展能力和搜索特性上而言无出其右,然而它有自身的弱势存在,其作为近实时存储系统,由于其分片和复制的设计原理,也使其在数据延迟和一致性方面都是无法和 OLTP(Online Transaction Processing)系统相媲美的。
ES 基础写入原理
ES 直连写入
- 不易于接入多机房容灾部署,目前 ES 容灾机房都属于独立部署,独立读写模式,所以如果采用该方式,则难以同时对多机房写入分别做管控,达不到容灾效果。Binlog-->Dsyncer 通常一个 MySQL Table 对应一个转换任务,如果为了写多机房起多个重复的转换任务,则显得有些愚笨。
- 如果自身业务场景有对同一条记录并发写场景,但写不一定全部来源于 Binlog 的情况下,那全局考虑直写 ES 则更容易遇到写入冲突问题,因为缺乏有序队列的保障。
通过 Flink 搭建 ES 集成系统
- 通过 MQ 可以更快捷的接入多机房 ES 集群,写入解耦,三机房分别起消费者写入数据, 彼此独立,当出现单机房故障时,只要有可用机房,直接处理读流量切流即可, 容灾方案简单清晰;
- 网络抖动等问题会导致 ES 暂时性写入失败时,不影响其他集群写入的情况下,RocketMQ 会暂存消息,Flink 会保存消费快照,不断重试直至成功, 更好的保障了数据最终一致性;
- 多数据源写入能保证全局分区一致性。
- 依赖了更多组件,会增加全链路数据同步延迟,而 ES 默认的 Refresh 频率是每秒一次,经测试该链路正常情况下数据延迟都是秒级的,不是完全不可接受;
- 依赖了更多组件,对基础组件的稳定性有更高的要求,RocketMQ 异常,或者 Flink 任务异常都会导致同步链路出现问题,增加一定的业务异常风险。
- Flink 运行环境:首先需要有 Flink 任务的运行环境,通常企业级的 Flink 任务会作为一个 YARN 作业在分布式系统中被调度并分配资源执行,但同时 Flink 也可作为单机进程,亦或搭建一个独立集群运行。
- ES 消息格式:需要约定一种 ES 消息传输格式和序列化方式,一套范式解决所有同步场景,目前流行的序列化方式是 pb 格式或 json 格式,目前我们都是推荐使用 pb 格式的,数据格式 Schema 定义:
字段名 | 值类型 | 必需/可选 | 描述 |
_index | string | 必需 | 文档要写入索引的名称或别名 |
_type | string | 必需/可选 | 文档的类型 |
_op_type | string | 必需 | 文档写入操作类型,取值范围: index, create, update, upsert , delete |
_id | string | 可选 | 文档 ID,不指定时写入 ES 会 自动生成,但同一条数据被重复消费写入 ES 会生成多个文档 |
_routing | string | 可选 | 文档 路由,不指定时默认使用 _id 字段值路由 |
_version | int64 | 可选 | 文档版本,指定时大于 0 且仅操作为 index/delete 有效,默认使用 external_gte 版本类型 |
_source | object | 必需/可选 | 文档内容,操作类型为 delete 时可不指定 |
_script | object | 可选 | 文档脚本,操作类型为 update/upsert 时有效,但和 _source 不能同时存在 |
syntax = "proto3"; message ESIndexInfo { string Name = 1; // 文档要写入索引的名称或别名 } enum ESOPType { // 文档写入操作类型 DELETE = 0; // 删除文档 INDEX = 1; // 创建新文档或更新老文档,只能全量更新 (替换老文档) UPDATE = 2; // 更新老文档,支持部分更新 (合并老文档) UPSERT = 3; // 创建新文档或更新老文档,支持部分更新 (合并老文档) CREATE = 4; // 创建新文档,存在时报错丢弃 } message ESDocAction { ESIndexInfo IndexInfo = 1; // 索引信息 (必需) ESOPType OPType = 2; // 操作类型 (必需) string ID = 3; // 文档 ID (可选) string Doc = 4; // 文档内容 (JSON 格式, 删除操作时不需要) int64 Version = 5; // 文档版本 (可选, 大于 0 且操作为 index/create/delete 有效) string Routing = 6; // 文档路由 (可选, 非空有效) string Script = 7; // 文档脚本 (JSON 格式, 操作类型为 update/upsert 有效,但和 Doc 不能同时存在) }
- Flink 任务必要配置:监听的 RocketMQ Topic 信息,写 ES 集群信息;
- Flink 执行函数:Flink 处理流式消息有流式 SQL 和自定义应用程序两种方式,流式 SQL 约束于本身的一些限制,比如不支持同一个 MQ 有多个索引消息,而自定义编程更加灵活,比如添加各种打点,日志,错误码处理等,推荐该方式;
- Flink 资源配置:JobManager 资源配置,TaskManager 资源配置等等;
- Flink 自定义参数配置:可以自定义一些与应用程序紧密相关的动态配置,方便动态调节 Flink 消费能力,比如:
参数名 | 用途 | 默认值 |
job.writer.connector.bulk-flush.max-actions | 单次 bulk 最大文档数,超过进行一次 flush (即执行一次 es 的 bulk 请求) | 默认 300 |
job.writer.connector.bulk-flush.max-size | 单次 bulk 最大字节数,超过进行一次 flush (即执行一次 es 的 bulk 请求) | 默认 10MB |
job.writer.connector.bulk-flush.interval | 两次 bulk 最大间隔,超过进行一次 flush (即执行一次 es 的 bulk 请求) | 默认 1000ms |
job.writer.connector.global-rate-limit | 全局写入限速值 | 默认 -1,不限速 |
job.writer.connector.failure-handler | 指定自定义失败处理器,比如处理4xx错误,5xx错误的方式不同,429总是无限重试等; | |
global_parallelism_num | flink 任务全局并发度 | rmq 是 queue/4,bmq/kafka 是 partition/3 |
max_parallelism_num | flink 任务最大并发度 | mq 的 queue/partition 的个数 |
checkpoint_interval | 创建 Checkpoint 的间隔,单位 ms (5min=300000) | 默认 15min |
checkpoint_timeout | 创建 Checkpoint 的超时时间,单位 ms (5min=300000) | 默认 10min |
rebalance_enable | 开启乱序消费 | 默认 false |
对比建议
写入方式 | 同步延迟 | 写入特性 | ES写入性能 | 消费者 | 容灾能力 |
直连 | 依赖组件少,延迟低 | Binlog 单 key 有序 | bulk写入 | FaaS | 较差 |
RocketMQ+Flink+ES | 依赖组件多,延迟较高/秒级 | 全局单 key 有序 | bulk写入 | Flink | 好 |

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
7个prompt小技巧,让你和AI的对话更有效
欢迎大家在 GitHub 上 Star 我们: 分布式全链路因果学习系统 OpenASCE:https://github.com/Open-All-Scale-Causal-Engine/OpenASCE 大模型驱动的知识图谱 OpenSPG:https://github.com/OpenSPG/openspg 大规模图学习系统 OpenAGL:https://github.com/TuGraph-family/TuGraph-AntGraphLearning 你会花多长时间去写prompt呢?也许很多人认为prompt仅仅是输入一些文本然后按下回车键那么简单,但实际情况并非如此,若想从AI聊天机器人获取准确且有价值的信息,prompt至关重要。那么如何正确构建prompt?应具体到何种程度?包含哪些信息?又该如何以有用方式提出数据请求?一个全新的提示工程领域正在兴起,其致力于如何打造和完善AI提示。但是你可以跳过那些棘手的部分,通过本文分享的技巧快速提升prompt的技能。 尽可能具体 使用ChatGPT或其他AI工具时,一个常见的错误是未提供足够具体的信息。若想让AI生成更精确的结...
- 下一篇
blazork8s v0.1.9 已经发布,Kubernetes 管理工具
blazork8s v0.1.9 已经发布,Kubernetes 管理工具 此版本更新内容包括: 更新内容 0.1.9 增加ingress详情页面展示Service、Pod功能 增加新页面,使得编辑Yaml、Doc查看两个功能合二为一 Workload资源增加对应的Service、Ingress关联展示功能 调整界面彩色Tag显示逻辑,相同的文字使用同一个颜色。不再频闪烁 扩缩容页面增加数值点击加减功能 增加集群级容量显示功能。 修复一些微小使用bug 完整的更新日志: https://gitee.com/weibaohui/blazork8s/compare/v0.1.8...v0.1.9 详情查看:https://gitee.com/weibaohui/blazork8s/releases/v0.1.9
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6