Apache Kafka – 与 Spark 集成
Apache Kafka – 与 Spark 集成
在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。
关于星火
Spark Streaming API 支持对实时数据流进行可扩展、高吞吐量、容错的流处理。数据可以从 Kafka、Flume、Twitter 等多种来源摄取,并且可以使用复杂的算法进行处理,例如 map、reduce、join 和 window 等高级函数。最后,可以将处理过的数据推送到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。RDD 中的每个数据集都被划分为逻辑分区,可以在集群的不同节点上进行计算。
与 Spark 集成
Kafka 是 Spark 流的潜在消息传递和集成平台。Kafka 充当实时数据流的中心枢纽,并使用 Spark Streaming 中的复杂算法进行处理。处理完数据后,Spark Streaming 可以将结果发布到另一个 Kafka 主题或存储在 HDFS、数据库或仪表板中。下图描述了概念流程。
现在,让我们详细了解 Kafka-Spark API。
SparkConf API
它代表 Spark 应用程序的配置。用于将各种 Spark 参数设置为键值对。
SparkConf
类具有以下方法 –
-
set(string key, string value) – 设置配置变量。
-
remove(string key) – 从配置中删除密钥。
-
setAppName(string name) – 为您的应用程序设置应用程序名称。
-
get(string key) – 获取密钥
流上下文 API
这是 Spark 功能的主要入口点。SparkContext 表示与 Spark 集群的连接,可用于在集群上创建 RDD、累加器和广播变量。签名定义如下。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
-
master – 要连接到的集群 URL(例如 mesos://host:port、spark://host:port、local[4])。
-
appName – 作业名称,显示在集群 Web UI 上
-
batchDuration – 流数据被分成批次的时间间隔
public StreamingContext(SparkConf conf, Duration batchDuration)
通过提供新 SparkContext 所需的配置来创建 StreamingContext。
-
conf – Spark 参数
-
batchDuration – 流数据被分成批次的时间间隔
KafkaUtils API
KafkaUtils API 用于将 Kafka 集群连接到 Spark 流。该 API 具有如下定义的有效方法createStream
签名。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上面显示的方法用于创建一个从 Kafka Brokers 拉取消息的输入流。
-
ssc – StreamingContext 对象。
-
zkQuorum – Zookeeper 法定人数。
-
groupId – 此消费者的组 ID。
-
主题– 返回要消费的主题地图。
-
storageLevel – 用于存储接收到的对象的存储级别。
KafkaUtils API 还有一个方法 createDirectStream,用于创建输入流,直接从 Kafka Brokers 拉取消息,不使用任何接收器。该流可以保证来自 Kafka 的每条消息都只包含在转换中一次。
示例应用程序是在 Scala 中完成的。要编译应用程序,请下载并安装sbt
、scala 构建工具(类似于 maven)。主要应用程序代码如下所示。
import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ &plus _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }
构建脚本
spark-kafka 集成依赖于 spark、spark Streaming 和 spark Kafka 集成 jar。创建一个新文件build.sbt
并指定应用程序详细信息及其依赖项。该SBT
在编译和打包应用程序会下载必要的罐子。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
编译/打包
运行以下命令编译并打包应用程序的jar文件。我们需要将 jar 文件提交到 spark 控制台才能运行应用程序。
sbt package
提交给 Spark
启动 Kafka Producer CLI(在前一章中解释过),创建一个名为my-first-topic
的新主题
并提供一些示例消息,如下所示。
Another spark test message
运行以下命令将应用程序提交到 spark 控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
此应用程序的示例输出如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message ..