实践解析可视化开发平台FlinkSever优势
摘要:华为Flink可视化开发平台FlinkServer作为自研服务,能够提供比原生flinksql接口更强的企业级特性,比如任务的集中管理,可视化开发,多数据源配置等。
本文分享自华为云社区《华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习》,作者:晋红轻。
背景说明
随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。
SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎自1.7.2版本开始引入Flink SQL的特性,并不断发展。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。
但是真正的要将Flink SQL开发工作投入到实际的生产场景中,如果使用原生的API接口进行作业的开发还是存在门槛较高,易用性低,SQL代码可维护性差的问题。新需求由业务人员提交给IT人员,IT人员排期开发。从需求到上线,周期长,导致错失新业务最佳市场时间窗口。同时,IT人员工作繁重,大量相似Flink作业,成就感低。
华为Flink可视化开发平台FlinkServer优势:
- 提供基于Web的可视化开发平台,只需要写SQL即可开发作业,极大降低作业开发门槛。
- 通过作业平台能力开放,支持业务人员自行编写SQL开发作业,快速应对需求,并将IT人员从繁琐的Flink作业开发工作中解放出来;
- 同时支持流作业和批作业;
- 支持常见的Connector,包括Kafka、Redis、HDFS等
下面将以kafka为例分别使用原生API接口以及FlinkServer进行作业开发,对比突出FlinkServer的优势
场景说明
参考已发论坛帖 《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》
需要使用FlinkSQL从一个源kafka topic接收cdl复杂嵌套json数据并进行解析,将解析后的数据发送到另一个kafka topic里
使用原生API接口方案开发flink sql操作步骤
前提条件
- 完成MRS Flink客户端的安装以及配置
- 完成Flink SQL原生接口相关配置
操作步骤
- 使用如下命令首先启动Flink集群
source /opt/hadoopclient/bigdata_env
kinit developuser
cd /opt/hadoopclient/Flink/flink
./bin/yarn-session.sh -t ssl/
- 使用如下命令启动Flink SQL Client
cd /opt/hadoopclient/Flink/flink/bin
./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
- 使用如下flink sql创建源端kafka表,并提取需要的信息:
CREATE TABLE huditableout_source(
`schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
payload ROW < `TIMESTAMP` BIGINT, `data` ROW < uid INT,
uname VARCHAR(32),
age INT,
sex VARCHAR(30),
mostlike VARCHAR(30),
lastview VARCHAR(30),
totalcost INT> >,
type1 as `schema`.`fields`[1].type,
optional1 as `schema`.`fields`[1].optional,
field1 as `schema`.`fields`[1].field,
type2 as `schema`.`fields`[2].type,
optional2 as `schema`.`fields`[2].optional,
field2 as `schema`.`fields`[2].field,
ts as payload.`TIMESTAMP`,
uid as payload.`data`.uid,
uname as payload.`data`.uname,
age as payload.`data`.age,
sex as payload.`data`.sex,
mostlike as payload.`data`.mostlike,
lastview as payload.`data`.lastview,
totalcost as payload.`data`.totalcost,
localts as LOCALTIMESTAMP
) WITH(
'connector' = 'kafka',
'topic' = 'huditableout',
'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
'properties.group.id' = 'example',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- 使用如下flink sql创建目标端kafka表:
CREATE TABLE huditableout(
type1 VARCHAR(32),
optional1 BOOLEAN,
field1 VARCHAR(32),
type2 VARCHAR(32),
optional2 BOOLEAN,
field2 VARCHAR(32),
ts BIGINT,
uid INT,
uname VARCHAR(32),
age INT,
sex VARCHAR(30),
mostlike VARCHAR(30),
lastview VARCHAR(30),
totalcost INT,
localts TIMESTAMP
) WITH(
'connector' = 'kafka',
'topic' = 'huditableout2',
'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
'properties.group.id' = 'example',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- 使用如下flink sql将源端kafka流表写入到目标端kafka流表中
insert into
huditableout
select
type1,
optional1,
field1,
type2,
optional2,
field2,
ts,
uid,
uname,
age,
sex,
mostlike,
lastview,
totalcost,
localts
from
huditableout_source;
- 检查测试结果
消费生产源kafka topic的数据(由cdl生成)
消费目标端kafka topic解析后的数据(flink sql任务生成的结果)
可以登录flink原生界面查看任务
- 使用flink sql client方式查看结果
首先使用命令set execution.result-mode=tableau; 可以让查询结果直接输出到终端
使用flink sql查询上面已创建好的流表
select * from huditableout
注意:因为是kafka流表,所以查询结果只会显示select任务启动之后写进该topic的数据
使用FlinkServer可视化开发平台方案开发flink sql操作步骤
前提条件
- 参考产品文档 《基于用户和角色的鉴权》章节创建一个具有“FlinkServer管理操作权限”的用户,使用该用户访问Flink Server
操作步骤
- 登录FlinkServer选择作业管理
- 创建任务cdl_kafka_json_test3并输入flink sql
说明: 可以看到开发flink sql任务时在FlinkServer界面可以自行设置flink集群规模
CREATE TABLE huditableout_source(
`schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
payload ROW < `TIMESTAMP` BIGINT, `data` ROW < uid INT,
uname VARCHAR(32),
age INT,
sex VARCHAR(30),
mostlike VARCHAR(30),
lastview VARCHAR(30),
totalcost INT> >,
type1 as `schema`.`fields`[1].type,
optional1 as `schema`.`fields`[1].optional,
field1 as `schema`.`fields`[1].field,
type2 as `schema`.`fields`[2].type,
optional2 as `schema`.`fields`[2].optional,
field2 as `schema`.`fields`[2].field,
ts as payload.`TIMESTAMP`,
uid as payload.`data`.uid,
uname as payload.`data`.uname,
age as payload.`data`.age,
sex as payload.`data`.sex,
mostlike as payload.`data`.mostlike,
lastview as payload.`data`.lastview,
totalcost as payload.`data`.totalcost,
localts as LOCALTIMESTAMP
) WITH(
'connector' = 'kafka',
'topic' = 'huditableout',
'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
'properties.group.id' = 'example',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
CREATE TABLE huditableout(
type1 VARCHAR(32),
optional1 BOOLEAN,
field1 VARCHAR(32),
type2 VARCHAR(32),
optional2 BOOLEAN,
field2 VARCHAR(32),
ts BIGINT,
uid INT,
uname VARCHAR(32),
age INT,
sex VARCHAR(30),
mostlike VARCHAR(30),
lastview VARCHAR(30),
totalcost INT,
localts TIMESTAMP
) WITH(
'connector' = 'kafka',
'topic' = 'huditableout2',
'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
'properties.group.id' = 'example',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
insert into
huditableout
select
type1,
optional1,
field1,
type2,
optional2,
field2,
ts,
uid,
uname,
age,
sex,
mostlike,
lastview,
totalcost,
localts
from
huditableout_source;
- 点击语义校验,确保语义校验通过
- 点击提交并启动任务
- 检查测试结果
消费生产源kafka topic的数据(由cdl生成)
消费目标端kafka topic解析后的数据(flink sql任务生成的结果)

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
千万级索引的聚合性能优化
当搜索引擎 ElasticSearch 面对千万级索引量的去重统计时,该如何实现快速的响应。本文将结合我们的亲身经历,为读者朋友呈现生产环境中遇到这类问题时的解决思路。 1. 背景 在数周前的某一天,交易团队的同学发现运行在某云上面的订单索引存在严重的增量同步延迟问题,且已经对业务造成了影响。所以将该索引的流量切换到自研搜索平台中,虽说自研搜索不存在增量延迟情况,但却发现查询的 RT 竟高达十几秒,依旧无法解决业务面临的困境。 当时获知该情况时还是比较错愕的,接口 RT 的增长通常是个渐进式的过程,既然存在性能问题应该在早期就有所表现,不至于突然暴涨至十几秒。进一步了解情况后,得知一年前由于当时的自研搜索平台基建不够成熟,该索引的查询流量便一直由三方云服务承接。过去这么长时间,自研搜索平台仅保留着该索引的增量功能,但从未对外提供检索服务。而如今突然将查询流量导入自研平台,不曾预料到会存在如此严重的查询性能问题。 ElasticSearch 作为一款能够轻松应对上亿规模检索的分布式搜索引擎,却发生如此“反常”的表现,下意识就觉得症结应该出在我们的使用方式上。在抓取了相关的 DSL 语句后...
-
下一篇
大型集团企业云管平台建设参考架构
摘要:本文通过对不同的集团企业及国家机构IT治理组织架构提出华为云管平台集中部署、分布式部署、分散独立部署三种方式,实现集团企业IT云时代的治理管控诉求。 本文分享自华为云社区《【华为云Stack】【大架光临】第6期:大型集团企业云管平台建设参考架构》,作者: 华为云Stack云管平台首席架构师 熊洪槐。 一、集团企业云化IT建设及管理问题及诉求 集团企业由于具有多层次组织结构、多元化业务、多地域产业布局等诸多特点,在管理方面面临很大挑战,IT平台已成为集团企业运营管理运作必不可或缺的重要组成部分,是当前集团企业数字化管理升级及数字化生产及创新的核心支撑系统,所以,集团企业IT建设及治理一直是集团企业的投资重点,特别是近5年,在业务数字化转型及云化IT技术驱动之下的建设投资不断增加,据Gartner预测:2021年中国IT支出预计将增长7.2%,2021年全球IT支出预计将增长4%,另外,据iThome 2020年数字化转型领先企业调查报告,数字化领先企业2020年IT投资比例最高达到营收的5.1%(平均4.1%)。然而,另一方面,2019年埃森哲对中国企业IT调查报告显示,具有三分之...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Mario游戏-低调大师作品
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS关闭SELinux安全模块