您现在的位置是:首页 > 文章详情

基于SLS+Blink的实时计算最佳实践

日期:2020-04-24点击:513

日志服务简介

阿里云的日志服务(SLS)是针对日志类数据的一站式服务,无需开发就能快捷完成海量日志数据的采集、消费、投递以及查询分析等功能,提升运维、运营效率。在采集端支持30多种写入方式,包括自研的客户端Logtail,开源软件如Logstash、Fluent,Flume,Beats等,各种语言的SDK/Producer,无论是嵌入式设备、网页、服务器、程序等都能轻松接入。在消费端,支持与Storm、Spark Streaming、Flink/Blink等大数据系统无缝对接。
image.png

阿里云实时计算(Blink)

阿里云实时计算是基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理等场景。阿里云实时计算提供了如下两种数据处理API:

  • Flink SQL:通过DDL的方式定义Source和Sink,用SQL来实现数据的处理。
  • Flink Datastream: 在程序中使用各个Source和Sink的SDK,通过提交jar的方式运行托管的Flink Job。

通过这两种API,既可以把SLS作为数据源(Source),实现日志端到端的实时采集与处理,也可以把SLS作为结果的输出目标(Sink),在SLS对结果实时查询分析,配置可视化报表等。
image.png

Flink SQL

Flink SQL是阿里云实时计算为了简化计算模型、降低用户使用实时计算门槛而设计的一套符合标准SQL语义的开发语言。

创建SLS源表

SLS源表对应SLS中的Logstore,表中的字段与Logstore中日志字段一一映射,像执行SQL一样流式处理SLS中的数据。除了Logstore所属的Region对应的Endpoint和Project之外,还需要具有读SLS Logstore权限的Access Key以及消费数据起始位置对应的时间点。

create table sls_stream( a INT, b INT, c VARCHAR ) with ( type ='sls', endPoint ='http://cn-hangzhou-share.log.aliyuncs.com', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '2017-07-05 00:00:00', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );

Checkpoint

SLS的底层存储Loghub,是一个类似Kafka的Append Only的存储系统,覆盖Kafka 100%的功能。与Kafka的partition类型,Logstore中的数据存储在每个Shard中。每个Shard都可以通过cursor或者时间戳,确定日志在Shard中的存储位置(对应Kafka中的Offset)。在消费过程中,为了支持程序重启时尽可能少的数据重复,需要将最新消费位置记录下来,用于进程重启之后继续消费。这个位置就是我们所说的checkpoint。
image.png
目前,Flink SQL任务checkpoint保存在Flink的State中,如果任务Failover 或者暂停之后恢复,会从State中恢复消费位置继续消费。然而如果任务重启或者其他原因造成的State丢失,任务将从启动时指定的时间点开始消费。创建源表时建议指定参数consumerGroup,Blink将自动同步消费位置到SLS服务端,以用于在SLS控制台监控消费进度。注意:任务重启时不会使用SLS服务端的消费位置恢复。

创建SLS结果表

通过定义结果表,通过INSERT语句,把从源表中处理之后的数据写入到SLS的Logstore中。

create table sls_output( `name` VARCHAR, age BIGINT, birthday BIGINT )with( type='sls', endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com', accessId='<yourAccessId>', accessKey='<yourAccessKey>', project='<yourProjectName>', logstore='<yourLogstoreName>' ); INSERT INTO sls_output SELECT age, birthday FROM source_table;

Flink Datastream

Flink SQL 的不足

  1. Flink SQL已经能够实现许多场景下的数据处理需求,但是受限于SQL的表达能力,对于比较复杂的业务场景,SQL实现起来比较复杂。
  2. 对于DDL 的定义,需要指定固定的字段列表,对于日志场景而言,日志字段不固定的情况非常普遍,这就意味着很难提前定义好全部的字段。
  3. 在日志中,很多时候单个字段是JSON或者其他复杂的形式,如果要在SQL里面解析和处理这类字段,不如在程序中之间处理起来灵活。

对于这些场景,可以考虑使用Data Stream API,通过自定义程序,实现更复杂的业务逻辑。

Maven 依赖

SLS开发了一个与开源Flink集成的SDK,同样适用于Blink。

<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>flink-log-connector</artifactId> <version>0.1.19.1</version> </dependency>

Github源码:https://github.com/aliyun/aliyun-log-flink-connector

消费SLS示例

public class ConsumerSample { private static final String SLS_ENDPOINT = ""; private static final String ACCESS_KEY_ID = ""; private static final String ACCESS_KEY_SECRET = ""; private static final String SLS_PROJECT = ""; private static final String SLS_LOGSTORE = ""; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(1); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStateBackend(new FsStateBackend("your checkpoint dir")); Properties configProps = new Properties(); configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT); configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID); configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT); configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product1"); configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name()); configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000"); FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); DataStream<FastLogGroupList> stream = env.addSource( new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps)); stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> { for (FastLogGroup logGroup : value.getLogGroups()) { int logCount = logGroup.getLogsCount(); for (int i = 0; i < logCount; i++) { FastLog log = logGroup.getLogs(i); // processing log } } }); stream.writeAsText("log-" + System.nanoTime()); env.execute("Flink consumer"); } }

Checkpoint

与SQL类似,SDK也会把checkpoint保存到Flink State中,此外,还支持把Checkpoint同步到SLS服务端,这样当Flink本地的State无法恢复时,还能从服务端获取checkpoint,从而保证即便任务重启甚至重建,只要服务端的消费组没有删除,checkpoint就不会丢失。SDK同步checkpoint支持如下策略:

