笔记列表:
- apache storm简介
- apache storm核心概念
- apache storm群集体系结构
- apache storm工作流
- apachestorm分布式消息系统
- apache storm安装
- apache storm工作示例
- 阿帕奇风暴三叉戟
- twitter上的阿帕奇风暴
- 雅虎金融的阿帕奇风暴
- apache storm应用程序
- apache storm快速指南
- apache storm有用资源
- 阿帕奇风暴讨论
- apache tajo简介
- apache tajo体系结构
- apache tajo安装
- apache tajo配置设置
- apache tajo shell命令
- apache tajo数据类型
- apache tajo运算符
- apache tajo sql函数
- apache tajo数学函数
- apache tajo字符串函数
- apache tajo datetime函数
- apache tajo json函数
- apache tajo数据库创建
- apache tajo表管理
- apache tajo sql语句
- apache tajo聚合和窗口函数
- apache tajo sql查询
- apache tajo存储插件
- apache tajo与hbase的集成
- apache tajo与hive的集成
- apache-tajo-openstack-swift集成
- ApacheTajoJDBC接口
- apache tajo自定义函数
- apache tajo快速指南
- apache tajo有用的资源
- apache tajo讨论
Apache Storm-工作示例
我们已经了解了Apache Storm的核心技术细节,现在是时候编写一些简单的场景了。
方案–移动呼叫日志分析器
移动呼叫及其持续时间将作为Apache Storm的输入,Storm将处理和分组同一呼叫者和接收者之间的呼叫及其总呼叫数。
壶嘴创作
Spout是用于数据生成的组件。基本上,喷口将实现IRichSpout接口。“ IRichSpout”界面具有以下重要方法-
-
开放式-为壶嘴提供执行环境。执行者将运行此方法来初始化喷口。
-
nextTuple-通过收集器发出生成的数据。
-
关闭-当口会关机时调用此方法。
-
clarifyOutputFields-声明元组的输出模式。
-
ack-确认已处理特定的元组
-
fail-指定特定元组不被处理并且不被重新处理。
打开
打开方法的签名如下-
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf-为该喷嘴提供风暴配置。
-
context-提供有关拓扑中喷口位置,其任务ID,输入和输出信息的完整信息。
-
collector-使我们能够发出将由螺栓处理的元组。
nextTuple
nextTuple方法的签名如下-
nextTuple()
从与ack()和fail()方法相同的循环中定期调用nextTuple()。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。因此,nextTuple的第一行将检查处理是否完成。如果是这样,它应至少休眠一毫秒以减少返回之前的处理器负载。
关闭
close方法的签名如下-
close()
声明输出字段
clarifyOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
声明器-用于声明输出流ID,输出字段等。
此方法用于指定元组的输出模式。
确认
ack方法的签名如下-
ack(Object msgId)
此方法确认已处理特定的元组。
失败
nextTuple方法的签名如下-
ack(Object msgId)
此方法通知特定的元组尚未完全处理。Storm将重新处理特定的元组。
FakeCallLogReaderSpout
在我们的方案中,我们需要收集呼叫日志的详细信息。呼叫日志的信息包含在内。
- 来电者号码
- 接收方编号
- 期间
由于我们没有通话记录的实时信息,因此我们将生成伪造的通话记录。伪造的信息将使用Random类创建。完整的程序代码如下。
编码-FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
螺栓创建
Bolt是一个将元组作为输入,处理该元组并生成新的元组作为输出的组件。螺栓将实现IRichBolt接口。在此程序中,两个螺栓类CallLogCreatorBolt和CallLogCounterBolt用于执行操作。
IRichBolt接口具有以下方法-
-
准备-为螺栓提供执行环境。执行者将运行此方法来初始化喷口。
-
执行-处理单个输入元组。
-
cleanup-螺栓即将关闭时调用。
-
clarifyOutputFields-声明元组的输出模式。
准备
准备方法的签名如下-
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf-为该螺栓提供Storm配置。
-
上下文-提供有关拓扑中螺栓位置,其任务ID,输入和输出信息等的完整信息。
-
collector-使我们能够发出已处理的元组。
执行
execute方法的签名如下-
execute(Tuple tuple)
这里的元组是要处理的输入元组。
所述执行方法处理在一个时间的单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。可以处理多个元组并将其作为单个输出元组输出。可以使用OutputCollector类来发出已处理的元组。
清理
清除方法的签名如下-
cleanup()
声明输出字段
clarifyOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
在这里,参数声明器用于声明输出流ID,输出字段等。
此方法用于指定元组的输出模式
通话记录创建者螺栓
呼叫日志创建者螺栓接收呼叫日志元组。呼叫日志元组具有呼叫者号码,接收者号码和呼叫持续时间。该螺栓简单地通过组合呼叫者号码和接收者号码来创建新值。新值的格式为“主叫方号码-接收方号码”,并将其命名为新字段“ call”。完整的代码如下。
编码-CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
通话记录计数器螺栓
呼叫日志计数器螺栓以元组形式接收呼叫及其持续时间。此螺栓在prepare方法中初始化字典(地图)对象。在execute方法中,它检查元组,并为元组中的每个新“调用”值在字典对象中创建一个新条目,并在字典对象中将值设置为1。对于字典中已经可用的条目,它只是增加其值。简单来说,此螺栓将调用及其计数保存在字典对象中。除了将调用及其计数保存在字典中之外,我们还可以将其保存到数据源中。完整的程序代码如下-
编码-CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
创建拓扑
Storm拓扑基本上是Thrift结构。TopologyBuilder类提供了创建复杂拓扑的简单方法。TopologyBuilder类具有设置喷口(setSpout)和设置螺栓(setBolt)的方法。最后,TopologyBuilder具有createTopology来创建拓扑。使用以下代码片段创建拓扑-
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助于为喷口和螺栓设置流分组。
本地集群
出于开发目的,我们可以使用“ LocalCluster”对象创建本地集群,然后使用“ LocalCluster”类的“ submitTopology”方法提交拓扑。“ submitTopology”的参数之一是“ Config”类的实例。“ Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并通过prepare方法发送到所有任务(喷嘴和螺栓)。将拓扑提交到集群后,我们将等待10秒钟,等待集群计算提交的拓扑,然后使用“ LocalCluster”的“ shutdown”方法关闭集群。完整的程序代码如下-
编码-LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
生成和运行应用程序
完整的应用程序具有四个Java代码。他们是-
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
可以使用以下命令构建应用程序-
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行该应用程序-
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
输出
一旦启动该应用程序,它将输出有关集群启动过程,喷口和螺栓处理以及最后集群关闭过程的完整详细信息。在“ CallLogCounterBolt”中,我们已打印呼叫及其计数详细信息。此信息将在控制台上显示,如下所示:
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
非JVM语言
Storm拓扑由Thrift接口实现,可轻松以任何语言提交拓扑。Storm支持Ruby,Python和许多其他语言。让我们看一下python绑定。
Python绑定
Python是一种通用的解释型,交互式,面向对象的高级编程语言。Storm支持Python来实现其拓扑。Python支持发射,锚定,确认和记录操作。
如您所知,可以用任何语言定义螺栓。用另一种语言编写的螺栓被作为子流程执行,Storm通过stdin / stdout上的JSON消息与这些子流程进行通信。首先获取一个支持python绑定的示例螺栓WordCount。
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
在这里,WordCount类实现了IRichBolt接口,并使用指定的超级方法参数“ splitword.py”的python实现运行。现在创建一个名为“ splitword.py”的python实现。
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
这是Python的示例实现,它对给定句子中的单词进行计数。同样,您也可以绑定其他支持语言。
其他教程链接:
- apache storm简介
- apache storm核心概念
- apache storm群集体系结构
- apache storm工作流
- apachestorm分布式消息系统
- apache storm安装
- apache storm工作示例
- 阿帕奇风暴三叉戟
- twitter上的阿帕奇风暴
- 雅虎金融的阿帕奇风暴
- apache storm应用程序
- apache storm快速指南
- apache storm有用资源
- 阿帕奇风暴讨论
- apache tajo简介
- apache tajo体系结构
- apache tajo安装
- apache tajo配置设置
- apache tajo shell命令
- apache tajo数据类型
- apache tajo运算符
- apache tajo sql函数
- apache tajo数学函数
- apache tajo字符串函数
- apache tajo datetime函数
- apache tajo json函数
- apache tajo数据库创建
- apache tajo表管理
- apache tajo sql语句
- apache tajo聚合和窗口函数
- apache tajo sql查询
- apache tajo存储插件
- apache tajo与hbase的集成
- apache tajo与hive的集成
- apache-tajo-openstack-swift集成
- ApacheTajoJDBC接口
- apache tajo自定义函数
- apache tajo快速指南
- apache tajo有用的资源
- apache tajo讨论