基于SLS+Blink的实时计算最佳实践
日志服务简介
阿里云的日志服务(SLS)是针对日志类数据的一站式服务,无需开发就能快捷完成海量日志数据的采集、消费、投递以及查询分析等功能,提升运维、运营效率。在采集端支持30多种写入方式,包括自研的客户端Logtail,开源软件如Logstash、Fluent,Flume,Beats等,各种语言的SDK/Producer,无论是嵌入式设备、网页、服务器、程序等都能轻松接入。在消费端,支持与Storm、Spark Streaming、Flink/Blink等大数据系统无缝对接。
阿里云实时计算(Blink)
阿里云实时计算是基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理等场景。阿里云实时计算提供了如下两种数据处理API:
- Flink SQL:通过DDL的方式定义Source和Sink,用SQL来实现数据的处理。
- Flink Datastream: 在程序中使用各个Source和Sink的SDK,通过提交jar的方式运行托管的Flink Job。
通过这两种API,既可以把SLS作为数据源(Source),实现日志端到端的实时采集与处理,也可以把SLS作为结果的输出目标(Sink),在SLS对结果实时查询分析,配置可视化报表等。
Flink SQL
Flink SQL是阿里云实时计算为了简化计算模型、降低用户使用实时计算门槛而设计的一套符合标准SQL语义的开发语言。
创建SLS源表
SLS源表对应SLS中的Logstore,表中的字段与Logstore中日志字段一一映射,像执行SQL一样流式处理SLS中的数据。除了Logstore所属的Region对应的Endpoint和Project之外,还需要具有读SLS Logstore权限的Access Key以及消费数据起始位置对应的时间点。
create table sls_stream( a INT, b INT, c VARCHAR ) with ( type ='sls', endPoint ='http://cn-hangzhou-share.log.aliyuncs.com', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '2017-07-05 00:00:00', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );
Checkpoint
SLS的底层存储Loghub,是一个类似Kafka的Append Only的存储系统,覆盖Kafka 100%的功能。与Kafka的partition类型,Logstore中的数据存储在每个Shard中。每个Shard都可以通过cursor或者时间戳,确定日志在Shard中的存储位置(对应Kafka中的Offset)。在消费过程中,为了支持程序重启时尽可能少的数据重复,需要将最新消费位置记录下来,用于进程重启之后继续消费。这个位置就是我们所说的checkpoint。
目前,Flink SQL任务checkpoint保存在Flink的State中,如果任务Failover 或者暂停之后恢复,会从State中恢复消费位置继续消费。然而如果任务重启或者其他原因造成的State丢失,任务将从启动时指定的时间点开始消费。创建源表时建议指定参数consumerGroup,Blink将自动同步消费位置到SLS服务端,以用于在SLS控制台监控消费进度。注意:任务重启时不会使用SLS服务端的消费位置恢复。
创建SLS结果表
通过定义结果表,通过INSERT语句,把从源表中处理之后的数据写入到SLS的Logstore中。
create table sls_output( `name` VARCHAR, age BIGINT, birthday BIGINT )with( type='sls', endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com', accessId='<yourAccessId>', accessKey='<yourAccessKey>', project='<yourProjectName>', logstore='<yourLogstoreName>' ); INSERT INTO sls_output SELECT age, birthday FROM source_table;
Flink Datastream
Flink SQL 的不足
- Flink SQL已经能够实现许多场景下的数据处理需求,但是受限于SQL的表达能力,对于比较复杂的业务场景,SQL实现起来比较复杂。
- 对于DDL 的定义,需要指定固定的字段列表,对于日志场景而言,日志字段不固定的情况非常普遍,这就意味着很难提前定义好全部的字段。
- 在日志中,很多时候单个字段是JSON或者其他复杂的形式,如果要在SQL里面解析和处理这类字段,不如在程序中之间处理起来灵活。
对于这些场景,可以考虑使用Data Stream API,通过自定义程序,实现更复杂的业务逻辑。
Maven 依赖
SLS开发了一个与开源Flink集成的SDK,同样适用于Blink。
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>flink-log-connector</artifactId> <version>0.1.19.1</version> </dependency>
Github源码:https://github.com/aliyun/aliyun-log-flink-connector
消费SLS示例
public class ConsumerSample { private static final String SLS_ENDPOINT = ""; private static final String ACCESS_KEY_ID = ""; private static final String ACCESS_KEY_SECRET = ""; private static final String SLS_PROJECT = ""; private static final String SLS_LOGSTORE = ""; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(1); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStateBackend(new FsStateBackend("your checkpoint dir")); Properties configProps = new Properties(); configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT); configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID); configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT); configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product1"); configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name()); configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000"); FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); DataStream<FastLogGroupList> stream = env.addSource( new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps)); stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> { for (FastLogGroup logGroup : value.getLogGroups()) { int logCount = logGroup.getLogsCount(); for (int i = 0; i < logCount; i++) { FastLog log = logGroup.getLogs(i); // processing log } } }); stream.writeAsText("log-" + System.nanoTime()); env.execute("Flink consumer"); } }
Checkpoint
与SQL类似,SDK也会把checkpoint保存到Flink State中,此外,还支持把Checkpoint同步到SLS服务端,这样当Flink本地的State无法恢复时,还能从服务端获取checkpoint,从而保证即便任务重启甚至重建,只要服务端的消费组没有删除,checkpoint就不会丢失。SDK同步checkpoint支持如下策略:
- 与Flink snapshotState同步,即在Flink调用snapshotState时,同步到服务端。默认是这种方式。
- 自动提交checkpoint,即定时提交checkpoint到服务端。好处是当Flink下游任务不支持exactly once时最大程度上避免数据重复。
- 不同步到SLS服务端。
处理结果写入SLS示例
public class ProducerSample { private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com"; private static final String ACCESS_KEY_ID = ""; private static final String ACCESS_KEY = ""; private static final String SLS_PROJECT = ""; private static final String SLS_LOGSTORE = "test-flink-producer"; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> stream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT); configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID); configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY); configProps.put(ConfigConstants.LOG_PROJECT, SLS_PROJECT); configProps.put(ConfigConstants.LOG_LOGSTORE, SLS_LOGSTORE); FlinkLogProducer<String> logProducer = new FlinkLogProducer<>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { @Override public String getHashKey(String element) { String hash = ""; try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); hash = new BigInteger(1, md.digest()).toString(16); } catch (NoSuchAlgorithmException ignore) { } StringBuilder builder = new StringBuilder(); while (builder.length() < 32 - hash.length()) { builder.append("0"); } builder.append(hash); return builder.toString(); } }); stream.addSink(logProducer); env.execute("Flink producer"); } public static class EventsGenerator implements SourceFunction<String> { private volatile boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
任务监控
Metric监控
对于Flink SQL任务,Blink控制台提供了非常完备的监控报表,可以通过这些报表观察任务运行的状态,如延迟,内存状态,Failover等。
Metric解释可以参考Blink作业运维相关文档:https://help.aliyun.com/document_detail/62482.html
注意:SLS Datastream SDK暂时还没有实现Metric上报。
消费进度与监控报警
在SLS查看消费组对应的消费进度。
SLS对于每个消费组会定时输出延迟日志,根据这个延迟日志结合SLS的告警,可以用于监控消费延迟。参考如何开通消费组日志:https://help.aliyun.com/document_detail/85663.html
任务日志采集
Blink支持把作业日志存储到SLS中,在作业编辑页面右侧的作业参数页面,配置Log4j appender:
log4j.logger.org.apache.hadoop=OFF log4j.appender.loghub = com.alibaba.blink.log.loghub.BlinkLogHubAppender log4j.appender.loghub.Threshold = ERROR log4j.appender.loghub.projectName = <your SLS Project> log4j.appender.loghub.logstore = <your SLS Logstore> log4j.appender.loghub.endpoint = <your SLS Endpoint> log4j.appender.loghub.accessKeyId = <your Access Key ID> log4j.appender.loghub.accessKey = <your Access Key Secret>
常见问题
部分task没有读到数据
从SLS消费数据的本质就是从每个shard的指定位置开始消费,直到最新位置。然后单个shard不支持并发消费,也就是说一个shard最多被一个task消费到。而不管是SQL还是Data Stream,Shard分配的方式都是对shard本身的信息做hash,然后对task总数取模来分配的。假设某个shard hashcode是x,task 个数为y,当前task的id为z,那么仅当 x%y=z 时,当前task会消费这个shard。因此可能存在某个task没有分配到任何shard的情况。
消费太慢导致数据堆积
数据堆积的根本原因是写入速度超过了消费速度,而如何提高消费速度取决于具体的场景。常见的原因有:
- 下游处理节点慢导致source节点反压。这种情况可以通过观察Blink的反压状态是否是HIGH来确认,通过优化下游任务节点速度来解决。
- Shard个数太少。当shard数量少于task个数时,无疑会造成部分task空跑的现象,此时增加task个数已经对总体的并发没有任何影响,此时可以尝试在SLS侧分裂shard个数解决。
消费组在多个作业之间没有产生分配shard的效果
Blink消费SLS的数据并没有使用消费组来实现均衡消费的效果,提供的消费组名称仅仅是用于在SLS服务端保存消费位点。如果需要类似Kafka 消费组的功能,应使用SLS的Consumer Library。参考文档 https://help.aliyun.com/document_detail/28998.html
Flink Datastream任务没有同步checkpoint到SLS服务端
检查Blink任务作业参数中的 blink.checkpoint.interval.ms 是否设置过大。
更多资料
Blink开发Datastream作业:https://help.aliyun.com/document_detail/111876.html
定义SLS源表:https://help.aliyun.com/knowledge_detail/62521.html
定义SLS结果表:https://help.aliyun.com/knowledge_detail/62529.html
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
导入MaxCompute数据到日志服务实战
简介 日志服务(Log Service,简称 SLS)是针对日志类数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。您无需开发就能快捷完成日志数据采集、消费、投递以及查询分析等功能,提升运维、运营效率,建立 DT 时代海量日志处理能力。将MaxCompute 中的数据导入到日志服务,利用日志服务的查询和可视化功能,对数据进行分析和可视化展示,使用数据加工对数据进一步处理,充分发掘数据的价值。 日志服务提供的数据导入功能,支持从OSS,MaxCompute,Kafka等数据源同步数据。使用数据导入同步数据具备如下的优势: 配置简单,用户仅需在日志服务控制台完成简单配置即可实现导入。 导入服务完全由日志服务托管,无需运维。 支持动态水平扩展,根据用户的数据量自动分配资源,实现快速导入。 日志服务基本概念 日志:日志服务中处理的最小数据单元,每行日志包含日志发生时间和一组key-value 均为字符串格式的字段列表。 项目(Project):日志服务中的资源管理单元,用于资源隔离和控制,管理着用户的所有日志库等资源。 日志库(Logstore):日志数据的采集、存储和查询单元。每个...
- 下一篇
Spark DataFrame 不是真正的 DataFrame
文章原载于 Mars 团队专栏,欢迎关注。 从这篇文章开始,我们开始一个新的读 paper 系列。 今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。 个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。 这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分: 什么是真正的 DataFrame? 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。 从 Mars DataFrame 的角度来看这个问题。 什么是真正的 DataFrame? 起源 最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 设置Eclipse缩进为4个空格,增强代码规范