Flink动态表
阿里的一篇文章,可以先看看会对动态表有一个模糊的概念
动态表就是一个根据流在动态变化的表。从阿里的例子可以看出,当一个表Stream发生改变的时候,就会引起Keyed Table这张表的一个动态变化,表Stream是一个无法撤回的表,Stream表是只能不停增加的一张表,但是Keyed Table 会根据Stream中数据的增长的变化来修改自己count出来的值,随着count值的改变就会使得以count为key的第二张表的改变,第二张表才是我们需要的结果。第一张表只是一个过渡的表,但是有了第一张表才能满足我们第二张的要求。
将阿里的第一张表以java代码写出
package com.yjp.flink.retraction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
public class RetractionITCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
env.getConfig().disableSysoutLogging();
DataStream<Tuple2<String, Integer>> dataStream =
env.fromElements(
new Tuple2<>("hello", 1),
new Tuple2<>("word", 1),
new Tuple2<>("hello", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("bark", 1),
new Tuple2<>("flink", 1)
);
tEnv.registerDataStream("demo1", dataStream, "word ,num");
Table table = tEnv.sqlQuery("select * from demo1 ").groupBy("word")
.select("word AS word ,num.sum AS count")
.groupBy("count").select("count , word.count as frequency");
tEnv.toRetractStream(table, Word.class).print();
env.execute("demo");
}
}
package com.yjp.flink.retraction;
public class Word {
private Integer count;
private Long frequency;
public Word() {
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Long getFrequency() {
return frequency;
}
public void setFrequency(Long frequency) {
this.frequency = frequency;
}
@Override
public String toString() {
return "Word{" +
"count=" + count +
", frequency=" + frequency +
'}';
}
}
结果:
2> (true,Word{count=1, frequency=1})
2> (false,Word{count=1, frequency=1})
2> (true,Word{count=1, frequency=2})
4> (true,Word{count=3, frequency=1})
4> (false,Word{count=3, frequency=1})
4> (true,Word{count=4, frequency=1})
4> (false,Word{count=4, frequency=1})
2> (false,Word{count=1, frequency=2})
2> (true,Word{count=1, frequency=3})
2> (false,Word{count=1, frequency=3})
3> (true,Word{count=6, frequency=1})
1> (true,Word{count=2, frequency=1})
1> (false,Word{count=2, frequency=1})
1> (true,Word{count=5, frequency=1})
1> (false,Word{count=5, frequency=1})
1> (true,Word{count=2, frequency=1})
2> (true,Word{count=1, frequency=2})
2> (false,Word{count=1, frequency=2})
2> (true,Word{count=1, frequency=3})
2> (false,Word{count=1, frequency=3})
2> (true,Word{count=1, frequency=2})
从结果来分析,我们所希望达到的的目标是:6,1 6个bark 2,1两个hello 1,2 分别是word flink
前面数字相同的是同一组操作,true代表的是写入,false代表的是撤回,true和false一样就会抵消,然后就会发现结果和我们预想的结果是一样的,如果没有撤回操作,阿里的文章已经说明了。
我们在看阿里的第二个例子:看第二个例子的时候会好奇StringLast这个函数应该怎样去实现,java实现如下
package com.yjp.flink.retract;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
public class ALiTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
env.getConfig().disableSysoutLogging();
DataStreamSource<Tuple3<String, String, Long>> dataStream = env.fromElements(
new Tuple3<>("0001", "中通", 1L),
new Tuple3<>("0002", "中通", 2L),
new Tuple3<>("0003", "圆通", 3L),
new Tuple3<>("0001", "圆通", 4L)
);
tEnv.registerDataStream("Ali", dataStream, "order_id ,company,timestamp");
tEnv.registerFunction("agg", new AliAggrete());
Table table = tEnv.sqlQuery("select * from Ali ")
.groupBy("order_id").select(" order_id,agg(company,timestamp) As company")
.groupBy("company").select("company , order_id.count as order_cnt");
tEnv.toRetractStream(table, ALi.class).print();
env.execute("ALi");
}
}
package com.yjp.flink.retract;
import org.apache.flink.table.functions.AggregateFunction;
public class AliAggrete extends AggregateFunction<String, ALiAccum> {
@Override
public ALiAccum createAccumulator() {
return new ALiAccum();
}
@Override
public String getValue(ALiAccum aLiAccum) {
return aLiAccum.company;
}
//更改累加器中的结果
public void accumulate(ALiAccum aLiAccum, String company, Long time) {
if (time > aLiAccum.timeStamp) {
aLiAccum.company = company;
}
}
// public void retract(ALiAccum aLiAccum, String company, Long time) {
// aLiAccum.company = company;
// aLiAccum.timeStamp = time;
// }
// public void resetAccumulator(ALiAccum aLiAccum) {
// aLiAccum.company = null;
// aLiAccum.timeStamp = 0L;
// }
// public void merge(ALiAccum acc, Iterable<ALiAccum> it) {
// Iterator<ALiAccum> iter = it.iterator();
// while (iter.hasNext()) {
// ALiAccum aLiAccum = iter.next();
// if (aLiAccum.timeStamp > acc.timeStamp) {
// acc.company = aLiAccum.company;
// }
// }
// }
}
package com.yjp.flink.retract;
public class ALiAccum {
public String company = null;
public Long timeStamp = 0L;
}
package com.yjp.flink.retract;
public class ALi {
private String company;
private Long order_cnt;
public ALi() {
}
public String getCompany() {
return company;
}
public void setCompany(String company) {
this.company = company;
}
public Long getOrder_cnt() {
return order_cnt;
}
public void setOrder_cnt(Long order_cnt) {
this.order_cnt = order_cnt;
}
@Override
public String toString() {
return "ALi{" +
"company='" + company + '\'' +
", order_cnt=" + order_cnt +
'}';
}
}
这个整个就是阿里第二个例子用代码去实现,timestamp这个字段其实可以不用给,因为每个流进入的时候就会自带一个时间戳,但是会有乱序的考虑,如果不考虑乱序就用自带的时间戳就可以了。
分析整个逻辑代码
tEnv.registerFunction(“agg”, new AliAggrete());
将我们自己实现的聚合的函数注册, Table table = tEnv.sqlQuery(“select * from Ali “)将流转换为第一张Stream表, .groupBy(“order_id”).select(” order_id,agg(company,timestamp) As company”)以订单id分组,相同id的订单会进入同一组,然后我们通过我们自定义的聚合函数去实现只发送时间戳最大的那个记录,实现的原理,ALiAccum这个类是为了将我们company,timestamp两个字段形成映射关系,然后AggregateFunction String为返回类型,我们这里需要返回的是公司的名字,所以为String类型,ALiAccum是我们传入的两个字段,之前将两个字段映射为了POJP对象,首先会调用createAccumulator()方法,创建一个数据结构来保存聚合的中间结果,然后通过accumulate()方法来该更中间结果的值,最后通过getValue()来返回我们真正需要的值。最后对我们操作过的这张表进行查询操作,就得到我们想要的结果了。主要就是自己需要实现Agg函数。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Flink操作Hbase
现在有这样一个场景,我们需要将hbase做成一个数据流,而不是数据集。根据Flink自带的Flink-Hbase只能帮我们做到数据集,所以这个时候选择了重写Hbase的数据源。 package com.yjp.flink.demo11; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.shaded.org.joda.time.DateTime; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.ha...
-
下一篇
Spark Streaming应该如何消费Kafka?
前言 在项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数据进行词频统计。本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将自己在Spark Streaming+kafka的实际优化中的一些经验进行归纳总结。(如有任何纰漏欢迎补充来踩,我会第一时间改正^v^) Spark streaming接收Kafka数据 用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。 基于Receiver的方式 这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL数据库在高并发下的优化方案