Apache Storm 官方文档 —— Storm 与 Kestrel
本文说明了如何使用 Storm 从 Kestrel 集群中消费数据。
前言
Storm
本教程中使用了 storm-kestrel 项目和 storm-starter 项目中的例子。建议读者将这几个项目 clone 到本地,并动手运行其中的例子。
Kestrel
本文假定读者可以如此项目所述在本地运行一个 Kestrel 集群。
Kestrel 服务器与队列
Kestrel 服务中包含有一组消息队列。Kestrel 队列是一种非常简单的消息队列,可以运行于 JVM 上,并使用 memcache 协议(以及一些扩展)与客户端交互。详情可以参考 storm-kestrel 项目中的 KestrelThriftClient 类的实现。
每个队列均严格遵循先入先出的规则。为了提高服务性能,数据都是缓存在系统内存中的;不过,只有开头的 128MB 是保存在内存中的。在服务停止的时候,队列的状态会保存到一个日志文件中。
请参阅此文了解更多详细信息。
Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特点。
例如,Twitter 就使用 Kestrel 作为消息系统的核心环节,此文中介绍了相关信息。
** 向 Kestrel 中添加数据
首先,我们需要一个可以向 Kestrel 的队列添加数据的程序。下述方法使用了 storm-kestrel 项目中的 KestrelClient
的实现。该方法从一个包含 5 个句子的数组中随机选择一个句子添加到 Kestrel 的队列中。
private static void queueSentenceItems(KestrelClient kestrelClient, String queueName) throws ParseError, IOException { String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; Random _rand = new Random(); for(int i=1; i<=10; i++){ String sentence = sentences[_rand.nextInt(sentences.length)]; String val = "ID " + i + " " + sentence; boolean queueSucess = kestrelClient.queue(queueName, val); System.out.println("queueSucess=" +queueSucess+ " [" + val +"]"); } }
从 Kestrel 中移除数据
此方法从一个队列中取出一个数据,但并不把该数据从队列中删除:
private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { byte[] data = item._data; String receivedVal = new String(data); System.out.println("receivedItem=" + receivedVal); } }
此方法会从队列中取出并移除数据:
private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { int itemID = item._id; byte[] data = item._data; String receivedVal = new String(data); kestrelClient.ack(queueName, itemID); System.out.println("receivedItem=" + receivedVal); } } }
向 Kestrel 中连续添加数据
下面的程序可以向本地 Kestrel 服务的一个 sentence_queue 队列中连续添加句子,这也是我们的最后一个程序。
可以在命令行窗口中输入一个右中括号 ]
并回车来停止程序。
import java.io.IOException; import java.io.InputStream; import java.util.Random; import backtype.storm.spout.KestrelClient; import backtype.storm.spout.KestrelClient.Item; import backtype.storm.spout.KestrelClient.ParseError; public class AddSentenceItemsToKestrel { /** * @param args */ public static void main(String[] args) { InputStream is = System.in; char closing_bracket = ']'; int val = closing_bracket; boolean aux = true; try { KestrelClient kestrelClient = null; String queueName = "sentence_queue"; while(aux){ kestrelClient = new KestrelClient("localhost",22133); queueSentenceItems(kestrelClient, queueName); kestrelClient.close(); Thread.sleep(1000); if(is.available()>0){ if(val==is.read()) aux=false; } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ParseError e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end"); } }
使用 KestrelSpout
下面的拓扑使用 KestrelSpout
从一个 Kestrel 队列中读取句子,并将句子分割成若干个单词(Bolt:SplitSentence),然后输出每个单词出现的次数(Bolt:WordCount)。数据处理的细节可以参考消息的可靠性保证一文。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word"));
运行
首先,以生产模式或者开发者模式启动你的本地 Kestrel 服务。
然后,等待大约 5 秒钟以防出现网络连接异常。
现在可以运行向队列中添加数据的程序,并启动 Storm 拓扑。程序启动的顺序并不重要。
如果你以 TOPOLOGY_DEBUG 模式运行拓扑你会观察到拓扑中 tuple 发送的细节信息。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Storm 官方文档 —— 分布式 RPC
分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。 概述 DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com”调用 “reach” 函数计算结果的例子: DRPCClient client = new DRPCClient(...
- 下一篇
Apache Storm 官方文档 —— 常用模式
本文列出了 Storm 拓扑中使用的一些常见模式,包括: 数据流的 join 批处理 BasicBolt 内存缓存与域分组的结合 Top N 流式计算 TimeCacheMap CoordinatedBolt 与 KeyedFairBolt Joins 数据流的 join 一般指的是通过共有的域来聚合两个或多个数据流的过程。与一般的数据库中 join 操作要求有限的输入与清晰的语义不同,数据流 join 的输入往往是无限的数据集,而且并不具备明确的语义。 join 的类型一般是由应用的需求决定的。有些应用需要将两个流在某个固定时间内的所有 tuple 进行 join,另外一些应用却可能要求对每个 join 域的 join 操作过程的两侧只保留一个 tuple,而其他的应用也许还有一些其他需求。不过这些 join 类型一般都会有一个基本的模式,那就是将多个输入流进行分区。Storm 可以很容易地使用域分组的方法将多个输入流聚集到一个联结 bolt 中,比如下面这样: builder.setBolt("join", new MyJoiner(), parallelism) .fieldsG...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- CentOS8安装Docker,最新的服务器搭配容器使用
- 设置Eclipse缩进为4个空格,增强代码规范