笔记列表:
- apache storm简介
- apache storm核心概念
- apache storm群集体系结构
- apache storm工作流
- apachestorm分布式消息系统
- apache storm安装
- apache storm工作示例
- 阿帕奇风暴三叉戟
- twitter上的阿帕奇风暴
- 雅虎金融的阿帕奇风暴
- apache storm应用程序
- apache storm快速指南
- apache storm有用资源
- 阿帕奇风暴讨论
- apache tajo简介
- apache tajo体系结构
- apache tajo安装
- apache tajo配置设置
- apache tajo shell命令
- apache tajo数据类型
- apache tajo运算符
- apache tajo sql函数
- apache tajo数学函数
- apache tajo字符串函数
- apache tajo datetime函数
- apache tajo json函数
- apache tajo数据库创建
- apache tajo表管理
- apache tajo sql语句
- apache tajo聚合和窗口函数
- apache tajo sql查询
- apache tajo存储插件
- apache tajo与hbase的集成
- apache tajo与hive的集成
- apache-tajo-openstack-swift集成
- ApacheTajoJDBC接口
- apache tajo自定义函数
- apache tajo快速指南
- apache tajo有用的资源
- apache tajo讨论
Apache Storm-三叉戟
三叉戟是Storm的扩展。像暴风雨一样,Trident也由Twitter开发。开发Trident的主要原因是在Storm之上提供高级抽象以及状态流处理和低延迟分布式查询。
Trident使用喷口和螺栓,但是这些低级组件在执行之前由Trident自动生成。Trident具有功能,过滤器,联接,分组和聚合。
Trident将流作为一系列批处理(称为事务)进行处理。通常,这些小批量的大小将在数千或数百万个元组的数量级上,具体取决于输入流。这样,Trident与Storm不同,后者执行逐个元组处理。
批处理概念与数据库事务非常相似。每个交易都分配有交易ID。一旦完成所有处理,该事务即被视为成功。但是,处理事务元组之一失败将导致重新传输整个事务。对于每个批次,Trident将在事务开始时调用beginCommit,并在事务结束时提交。
三叉戟拓扑
Trident API提供了一个简单的选项,可使用“ TridentTopology”类创建Trident拓扑。基本上,Trident拓扑从spout接收输入流,并对流执行有序的操作顺序(过滤,聚合,分组等)。“三叉戟元组”替换了“风暴元组”,而“操作”替换了“螺栓”。可以创建一个简单的Trident拓扑,如下所示-
TridentTopology topology = new TridentTopology();
三叉戟元组
三叉戟元组是值的命名列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是Trident拓扑可以处理的基本数据单元。
三叉戟喷口
Trident喷口与Storm喷口相似,具有使用Trident功能的其他选项。实际上,我们仍然可以使用在Storm拓扑中使用的IRichSpout,但是它本质上是非事务性的,我们将无法使用Trident提供的优势。
具有使用Trident功能的所有功能的基本喷嘴是“ ITridentSpout”。它支持事务性和不透明的事务性语义。其他喷嘴是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。
除了这些通用的喷嘴外,Trident还提供了许多Trident喷嘴的示例实现。其中之一是FeederBatchSpout喷口,我们可以使用它轻松地发送三叉戟元组的命名列表,而不必担心批处理,并行性等。
FeederBatchSpout的创建和数据馈送可以如下所示进行:
TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”)); topology.newStream("fixed-batch-spout", testSpout) testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
三叉戟行动
Trident依靠“ Trident操作”来处理三叉戟元组的输入流。Trident API具有许多内置操作来处理从简单到复杂的流处理。这些操作的范围从简单的验证到三叉戟元组的复杂分组和聚合。让我们经历最重要和最常用的操作。
筛选
筛选器是用于执行输入验证任务的对象。Trident过滤器获取三叉戟元组字段的子集作为输入,并根据是否满足某些条件来返回true或false。如果返回true,则将元组保留在输出流中;否则,将保留元组。否则,将元组从流中删除。Filter基本上将从BaseFilter类继承,并实现isKeep方法。这是过滤器操作的示例实现-
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; } } input [1, 2] [1, 3] [1, 4] output [1, 2] [1, 4]
可以使用“每种”方法在拓扑中调用过滤器功能。“字段”类可用于指定输入(三叉戟元组的子集)。示例代码如下-
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields("a", "b"), new MyFilter())
功能
函数是用于对单个三叉戟元组执行简单操作的对象。它接受三叉戟元组字段的子集,并发出零个或多个新的三叉戟元组字段。
函数基本上继承自BaseFunction类,并实现execute方法。下面给出了一个示例实现-
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); } } input [1, 2] [1, 3] [1, 4] output [1, 2, 3] [1, 3, 4] [1, 4, 5]
就像过滤器操作一样,可以使用每种方法在拓扑中调用函数操作。示例代码如下-
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
聚合
聚合是用于对输入批处理或分区或流执行聚合操作的对象。三叉戟有三种类型的聚合。它们如下-
-
聚合-孤立地聚合每批三叉戟元组。在聚合过程中,最初使用全局分组对元组进行重新分区,以将同一批的所有分区合并为一个分区。
-
partitionAggregate-聚集每个分区,而不是整批三叉戟元组。分区聚合的输出完全替换了输入元组。分区聚合的输出包含单个字段元组。
-
persistentaggregate-在所有批处理中的所有三叉戟元组上进行聚合,并将结果存储在内存或数据库中。
TridentTopology topology = new TridentTopology(); // aggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”)) // partitionAggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count")) // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上例中使用的“计数”聚合器是内置聚合器之一,它是使用“ CombinerAggregator”实现的,实现如下-
public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; } }
分组
分组操作是一种内置操作,可以通过groupBy方法调用。groupBy方法通过对指定字段执行partitionBy来对流进行重新分区,然后在每个分区内,将组字段相等的元组分组在一起。通常,我们将“ groupBy”与“ persistentAggregate”一起使用以获取分组的聚合。示例代码如下-
TridentTopology topology = new TridentTopology(); // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
合并与加入
合并和合并可以分别使用“合并”和“合并”方法来完成。合并合并一个或多个流。联接与合并类似,不同之处在于联接从两侧使用三叉戟元组字段来检查和联接两个流。此外,加入只能在批处理级别下进行。示例代码如下-
TridentTopology topology = new TridentTopology(); topology.merge(stream1, stream2, stream3); topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
国家维护
Trident提供了一种状态维护机制。状态信息可以存储在拓扑本身中,否则,您也可以将其存储在单独的数据库中。原因是保持一种状态,如果在处理过程中任何元组失败,那么将重试失败的元组。这在更新状态时会产生问题,因为您不确定此元组的状态是否之前已更新。如果元组在更新状态之前失败,则重试该元组将使状态稳定。但是,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数,并使状态不稳定。一个人需要执行以下步骤,以确保仅处理一次消息-
-
分批处理元组。
-
为每个批次分配一个唯一的ID。如果重试该批次,则会为其赋予相同的唯一ID。
-
状态更新按批次排序。例如,在完成第一批的状态更新之前,将无法进行第二批的状态更新。
分布式RPC
分布式RPC用于从Trident拓扑查询和检索结果。Storm具有内置的分布式RPC服务器。分布式RPC服务器从客户端接收RPC请求,并将其传递到拓扑。拓扑处理请求,并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。Trident的分布式RPC查询的执行方式与普通RPC查询类似,不同之处在于这些查询是并行运行的。
什么时候使用三叉戟?
与许多用例一样,如果要求只处理一次查询,则可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下,很难一次完成精确的处理。因此,对于那些只需要一次处理的用例,Trident将非常有用。Trident并非适用于所有用例,尤其是高性能用例,因为它增加了Storm的复杂性并管理状态。
三叉戟的工作实例
我们将把上一节中研究的呼叫日志分析器应用程序转换为Trident框架。与普通风暴相比,Trident应用程序将相对容易,这要归功于它的高级API。基本上,将需要Storm来执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一项。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。
格式化通话信息
FormatCall类的目的是格式化包含“主叫方号码”和“接收方号码”的呼叫信息。完整的程序代码如下-
编码:FormatCall.java
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); } }
CSV分割
CSVSplit类的目的是基于“逗号(,)”分割输入字符串,并发出字符串中的每个单词。此函数用于解析分布式查询的输入参数。完整的代码如下-
编码:CSVSplit.java
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } } }
日志分析器
这是主要的应用程序。最初,应用程序将使用FeederBatchSpout初始化TridentTopology和feed调用者信息。可以使用TridentTopology类的newStream方法创建Trident拓扑流。同样,可以使用TridentTopology类的newDRCPStream方法创建Trident拓扑DRPC流。可以使用LocalDRPC类创建一个简单的DRCP服务器。LocalDRPC具有execute方法来搜索某些关键字。完整的代码如下。
编码:LogAnalyserTrident.java
import java.util.*; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.utils.DRPCClient; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.tuple.TridentTuple; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.Sum; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Debug; import storm.trident.operation.BaseFilter; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.FeederBatchSpout; import storm.trident.testing.Split; import storm.trident.testing.MemoryMapState; import com.google.common.collect.ImmutableList; public class LogAnalyserTrident { public static void main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); } }
生成和运行应用程序
完整的应用程序包含三个Java代码。它们如下-
- FormatCall.java
- CSVSplit.java
- LogAnalyerTrident.java
可以使用以下命令来构建应用程序-
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行该应用程序-
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
输出
一旦启动应用程序,该应用程序将输出有关群集启动过程,操作处理,DRPC服务器和客户端信息以及最后群集关闭过程的完整详细信息。该输出将显示在控制台上,如下所示。
DRPC : Query starts [["1234123401 - 1234123402",10]] DEBUG: [1234123401 - 1234123402, 10] DEBUG: [1234123401 - 1234123403, 10] [[20]] DRPC : Query ends
其他教程链接:
- apache storm简介
- apache storm核心概念
- apache storm群集体系结构
- apache storm工作流
- apachestorm分布式消息系统
- apache storm安装
- apache storm工作示例
- 阿帕奇风暴三叉戟
- twitter上的阿帕奇风暴
- 雅虎金融的阿帕奇风暴
- apache storm应用程序
- apache storm快速指南
- apache storm有用资源
- 阿帕奇风暴讨论
- apache tajo简介
- apache tajo体系结构
- apache tajo安装
- apache tajo配置设置
- apache tajo shell命令
- apache tajo数据类型
- apache tajo运算符
- apache tajo sql函数
- apache tajo数学函数
- apache tajo字符串函数
- apache tajo datetime函数
- apache tajo json函数
- apache tajo数据库创建
- apache tajo表管理
- apache tajo sql语句
- apache tajo聚合和窗口函数
- apache tajo sql查询
- apache tajo存储插件
- apache tajo与hbase的集成
- apache tajo与hive的集成
- apache-tajo-openstack-swift集成
- ApacheTajoJDBC接口
- apache tajo自定义函数
- apache tajo快速指南
- apache tajo有用的资源
- apache tajo讨论