  • 与Flink snapshotState同步,即在Flink调用snapshotState时,同步到服务端。默认是这种方式。
  • 自动提交checkpoint,即定时提交checkpoint到服务端。好处是当Flink下游任务不支持exactly once时最大程度上避免数据重复。
  • 不同步到SLS服务端。

处理结果写入SLS示例

public class ProducerSample { private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com"; private static final String ACCESS_KEY_ID = ""; private static final String ACCESS_KEY = ""; private static final String SLS_PROJECT = ""; private static final String SLS_LOGSTORE = "test-flink-producer"; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> stream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT); configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID); configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY); configProps.put(ConfigConstants.LOG_PROJECT, SLS_PROJECT); configProps.put(ConfigConstants.LOG_LOGSTORE, SLS_LOGSTORE); FlinkLogProducer<String> logProducer = new FlinkLogProducer<>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { @Override public String getHashKey(String element) { String hash = ""; try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); hash = new BigInteger(1, md.digest()).toString(16); } catch (NoSuchAlgorithmException ignore) { } StringBuilder builder = new StringBuilder(); while (builder.length() < 32 - hash.length()) { builder.append("0"); } builder.append(hash); return builder.toString(); } }); stream.addSink(logProducer); env.execute("Flink producer"); } public static class EventsGenerator implements SourceFunction<String> { private volatile boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }

任务监控

Metric监控

对于Flink SQL任务,Blink控制台提供了非常完备的监控报表,可以通过这些报表观察任务运行的状态,如延迟,内存状态,Failover等。
image.png
Metric解释可以参考Blink作业运维相关文档:https://help.aliyun.com/document_detail/62482.html
注意:SLS Datastream SDK暂时还没有实现Metric上报。

消费进度与监控报警

在SLS查看消费组对应的消费进度。
image.png
SLS对于每个消费组会定时输出延迟日志,根据这个延迟日志结合SLS的告警,可以用于监控消费延迟。参考如何开通消费组日志:https://help.aliyun.com/document_detail/85663.html
image.png

任务日志采集

Blink支持把作业日志存储到SLS中,在作业编辑页面右侧的作业参数页面,配置Log4j appender:
image.png

log4j.logger.org.apache.hadoop=OFF log4j.appender.loghub = com.alibaba.blink.log.loghub.BlinkLogHubAppender log4j.appender.loghub.Threshold = ERROR log4j.appender.loghub.projectName = <your SLS Project> log4j.appender.loghub.logstore = <your SLS Logstore> log4j.appender.loghub.endpoint = <your SLS Endpoint> log4j.appender.loghub.accessKeyId = <your Access Key ID> log4j.appender.loghub.accessKey = <your Access Key Secret>

常见问题

部分task没有读到数据

从SLS消费数据的本质就是从每个shard的指定位置开始消费,直到最新位置。然后单个shard不支持并发消费,也就是说一个shard最多被一个task消费到。而不管是SQL还是Data Stream,Shard分配的方式都是对shard本身的信息做hash,然后对task总数取模来分配的。假设某个shard hashcode是x,task 个数为y,当前task的id为z,那么仅当 x%y=z 时,当前task会消费这个shard。因此可能存在某个task没有分配到任何shard的情况。

消费太慢导致数据堆积

数据堆积的根本原因是写入速度超过了消费速度,而如何提高消费速度取决于具体的场景。常见的原因有:

  • 下游处理节点慢导致source节点反压。这种情况可以通过观察Blink的反压状态是否是HIGH来确认,通过优化下游任务节点速度来解决。
  • Shard个数太少。当shard数量少于task个数时,无疑会造成部分task空跑的现象,此时增加task个数已经对总体的并发没有任何影响,此时可以尝试在SLS侧分裂shard个数解决。
消费组在多个作业之间没有产生分配shard的效果

Blink消费SLS的数据并没有使用消费组来实现均衡消费的效果,提供的消费组名称仅仅是用于在SLS服务端保存消费位点。如果需要类似Kafka 消费组的功能,应使用SLS的Consumer Library。参考文档 https://help.aliyun.com/document_detail/28998.html

Flink Datastream任务没有同步checkpoint到SLS服务端

检查Blink任务作业参数中的 blink.checkpoint.interval.ms 是否设置过大。

更多资料

Blink开发Datastream作业:https://help.aliyun.com/document_detail/111876.html
定义SLS源表:https://help.aliyun.com/knowledge_detail/62521.html
定义SLS结果表:https://help.aliyun.com/knowledge_detail/62529.html




原文链接:https://yq.aliyun.com/articles/757399
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章