资源介绍
写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数)
一、Storm运行模式:
1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,
因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。
二、写一个HelloWord Storm
我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。
那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数,
整体结构流程所示:Word Storage (words.txt) --> Spout(WordReader.java) --> Bolt(WordNormalizer.java) --> Bolt(WordCounter.java)
可以从这里下载源码:https://github.com/storm-book/examples-ch02-getting_started/zipball/master
三、写一个可运行的Demo很简单,我们只需要三步:
1.创建一个Spout读取数据(数据源)
Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
2.创建bolt处理数据
创建两个bolt来处理Spout发射出的数据
Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。
Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。
3.创建一个Topology提交到集群
4.运行结果分析
如果你的words.txt文件有如下内容: Storm test are great is an Storm simple application but very powerful really Storm is great
你应该会在日志中看到类似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1
在这个例子中,每类节点只有一个实例。但是如果你有一个非常大的日志文件呢?你能够很轻松的改变系统中的节点数量实现并行工作。这个时候,你就要创建两个WordCounter实例。
builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");
程序返回时,你将看到:
— 单词数 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3
— 单词数 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1
棒极了!修改并行度实在是太容易了(当然对于实际情况来说,每个实例都会运行在单独的机器上)。
不过似乎有一个问题:单词is和great分别在每个WordCounter各计数一次。怎么会这样?
当你调用shuffleGrouping时,就决定了Storm会以随机分配的方式向你的bolt实例发送消息。
在这个例子中,理想的做法是相同的单词问题发送给同一个WordCounter实例。
你把shuffleGrouping(“word-normalizer”)换成fieldsGrouping(“word-normalizer”, new Fields(“word”))就能达到目的。
试一试,重新运行程序,确认结果。 你将在后续章节学习更多分组方式和消息流类型。
参考文章
http://blog.****.net/suifeng3051/article/details/38369689
http://ifeve.com/getting-started-with-storm-2/
- 上一篇: Storm源码走读笔记
- 下一篇: 基于storm实现的日志监控系统源码.zip