实践解析可视化开发平台FlinkSever优势

摘要:华为Flink可视化开发平台FlinkServer作为自研服务,能够提供比原生flinksql接口更强的企业级特性,比如任务的集中管理,可视化开发,多数据源配置等。

本文分享自华为云社区《华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习》,作者:晋红轻。

背景说明

随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。

SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎自1.7.2版本开始引入Flink SQL的特性,并不断发展。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。

但是真正的要将Flink SQL开发工作投入到实际的生产场景中,如果使用原生的API接口进行作业的开发还是存在门槛较高,易用性低,SQL代码可维护性差的问题。新需求由业务人员提交给IT人员,IT人员排期开发。从需求到上线,周期长,导致错失新业务最佳市场时间窗口。同时,IT人员工作繁重,大量相似Flink作业,成就感低。

华为Flink可视化开发平台FlinkServer优势:

  • 提供基于Web的可视化开发平台,只需要写SQL即可开发作业,极大降低作业开发门槛。
  • 通过作业平台能力开放,支持业务人员自行编写SQL开发作业,快速应对需求,并将IT人员从繁琐的Flink作业开发工作中解放出来;
  • 同时支持流作业和批作业;
  • 支持常见的Connector,包括Kafka、Redis、HDFS等

下面将以kafka为例分别使用原生API接口以及FlinkServer进行作业开发,对比突出FlinkServer的优势

场景说明

参考已发论坛帖 《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》

需要使用FlinkSQL从一个源kafka topic接收cdl复杂嵌套json数据并进行解析,将解析后的数据发送到另一个kafka topic里

使用原生API接口方案开发flink sql操作步骤

前提条件

  • 完成MRS Flink客户端的安装以及配置
  • 完成Flink SQL原生接口相关配置

操作步骤

  • 使用如下命令首先启动Flink集群
source /opt/hadoopclient/bigdata_env
kinit developuser
cd /opt/hadoopclient/Flink/flink
./bin/yarn-session.sh -t ssl/

  • 使用如下命令启动Flink SQL Client
cd /opt/hadoopclient/Flink/flink/bin
./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml

  • 使用如下flink sql创建源端kafka表,并提取需要的信息:
CREATE TABLE huditableout_source(
  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT> >,
  type1 as `schema`.`fields`[1].type,
  optional1 as `schema`.`fields`[1].optional,
  field1 as `schema`.`fields`[1].field,
  type2 as `schema`.`fields`[2].type,
  optional2 as `schema`.`fields`[2].optional,
  field2 as `schema`.`fields`[2].field,
  ts as payload.`TIMESTAMP`,
  uid as payload.`data`.uid,
  uname as payload.`data`.uname,
  age as payload.`data`.age,
  sex as payload.`data`.sex,
  mostlike as payload.`data`.mostlike,
  lastview as payload.`data`.lastview,
  totalcost as payload.`data`.totalcost,
  localts as LOCALTIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);

  • 使用如下flink sql创建目标端kafka表:
CREATE TABLE huditableout(
  type1 VARCHAR(32),
  optional1 BOOLEAN,
  field1 VARCHAR(32),
  type2 VARCHAR(32),
  optional2 BOOLEAN,
  field2 VARCHAR(32),
  ts BIGINT,
  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT,
  localts TIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout2',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);

  • 使用如下flink sql将源端kafka流表写入到目标端kafka流表中
insert into
  huditableout
  select
  type1,
  optional1,
  field1,
  type2,
  optional2,
  field2,
  ts,
  uid,
  uname,
  age,
  sex,
  mostlike,
  lastview,
  totalcost,
  localts
from
  huditableout_source;

  • 检查测试结果

消费生产源kafka topic的数据(由cdl生成)

消费目标端kafka topic解析后的数据(flink sql任务生成的结果)

可以登录flink原生界面查看任务

  • 使用flink sql client方式查看结果

首先使用命令set execution.result-mode=tableau; 可以让查询结果直接输出到终端

使用flink sql查询上面已创建好的流表

select * from huditableout

注意:因为是kafka流表,所以查询结果只会显示select任务启动之后写进该topic的数据

使用FlinkServer可视化开发平台方案开发flink sql操作步骤

前提条件

  • 参考产品文档 《基于用户和角色的鉴权》章节创建一个具有“FlinkServer管理操作权限”的用户,使用该用户访问Flink Server

操作步骤

  • 登录FlinkServer选择作业管理

  • 创建任务cdl_kafka_json_test3并输入flink sql

说明: 可以看到开发flink sql任务时在FlinkServer界面可以自行设置flink集群规模

CREATE TABLE huditableout_source(
  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT> >,
  type1 as `schema`.`fields`[1].type,
  optional1 as `schema`.`fields`[1].optional,
  field1 as `schema`.`fields`[1].field,
  type2 as `schema`.`fields`[2].type,
  optional2 as `schema`.`fields`[2].optional,
  field2 as `schema`.`fields`[2].field,
  ts as payload.`TIMESTAMP`,
  uid as payload.`data`.uid,
  uname as payload.`data`.uname,
  age as payload.`data`.age,
  sex as payload.`data`.sex,
  mostlike as payload.`data`.mostlike,
  lastview as payload.`data`.lastview,
  totalcost as payload.`data`.totalcost,
  localts as LOCALTIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
CREATE TABLE huditableout(
  type1 VARCHAR(32),
  optional1 BOOLEAN,
  field1 VARCHAR(32),
  type2 VARCHAR(32),
  optional2 BOOLEAN,
  field2 VARCHAR(32),
  ts BIGINT,
  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT,
  localts TIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout2',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',

  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',

  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
insert into
  huditableout
  select
  type1,
  optional1,
  field1,
  type2,
  optional2,
  field2,
  ts,
  uid,
  uname,
  age,
  sex,
  mostlike,
  lastview,
  totalcost,
  localts
from
  huditableout_source;
  • 点击语义校验,确保语义校验通过

  • 点击提交并启动任务

  • 检查测试结果

消费生产源kafka topic的数据(由cdl生成)

消费目标端kafka topic解析后的数据(flink sql任务生成的结果)

 

点击关注,第一时间了解华为云新鲜技术~

优秀的个人博客,低调大师

微信关注我们

原文链接:https://my.oschina.net/u/4526289/blog/5385597

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。