数据结果表概述
实时计算 Flink使用CREATE TABLE
作为输出结果数据的格式定义,同时定义数据如何写入到目的数据存储。实时计算有Append类型和Update类型。
-
Append类型:如果输出存储是日志系统或消息系统,或未定义主键的RDS,则流的输出结果都会以追加的方式写入存储中,而不会修改存储中原有的数据。
-
Update类型:如果输出存储是声明了主键(PRIMARY KEY)的数据库(例如RDS、HBase),流的输出结果会发生以下两种情况。
- 如果根据主键查询数据在数据库中不存在,则会将该数据插入到数据库。
- 如果根据主键查询数据在数据库中存在,则会根据主键更新数据。
语法
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
示例
create table rds_output(
id int,
len int,
content VARCHAR,
primary key(id)
) with (
type='rds',
url='jdbc:mysql:XXXXXXXXXX',
tableName='test4',
userName='test',
password='XXXXXX'
);

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
创建消息队列(Kafka)源表
本页目录 WITH参数 kafka版本对应关系 Kafka消息解析 自建kafka WITH参数 Kafka源表的实现来源于自社区的kafka版本实现。 注意:本文档只适合独享模式下使用。 Kafka需要定义的DDL如下。 create table kafka_stream( messageKey VARBINARY, `message` VARBINARY, topic varchar, `partition` int, `offset` bigint ) with ( type ='kafka010', topic = 'xxx', `group.id` = 'xxx', bootstrap.servers = 'ip:端口,ip:端口,ip:端口' ); 注意:以上表中的五个字段顺序务必保持一致。 WITH参数 通用配置 参数 注释说明 备注 type Kafka对应版本 推荐使用KAFKA010 topic 读取的单个topic topic名称 必选配置 (1)kafka08必选配置: 参数 注释说明 备注 group.id 无 消费组id zookeeper.connect ...
-
下一篇
创建ElasticSearch(ES)结果表
创建 ElasticSearch 结果表 注意:本文档只适合独享模式下使用。 ElasticSearch 结果表的实现使用 REST API,理论上兼容 ElasticSearch 的各个版本。以下将 ElasticSearch 简称为 ES。 ES 需要定义的 DDL 如下: create table es_stream_sink( field1 long, field2 varbianary, field3 varchar ) with ( type ='elasticsearch', endPoint = 'http://127.0.0.1:9211', accessId = 'abcd', accessKey = 'efgh', index = 'mockIdx', typeName = 'mockType' ... ); WITH参数 通用配置: 参数 注释说明 默认值 Required endPoint server 地址,例:http://127.0.0.1:9211 无 是 accessId 访问实例 id 无 是 accessKey 访问实例密钥 无 是 index ...
相关文章
文章评论
共有0条评论来说两句吧...