您现在的位置是:首页 > 文章详情

创建消息队列(Kafka)源表

日期:2018-11-13点击:451

Kafka源表的实现来源于自社区的kafka版本实现。

注意:本文档只适合独享模式下使用。

Kafka需要定义的DDL如下。

 
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka010',
  9. topic = 'xxx',
  10. `group.id` = 'xxx',
  11. bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
  12. );

注意:以上表中的五个字段顺序务必保持一致。

WITH参数

通用配置

参数 注释说明 备注
type Kafka对应版本 推荐使用KAFKA010
topic 读取的单个topic topic名称

必选配置

(1)kafka08必选配置:

参数 注释说明 备注
group.id 消费组id
zookeeper.connect zk链接地址 zk连接id

(2)kafka09/kafka010/kafka011必选配置:

参数 注释说明 备注
group.id 消费组id
bootstrap.servers kafka集群地址 kafka集群地址

Kafka集群地址:

如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。

如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。

可选配置

 
  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

kafka版本对应关系

Type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2

Kafka消息解析

默认Kafka读到的消息:

 
  1. messageKey varbianry,
  2. message varbianry,
  3. topic varchar,
  4. partition int,
  5. offset bigint

这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。

参数 注释说明 备注
parserUdtf 自定义解析函数 用于解析从kafka读到的消息映射到ddl具体对应的类型

如何写一个parserUdtf参见自定义表值函数(UDTF)

自建kafka

与阿里云Kafka消息队列一样,DDL定义相同。

示例:

 
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka011',
  9. topic = 'kafka_01',
  10. `group.id` = 'CID_blink',
  11. bootstrap.servers = '192.168.0.251:9092'
  12. );

WITH参数

关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers参数需要填写自建的地址和端口号。

注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

本文转自实时计算——创建消息队列(Kafka)源表

原文链接:https://yq.aliyun.com/articles/669174
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章