Apache Storm 官方文档 —— 多语言接口协议
本文描述了 Storm (0.7.1 版本以上)的多语言接口协议。
Storm 多语言协议
Shell 组件
Storm 的多语言支持主要通过 ShellBolt,ShellSpout 和 ShellProcess 类来实现。这些类实现了 IBolt 接口、ISpout 接口,并通过使用 Java 的 ProcessBuilder 类调用 shell 进程实现了执行脚本的接口协议。
输出域
输出域是拓扑的 Thrift 定义的一部分。也就是说,如果你在 Java 中使用了多语言接口,那么你就需要创建一个继承自 ShellBolt 并实现 IRichBolt 接口的 bolt,这个 bolt 还需要在 declareOutputFields
方法中声明输出域(ShellSpout 也有类似的问题)。
你可以在基础概念一文中了解更多相关信息。
协议报头
最简单的协议是通过执行脚本或程序的标准输入输出(STDIN/STDOUT)来实现的。在这个过程中传输的数据都是以 JSON 格式编码的,这样可以支持很多种语言。
打包
为了在集群上运行壳组件,执行的外壳脚本必须和待提交的 jar 包一起置于 resources/
目录下。
但是,在本地开发测试时,resources 目录只需要保持在 classpath 中即可。
协议
注意:
- 输入输出协议的结尾都使用行读机制,所以,必须要修剪掉输入中的新行并将他们添加到输出中。
- 所有的 JSON 输入输出都由一个包含 “end” 的行结束标志。注意,这个定界符并不是 JSON 的一部分。
- 下面的几个标题就是从脚本作者的 STDIN 与 STDOUT 的角度出发的。
初始握手
两种类型壳组件的初始握手过程都是相同的:
- STDIN: 设置信息。这是一个包含 Storm 配置、PID 目录、拓扑上下文的 JSON 对象:
{ "conf": { "topology.message.timeout.secs": 3, // etc }, "pidDir": "...", "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt1", "4": "example-bolt2" }, "taskid": 3, // 以下内容仅支持 Storm 0.10.0 以上版本 "componentid": "example-bolt" "stream->target->grouping": { "default": { "example-bolt2": { "type": "SHUFFLE"}}}, "streams": ["default"], "stream->outputfields": {"default": ["word"]}, "source->stream->grouping": { "example-spout": { "default": { "type": "FIELDS", "fields": ["word"] } } } "source->stream->fields": { "example-spout": { "default": ["word"] } } } }
你的脚本应该在这个目录下创建一个以 PID 命名的空文件。比如,PID 是 1234 的时候,在目录中创建一个名为 1234 的空文件。这个文件可以让 supervisor 了解到进程的 PID,这样,supervisor 在需要的时候就可以关闭该进程。
Storm 0.10.0 加强了发送到壳组件的上下文的功能,现在的上下文中包含了兼容 JVM 组件的拓扑上下文中的所有内容。新增的一个关键因素是确定拓扑中某个壳组件的源与目标(也就是输入与输出)的功能,这是通过 stream->target->grouping
和 source->stream->grouping
字典实现的。在这些关联字典的底层,分组是以字典的形式表示的,至少包含有一个 type
键,并且也可以含有一个 fields
键,该键可以用于指定在 FIELDS
分组中所涉及的域。
- STDOUT: 你的 PID,以 JSON 对象的形式展现,比如
{"pid": 1234}
。这个壳组件将会把 PID 记录到它自己的日志中。
接下来怎么做就要取决于组件的具体类型了。
Spouts
Shell Spouts 都是同步的。以下内容是在一个 while(true) 循环中实现的:
- STDIN: 一个 next、ack 或者 fail 命令。
“next” 与 ISpout 的 nextTuple
等价,可以这样定义 “next”:
{"command": "next"}
可以这样定义 “ack”:
{"command": "ack", "id": "1231231"}
可以这样定义 “fail”:
{"command": "fail", "id": "1231231"}
- STDOUT: 前面的命令对你的 spout 作用产生的结果。这个结果可以是一组 emits 和 logs。
emit 大概是这样的:
{ "command": "emit", // tuple 的 id,如果是不可靠 emit 可以省略此值,该 id 可以为字符串或者数字 "id": "1231231", // tuple 将要发送到的流 id,如果发送到默认流,将该值留空 "stream": "1", // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id "task": 9, // 这个 tuple 中的所有值 "tuple": ["field1", 2, 3] }
如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。
“log” 会将消息记录到 worker log 中,“log” 大概是这样的:
{ "command": "log", // 待记录的消息 "msg": "hello world!" }
- STDOUT: “sync” 命令会结束 emits 与 logs 的队列,“sync” 是这样使用的:
{"command": "sync"}
在 sync 之后, ShellSpout 不会继续读取你的输出,直到它发送出新的 next,ack 或者 fail。
注意,与 ISpout 类似,worker 中的所有 spouts 都会在调用 next,ack 或者 fail 之后锁定,直到你调用 sync。同样,如果没有需要发送的 tuple,你也应该在 sync 之前 sleep 一小段时间。ShellSpout 不会自动 sleep。
Bolts
Shell Bolts 的协议是异步的。你会在有 tuple 可用时立即从 STDIN 中获取到 tuple,同时你需要像下面的示例这样调用 emit,ack,fail,log 等操作写入 STDOUT:
- STDIN: 就是一个 tuple!这是一个 JSON 编码的结构:
{ // tuple 的 id,为了兼容缺少 64 位数据类型的语言,这里使用了字符串 "id": "-6955786537413359385", // 创建该 tuple 的 id "comp": "1", // tuple 将要发往的流 id "stream": "1", // 创建该 tuple 的任务 "task": 9, // tuple 中的所有值 "tuple": ["snow white and the seven dwarfs", "field2", 3] }
- STDOUT: 一个 ack,fail,emit 或者 log。例如,emit 是这样的:
{ "command": "emit", // 标记这个输出 tuple 的 tuples 的 ids "anchors": ["1231231", "-234234234"], // tuple 将要发送到的流 id,如果发送到默认流,将该值留空 "stream": "1", // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id "task": 9, // 这个 tuple 中的所有值 "tuple": ["field1", 2, 3] }
如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。注意,由于 shell bolt 协议的异步特性,如果你在 emit 之后立即接收数据,有可能不会收到对应的任务 id,而是收到上一个 emit 的任务 id,或者是一个待处理的新 tuple。然而,最终接收到的任务 id 序列仍然是和 emit 的顺序完全一致的。
ack 是这样的:
{ "command": "ack", // 待 ack 的 tuple "id": "123123" }
fail 是这样的:
{ "command": "fail", // 待 fail 的 tuple "id": "123123" }
“log” 会将消息记录到 worker log 中,“log” 是这样的:
{ "command": "log", // 待记录的消息 "msg": "hello world!" }
- 注意:对于 0.7.1 版本,shell bolt 不再需要进行“同步”。
处理心跳(0.9.3 及以上版本适用)
Storm 0.9.3 通过在 ShellSpout/ShellBolt 与他们的多语言子进程之间使用心跳来检测子进程是否处于挂起或僵死状态。所有通过多语言接口与 Storm 交互的库都必须使用以下步骤来……
Spout
Shell Spouts 是同步的,所有子进程会在 next()
的结尾发送 sync
命令。因此,你不需要为 spouts 做过多的处理。也就是说,在 next()
过程中不能够让子进程的 sleep 时间超过 worker 的延时时间。
Bolt
Shell Bolts 是异步的,所以 ShellBolt 会定期向它的子进程发送心跳 tuple。心跳 tuple 是这样的:
{ "id": "-6955786537413359385", "comp": "1", "stream": "__heartbeat", // 这个 shell bolt 的系统任务 id "task": -1, "tuple": [] }
在子进程收到心跳 tuple 之后,它必须向 ShellBolt 发送一个 sync
命令。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Storm 官方文档 —— 源码组织结构
Strom 的代码有三个层次: 第一,Storm 在一开始就是按照兼容多语言的目的来设计的。Nimbus 是一个 Thrift 服务,拓扑也被定义为 Thrift 架构。Thrift 的使用使得 Storm 可以用于任何一种语言。 第二,所有的 Storm 接口都设计为 Java 接口。所以,尽管 Storm 核心代码中有大量的 Clojure 实现,所有的访问都必须经过 Java API。这就意味着 Storm 的每个特性都可以通过 Java 来实现。 第三,Storm 的实现中大量使用了 Clojure。可以说,Storm 的代码结构大概是一半的 Java 代码加上一半的 Clojure 代码。但是由于 Clojure 更具有表现力,所以实际上 Storm 的核心逻辑大多是采用 Clojure 来实现的。 下面详细说明了每个层次的细节信息。 storm.thrift 要理解 Storm 的代码架构,首先需要了解storm.thrift文件。 Storm 使用这个fork版本的 Thrift(“storm” 分支)来生成代码。这个 “fork” 版本实际上就是 Thrift7,其中所...
- 下一篇
Apache Storm 官方文档 —— 定义 Storm 的非 JVM 语言 DSL
实现非 JVM 语言 DSL(Domain Specific Language,领域专用语言)应该从storm-core/src/storm.thrift文件开始。由于 Storm 拓扑是 Thrift 结构,而且 Nimbus 是一个 Thrift 后台进程,你可以以任意语言创建并提交拓扑。 当你创建 Thrift 结构的 spouts 与 bolts 时,spout 或者 bolt 的代码是以 ComponentObject 结构体的形式定义的: union ComponentObject { 1: binary serialized_java; 2: ShellComponent shell; 3: JavaObject java_object; } 对于非 JVM 语言 DSL(这里以 Python DSL 为例),你需要使用其中的 “2” 与 “3”。ShellComponent 负责指定运行该组件(例如你的 python 代码)的脚本,而 JavaObject 则负责指定该组件的本地(native)Java spouts 与 bolts(而且 Storm 也会使用反射来创建...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Mario游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8