作者:竹恩、岁月、不周
关键词:Kafka ETL,高弹性、免运维、低成本
引言: 阿里云消息队列 Kafka 版提供兼容 Apache Kafka 生态的全托管服务,彻底解决开源产品长期的痛点,是大数据生态中不可或缺的产品之一。随着 Kafka 越来越流行,最初只是作为简单的消息总线,后来逐渐成为数据集成系统,Kafka 可靠的传递能力让它成为流式处理系统可靠的数据来源。在大数据工程领域,Kafka 在承接上下游、串联数据流管道方面发挥了重要作用,Kafka 应用流式框架处理消息也逐渐成为趋势。
消息流处理框架选型
说到流计算,常用的便是 Storm、Spark Streaming 和 Flink,目前这些框架都已经完美的支持流计算,并且都有相应的使用案例,但这些框架使用起来门槛相对较高,首先要学习框架和各种技术、规范的使用,然后要将业务迁移到这些框架中,最后线上使用、运维这些流计算框架,对于简单的流处理应用来说,可能较为复杂。
![]()
在与传统流处理系统对接中,由于所有的数据基础都要从一个系统流入 Kafka 然后再流入到另一个系统中,以至于引发 Kafka 社区的思考:与其把数据从一个系统传递到下一个系统中做处理,为何不自己实现一套流处理框架呢?基于这个考量,从 0.10.0 版本开始,Kafka 不仅为每一个流行的流式处理框架提供了可靠的数据来源,还提供了一个强大的流式处理类库 Kafka Streams,并将其作为客户端类的一部分。这样,开发人员就可以在应用程序里读取、处理和生成事件,而不需要再依赖外部的处理框架。
但由于 Kafka Streams 本身是一个 Java 客户端库,需要开发人员自行打包和部署;同时 Kafka Streams 是开源版本,可靠性和可用性不能得到很好的保障,也不能实现按需使用;此外使用过程中需要用到流的编程,使用的门槛也相对较高。
![]()
消息流处理框架主要面临的问题
通过前面对常见的消息流处理的介绍,不论是传统的流处理架构还是 Kafka Streams,对于开发人员来说都会面临一些问题,尤其是在面对 70% 以上简单流场景的需求,原有的方案弊端被不断放大,客户仍然需要投入较大的人力成本和较高的资源,同时整个架构也很复杂。总体来说,目前面临主要是四个方面的问题:
1、运维成本较大,研发团队自行编写代码,后期持续维护,运维成本较大;
2、技术成本较大,对于很多轻量或简单计算需求,需要进行技术选型,引入一个全新组件的技术成本过高;
3、学习成本不可预期,在某组件选定后,需要研发团队进行学习并持续维护,这就带来了不可预期的学习成本;
4、自行选用开源组件后,可靠性和可用性不能得到有效保证。
![]()
面对这些问题,阿里消息队列 Kafka 也推出了相应的解决方案:Kafka ETL。
阿里云的解决方案 - Kafka ETL
Kafka ETL 简介
阿里云消息队列 Kafka 版推出更低成本的 Kafka –ETL 组件,是一款免运维的流计算组件,主要特性是支持配置化流式处理消息。Kafka ETL 组件主要提供的是非时间窗口相关的流计算服务,客户可以配置,甚至简单写入几行代码就能满足包括格式转换、内容富化、本地聚合、路由分发等常用的数据处理需求。
Kafka ETL 在使用上拆分成有向无环图,在计算节点转换时,把 Topic 作为一个存储,在 Topic 里进行有状态的计算,还可以支持消息的转储。
![]()
目前 Kafka ETL 已支持的模版包括:
1)数据清洗:规则过滤;
2)转换模版:字符串替换,添加前后缀、字符串大小写转换、空格去除数;
3)数据富化模版:数据富化;
4)Split 模版:Topic Split;
5)路由模版:Topic 路由。
Kafka ETL 优势
通过对 Kafka ETL 基础应用及功能的介绍可以看到,相比于 Storm、Spark Streaming、Flink、Kafka Streams,Kafka ETL 的优势主要体现在以下四个方面:
1)开箱即用,免运维;
2)节省成本,不用额外购买其他流计算产品,目前 Kafka ETL 仍处于公测免费阶段;
3)低代码,支持快速上线,学习成本低,一站式体验,技术投入小,时间成本节省 80%;
4)便于监控排查,控制台上相关日志信息比较全面。
![]()
Kafka ETL 操作
通过以上 Kafka ETL 应用和优势的介绍可以看到 Kafka ETL 在使用中的具备轻量、低成本等特性,不仅如此,Kafka ETL 的操作也比较简单,仅需三步便可完成 ETL 操作。
1)第一步:创建任务
选择 Kafka 来源实例、来源 Topic,以及对应的选择 Kafka 目标实例、目标 Topic。并配置消息初始位置、失败处理以及创建资源方式。
![]()
![]()
2)第二步:编写 ETL 主逻辑
选择 Python 3 作为函数语言。这里提供了多种数据清洗、数据转化模板,比如规则过滤、字符串替换、添加前/后缀等常用函数。
![]()
3)第三步:设置任务运行、异常参数配置,并执行
![]()
Kafka ETL 应用场景
基于 Kafka ETL 的功能和优势,目前 Kafka ETL 主要应用在下面这些场景中:
1)转储场景,支持格式化数据,以方便数据进行转储;
2)流式处理场景,流式计算,支持消息的流式处理,主要提供的是非时间窗口相关的流计算服务;
3)实时行为计算场景,包括风控,金融,电商等需要实时行为计算场景;
4)还支持其他一些场景,包括实时报表,自动化运营场景等。
![]()
Kafka ETL 的架构解析
通过前三部分介绍,想必大家对阿里云 Kafka ETL 有了一定了解,本节的主要内容是对 Kafka ETL 的架构进行解析,帮助大家对 Kafka ETL 有更深入的理解。Kafka ETL 是基于 Kafka connect + 函数计算,为云上的用户提供一套数据流转和计算的一站式解决方案。
在当今的大数据、云计算时代,一个复杂的大型系统一般都会由许多处理特定任务的子系统构成。各个子系统一般会由不同的团队开发,因此,各系统中的数据在内容和格式上,存在天然的不一致性。当数据在各个子系统之间流转的时候,需要对数据进行格式处理,以消除各系统数据之间格式的不同。此外,还可能需要收集来自各个子系统中的异构数据,对采集到的数据做一些加工和计算,然后投递到数据仓库进行后续的数据分析。在这个过程中,可以抽象出两个典型场景:数据流转场景和数据计算场景。
数据流转场景主要面对的问题是,异构系统间数据如何流转?
数据计算场景主要面对的问题是,如何在数据流转过程中,进行数据的加工计算?
下面就展开对这两个主要场景进行介绍。
数据流转场景
在数据流转场景中,可能需要将各种关系型和非关系型数据库中的数据导入到数据仓库;或是将 mysql 的数据导入到 ElasticSearch,用来提高查询体验;此外一些数据还会导入到图形数据库。这些场景面临的主要问题是:
1)各种不同源之间的数据如何拷贝;
2)如何满足传递的实时性。
比如,mysql 里的一个变更,希望马上能在 ElasticSearch 中反映出来,不然就会导致后台数据变更了,用户却查不出最新的数据。除此之外,还需要保证数据拷贝的高可用、可伸缩性以及可扩展性。
为应对这一问题,传统的方案可能是:为各数据源之间都专门做一个数据拷贝工具。这种方案会带来以下问题:
1)首先是工作量问题,需要为每个场景都写一个专门的工具,工作量会非常大;
2)业务耦合严重,比如想监听价格变化,就需要在所有变化价格的业务里,都加上一个 producer。假设上层 schema 发生了变化,下层就需要修改代码,因此上层需要感知到所有下层的存在。
专门的工具看起来不太可行,那么,是否做一个完全通用的工具,让它支持任意数据源之间数据拷贝。这个听起来不错,但是实际却不可行,正因为它要求太通用了,很难去制定各种规范。
![]()
Kafka connect 正是为解决以上异构数据同步问题而生的。它解决的思路是在各个数据源之间加一层消息中间件,所有的数据都经过消息中间件进行存储和分发。这样做的好处有:
1)通过消息中间件做异步解耦,所有系统只用和消息中间件通信;
2)需要开发的解析工具数量,也从原来的 n 平方个,变成线性的 2*n 个。
Kafka connect 则用于连接消息系统和数据源,根据数据的流向不同,连接可以分为 source connector 和 sink connector。其原理也很简单,souce connector 负责解析来源数据,转换成标准格式的消息,通过 Kafka producer 发送到 Kafka broker 中。同理,sink connector 则通过 Kafka consumer 消费对应的 Topic,然后投递到目标系统中。在整个过程中,Kafka connect 统一解决了任务调度、与消息系统交互、自动扩缩容、容错以及监控等问题,大大减少了重复劳动。但是,如何将来源系统的数据解析成 message、或是将 message 解析成目标系统数据,这两件事情是需要根据不同的数据系统而做不同实现的。对于目前主流的系统,各大厂商均有提供相应的 connector 实现。
![]()
阿里云消息队列 Kafka 版就提供了全托管、免运维的 Kafka Connect,用于消息队列 Kafka 版和其他阿里云服务之间的数据同步。可以看到消息队列 Kafka 版支持了 Mysql source connector、OSS sink connector、MaxCompute sink connector 以及 FC sink connector 等主流的 connector。如果用户想要使用这些 connector 进行数据同步,只用在消息队列 Kafka 控制台的图形界面上做几个配置,就可以一键拉起 connector 任务。
![]()
数据计算场景
Kafka connect 解决了异构数据源之间数据同步的问题,虽然也提供了 transformer,解决部分数据转换需求,但是依旧缺乏实时计算能力。为应对以上场景的数据实时处理需求,市场上出现了许多优秀的处理工具,从最初的 Hadoop,Hive 到 Spark,Flink 以及 Kafka streams 等,都提供了对应的组件模块和上下游解决方案。
![]()
![]()
但这些处理方案中都存在或多或少的问题,主要的问题是:
- 首先处理框架比较重,占用资源多。比如当下流行的 Spark 和 Flink 都需要先搭建一个集群,集群本身运行起来就要不少资源。集群规模一般按照流量峰值配置,在大多数时候,资源是浪费的。
- 其次在诸多框架中,需要根据实际需求做技术选型,后期可能需要专门的团队或者人去运维,这个过程需要较大的学习成本和后期维护成本。
针对部分无状态的简单计算,函数计算或许是一个很好的选择。阿里云上的函数计算,是事件驱动的全托管计算服务。使用函数计算时,用户无需采购与管理服务器等基础设施,只需编写并上传代码即可。函数计算会帮助用户准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。可以看到,函数计算以简单易用的方式给用户的许多场景提供了计算能力。
![]()
阿里云消息队列 Kafka 版近期推出的 Kafka ETL 组件,通过 Kafka+Kafka connect+函数计算的架构,能够很好的应对数据转储+实时计算问题。具有轻量,学习成本低,开发周期短,资源动态伸缩,简单快速等优点。
Kafka+Kafka connect+函数计算的云原生数据应用解决方案,通过 Kafka connect 作为实时处理任务触发器,能够实时接收到新发送到消息队列集群的数据,然后转发到函数计算,触发实时数据处理任务的运行。在这个数据流转阶段,将大量异构系统中的数据以各种方式汇集到 Kafka 中,然后围绕 Kafka 为中心,做后续的处理。作为后续数据流转中的一环,Kafka connect 除了保障数据的实时性以外,还解决了任务调度、与消息系统交互、自动扩缩容、容错以及监控等问题,大大减少了重复劳动。数据到了函数计算以后,会自动触发用户自己编写的数据处理逻辑,对原始消息内容进行计算。最后,函数计算可以将加工完成的数据,投递到用户指定的目标端,例如投递回消息队列 Kafka,或者是投递到 Max compute 进行下一步的数据分析。以上所说的整个任务的配置、创建、运行,都只用通过云上的 Kafka 控制台图形页面进行操作即可完成。
![]()
应用场景详解
接下来一起来看一个 Kafka ETL 的应用示例。在这个示例中,用户的一个大致使用场景是这样的:从一个电商业务系统中,采集日志,存储到 Kafka 侧,然后需要对日志数据进行加工,最后将加工好的数据投递到两个目标端:一个是投递到 MaxCompute 进行数据分析;另一个是投递到 ElasticSearch 进行日志检索。
现在分节点来看,如何利用消息队列 Kafka 版来做这个事情:
1)第一步:采集原始日志到消息队列 Kafka 版的 Topic 中
这里可以使用一些比较成熟的开源组件例如 FileBeat、Logstash、Flume 等,将用户应用端的日志消息,投递到 Kafka 中。一般情况下,这个步骤会将原始的日志信息投递到 Kafka。当然这里也可以做一些简单的转换,但一般不这么做,而是保留一份原始信息,原始的日志可能来自各个关联的应用,内容和格式会存在些许差异。
在这个例子,订单应用中生成一条日志。日志中包含用户 Id、action、订单 Id 以及当前状态:
![]()
从支付应用中,又生成一条日志。日志中同样包含以上信息,只是格式上存在一些小差别。
![]()
这两条来自不同子系统的日志,都被采集到 Kafka 的一个 Topic,叫做 user_order_raw。这两条日志,最终对应这个 Topic 里的两条消息,key 均为 null,value 为日志的原始内容。可以看到,由于来源系统日志格式不一样,这个 Topic 里包含的这两条消息,消息格式上也存在一定差别。
2)第二步是对 Topic 中的消息,做简单的数据加工计算
数据到达 Kafka 的 Topic 之后,Kafka connect 会消费消息,并投递到函数计算中。数据到了函数计算后,需要对这个数据进行加工计算,计算的目标是抽取 UserId、Action、OrderId 以及 Status,并将数据都转换为大写字母。然后所有处理后的消息发往 MaxCompute 进行分析,此外还需要筛选 Action 为 pay 的所有消息发往 Elastic Search 中。
这个步骤,可以在 Kafka 控制台图形界面上创建 ETL 任务,用户选择数据来源 Topic:user_order_raw,然后写一段对数据的处理代码。这里,ETL 已经提供了部分模板,可以在模板的基础上,做稍许改动即可。
本示例的代码如下图所示。在这个例子中,用户需要写一段从不同格式的日志中,抽取UserId、Action、OrderId 以及 Status 的代码,然后将所有处理过的消息路由到目标 Topic。
![]()
3)最后一步,可以将处理完的消息,再次投递到目标端。
函数计算将处理完的消息,投递回 Kafka。经过这一步处理,所有消息被路由到目标Topic:user_order_processed,此时这个 Topic 中会包含两条消息,消息 key 为 null,value 如下所示:
![]()
另外,Action 为 pay 的消息还会被路由到 Topic: user_order_pay_info 中,此时这个 Topic 会包含一条消息,key 为 null,value 如下所示:
![]()
可以看到,此时的消息格式已经统一了。
这个例子中,将 Topic:user_order_processed 中所有处理完的订单相关消息,投递到 MaxCompute 中进行数据分析。将 Topic: user_order_pay_info 中的支付信息,投递到 ElasticSearch 中进行后续搜索。
这一步,可以一键创建相应的 Kafka connect 任务,将数据投递到相应的目标端。
总结一下上述整个过程。在这个示例中,所要做的仅仅是在消息队列 Kafka 控制台上配置一个 ETL 任务,写一小段处理代码即可。上述步骤中,第二步处理完数据之后可以不经过第三步投递回 Kafka,而是在处理完之后,直接路由到 MaxCompute 和 ES 中。在该例子中采用的方式是将处理完的数据再次发送回 Kafka 中,然后再投递到目标系统中。这种方式可以在 Kafka 端保留一份处理后的数据,用户还可以比较灵活地对这份数据做进一步处理或者继续投递到其他第三方系统中。
![]()
阿里云消息队列 Kafka 版的优势
最后,给大家额外分享一下阿里云上的消息队列 Kafka 在内核层面的差异化优势。阿里云上的消息队列 Kafka 版在发展过程中除了解决易用性和稳定性方面的问题以外,还做到了有区分度,并在内核层面做出自己的核心竞争力和优势。
阿里云消息队列 Kafka 版支持云存储和 Local 存储这两种存储引擎。其中 Local Topic 指的就是以 Kafka 原生的方式存储数据,保留开源 Kafka 全部特性,100%兼容开源 Kafka。云存储是接下来要介绍的重点,消息队列 Kafka 通过自研云存储引擎,彻底解决了原生 Kafka 一些深层的 bug,以及因为本身架构而难以解决的问题,实现了支持海量分区、通过多副本技术降低存储成本,以及支持无缝迁移弹缩性。接下来,将详细介绍这三大特性和其中的技术细节。
![]()
支持海量分区
在消息引擎中,常见的消息存储方式有碎片式存储和集中式存储。
碎片式存储通常以 Topic 或者分区纬度存储,其主要优势是架构简单,可以针对 Topic 或者分区,控制持久化的容量。Kafka 在架构上,是基于分区的碎片式存储,在分区规模不大的情况下,可以通过磁盘的顺序读写,获得高效的消息读写性能。通常情况下,一般规格的 Kafka 集群可以支持到千级别的分区规模。如果分区规模持续扩大,且大部分分区都有读写请求时,由于这种设计上的问题,原本的顺序读写就变成了随机读写,从而导致 Kafka 的读写性能急剧下降。
不同于碎片式存储,集中式存储则将所有消息集中存储到同一个 Commit Log,然后根据 Topic 和分区信息构建队列,队列通常作为索引使用。相比于碎片式存储,集中式存储的主要优势是,支持分区数多,很容易通过删除旧的 Commit Log 的形式控制磁盘水位。在阿里云消息队列 Kafka 中,底层的自研云存储引擎正是采用了集中式的存储方式,云存储引擎相比 Kafka 原生的存储的主要优势有:
1)解决了 Kafka 分区规模扩大时,性能急剧下降的问题,相比于原生 Kafka 千级别的分区规模,其支持的分区规模可以达到十万级别;
2) 在大量分区同时写的场景下,相比原生 Kafka 的碎片式存储,自研云存储引擎能获得更好的性能;同时,对写入耗时做了优化,减少了毛刺的产生。
![]()
多副本技术优化
为保证 Kafka 集群的高可靠和高可用性,通常情况下会为所有 Topic 设置 3 副本存储。这样,在出现机器宕机时,Kafka 可以快速从可用的 Follower 副本中选出新的 Leader,接替宕机机器上的 Leader 继续提供服务。消息队列 Kafka 在选择块存储设备时,选择的是阿里云上的云盘。云盘是阿里云为云服务器 ECS 提供的,数据块级别的块存储产品,具有低时延、高性能、持久性、高可靠等特点。云盘本身采用了分布式三副本机制,为 ECS 实例提供了极强的数据可靠性和可用性保证。
在这种背景下,在 Kafka 层面设置 3 副本,由于使用了云盘,实际会有 9 个副本。同时,由于 Kafka 层面的 Follower 需要主动从 Leader 同步数据,这也会消耗集群的计算和网络资源,将用户的业务流量扩大至 3 倍。但是,如果在 Kafka 层面设置单副本,由于 Kafka 本身不能利用到云盘的 3 副本能力,其高可用性就不能保证。因此,如何利用好云盘的 3 副本能力,降低的存储成本和网络成本,就成了面临的一大挑战。
阿里云通过接入自研云存储引擎,解决了存储成本和网络成本问题。其核心原理主要是:在自研存储引擎中引入了逻辑队列和物理队列两个概念。逻辑队列也就是暴露给用户的概念,在这里可以直接理解成客户端看到的 partition,而物理队列则用于实际存储数据。通过映射关系,将逻辑队列和物理队列绑定在一起。在自研引擎中,所有的分区在逻辑上都是单副本的。数据的可靠性和可用性由云盘底层的 3 副本机制保证。在正常情况下,发送到特定逻辑 partition 的数据,都会根据映射关系,写入到对应的物理队列中。同理,消费也是根据映射关系从实际的物理队列中拉取。
接下来来看云存储是如何做到容错和高可用的。例如,在节点 0 的 ECS 宕机时,可以通过 QueueMapper,秒级切换逻辑队列 0 的映射关系到节点 1 中的已有队列 Queue-3,或者新增一个物理队列 Queue-4。此时,发往逻辑队列-0 的消息,将被路由到Queue-3 或者 Queue-4 中。这种情况下,用户的发送业务不会受到影响,依旧可以继续发送成功,并且最新的消息也能被消费到。当然,在这种 Failover 期间,会存在一个问题:逻辑队列-0 在节点-0 上的消息,暂时不能消费;但是,对大多数应用场景来说,短暂的部分消息消费延迟并不是大问题,只要不影响发送就能满足要求。
在节点-0 的 ECS 宕机后,阿里云备用 ECS 会迅速生成新的机器替换节点-0,挂载原有云盘,分钟级时间内恢复节点-0 服务。在节点-0 恢复后,只用重新将逻辑队列-0 的映射关系切回 Queue-0,系统又重新恢复了原有状态。此时,发送/消费依旧能保持原生Kafka 的特性。
通过以上方式,将存储成本节省到原生 Kafka 的大约三分之一。同时,由于在 Kafka 层面,副本数是 1,从而避免了 Follower 从 Leader 中同步数据的操作,网络流量也节省到原生 Kafka 的大约三分之一。
![]()
水平扩容,秒级数据均衡
弹性扩缩能力是消息队列的核心能力之一。由于 Kafka 服务端节点是有状态的,因此新增了若干节点之后,需要重新均衡各个 Topic 的队列,使得客户端往集群中发送或是消费的流量,能均衡地打到后端各个服务节点上。
开源 Kafka 在水平扩展了机器之后,做数据均衡的主要方式有两种:
第一种是在新的 broker 中新增队列。这种方式主要的痛点是:
1)系统状态发生改变,这种情况下一些多语言客户端的早期版本,需要客户端主动重启,否则无法消费新分区;
2) 第二是 Kafka 设计上的问题,分片数无法下降,导致后续无法缩容。
![]()
第二种做法是数据迁移。这种方式的主要痛点是:
1)流量复制,产生网络风暴,干扰正常使用;
2)均衡与数据量有关,如果数据量巨大,可能要花费几天来迁移。
![]()
那么,云存储引擎是怎么解决以上弹缩问题的呢?
前文提到消息队列 Kafka 引入了两级队列:第一级为逻辑队列,第二级为物理队列,也就是阿里云自研云存储队列。逻辑队列对外暴露,物理队列则用于存储实际数据。通过 QueueMapper 模块维护逻辑队列与物理队列之间的映射关系,如下图所示。
一个逻辑队列可以由多个物理队列拼接而成,通过位点分段映射,保证顺序。扩容时,只需要将逻辑队列指向新机器上的物理队列即可,这样新写入的消息就可以根据新的映射关系,直接写入到新加的机器。同样的,在消费时,可以根据位点分段映射关系,找到实际的物理队列,然后从物理队列中读取消息。
可以看到,通过两级队列分段映射,解决了消息队列弹缩和迁移问题,具有如下优点:
1)服务端扩缩容后,不变更队列数量,保持系统状态不变;
2)扩缩容时无需迁移数据,耗时短,可以在秒级时间内完成 Topic 队列重新均衡;
3)兼顾了吞吐与扩展性,不影响原有消息队列的性能。
![]()
总结
简单对主要介绍内容进行总结,Kafka 在流式处理场景中,传统方案一般会采用 Storm、Spark Streaming、Flink 和 Kafka Streams 等流式处理框架,但是开发人员在使用的过程中会遇到不少问题,尤其是在面对 70% 以上简单流场景的需求,会遇到运维成本较高、技术成本较大、学习成本不可预期和可用性、可靠性较低等痛点问题,阿里云消息队列 Kafka 发布 Kafka ETL 组件,是一款免运维的流计算组件,通过 Kafka+Kafka connect+函数计算的架构,能够很好的应对数据转储+实时计算问题,具备免运维、低成本、低代码、易监控等优势。
目前 Kafka ETL 正处于免费公测阶段,欢迎大家体验试用。阿里云消息队列 Kafka 版具备内核层面的差异化优势,欢迎前往下方链接了解更多详情。
https://www.aliyun.com/product/kafka
如有更多问题,欢迎扫描加入 Kafka 钉钉交流群。
![]()
点击此处,即可查看直播回放了解更多详情!