作为Write for DOnations计划的一部分,作者选择了免费和开源基金来接受捐赠。
介绍
Apache Kafka是一种流行的分布式消息代理,旨在处理大量实时数据。Kafka 集群具有高度可扩展性和容错性。与ActiveMQ和RabbitMQ等其他消息代理相比,它还具有更高的吞吐量。尽管它通常用作发布/订阅消息系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供持久存储。
发布/订阅消息系统允许一个或多个生产者发布消息,而无需考虑消费者的数量或他们将如何处理消息。订阅的客户端会自动收到有关更新和新消息创建的通知。该系统比客户端定期轮询以确定是否有新消息可用的系统更有效和可扩展。
在本教程中,您将在 Ubuntu 20.04 上安装和使用 Apache Kafka 2.6.0。
先决条件
要继续,您将需要:
- 一台 Ubuntu 20.04 服务器和一个具有 sudo 权限的非 root 用户。如果您没有设置非 root 用户,请按照本指南中指定的步骤进行操作。
- 您的服务器上至少有 4GB 的 RAM。没有这么多 RAM 的安装可能会导致 Kafka 服务失败。
- 您的服务器上安装了OpenJDK 11。要安装此版本,请按照我们的教程如何在 Ubuntu 20.04 上使用 APT 安装 Java。Kafka 是用 Java 编写的,所以它需要一个 JVM。
第 1 步 – 为 Kafka 创建用户
因为 Kafka 可以处理网络上的请求,所以第一步是为该服务创建一个专用用户。如果有人破坏了 Kafka 服务器,这可以最大限度地减少对您的 Ubuntu 机器的损坏。我们将kafka
在此步骤中创建一个专用用户。
以您的非 root sudo 用户身份登录,创建一个名为 的用户kafka
:
- sudo adduser kafka
按照提示设置密码并创建kafka
用户。
接下来,使用命令将kafka
用户添加到sudo
组中adduser
。您需要这些权限来安装 Kafka 的依赖项:
- sudo adduser kafka sudo
您的kafka
用户现在已准备就绪。使用su
以下命令登录帐户:
- su -l kafka
现在您已经创建了一个特定于 Kafka 的用户,您可以下载和提取 Kafka 二进制文件。
第 2 步 – 下载和提取 Kafka 二进制文件
让我们下载 Kafka 二进制文件并将其解压缩到kafka
用户主目录中的专用文件夹中。
首先,在/home/kafka
调用中创建一个目录Downloads
来存储您的下载:
- mkdir ~/Downloads
使用curl
下载卡夫卡的二进制文件:
- curl "https://downloads.apache.org/kafka/2.6.2/kafka_2.13-2.6.2.tgz" -o ~/Downloads/kafka.tgz
创建一个名为的目录kafka
并更改到此目录。这将是 Kafka 安装的基本目录:
- mkdir ~/kafka && cd ~/kafka
使用以下tar
命令提取您下载的存档:
- tar -xvzf ~/Downloads/kafka.tgz --strip 1
我们指定--strip 1
标志以确保存档的内容是在~/kafka/
其自身中提取的,而不是在其中的另一个目录(例如)中提取。~/kafka/kafka_2.13-2.6.0/
现在我们已经成功下载并解压了二进制文件,我们可以开始配置我们的 Kafka 服务器了。
第 3 步 – 配置 Kafka 服务器
Kafka 的默认行为不允许您删除主题。Kafka 主题是可以发布消息的类别、组或提要名称。要修改它,您必须编辑配置文件。
Kafka 的配置选项在server.properties
. 使用nano
或您喜欢的编辑器打开此文件:
- nano ~/kafka/config/server.properties
首先,添加一个允许我们删除 Kafka 主题的设置。将以下内容添加到文件底部:
delete.topic.enable = true
其次,通过修改logs.dir
属性更改Kafka日志的存放目录:
log.dirs=/home/kafka/logs
保存并关闭文件。现在您已经配置了 Kafka,下一步是创建 systemd 单元文件,以便在启动时运行和启用 Kafka 服务器。
第 4 步 – 创建 Systemd 单元文件并启动 Kafka 服务器
在本节中,您将为Kafka 服务创建systemd 单元文件。这将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,例如启动、停止和重新启动 Kafka。
Zookeeper 是 Kafka 用来管理其集群状态和配置的服务。它用于许多分布式系统。如果您想了解更多信息,请访问Zookeeper 官方文档。
创建单元文件zookeeper
:
- sudo nano /etc/systemd/system/zookeeper.service
在文件中输入以下单位定义:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
该[Unit]
部分指定 Zookeeper 在启动之前需要网络和文件系统准备就绪。
该[Service]
部分指定 systemd 应使用zookeeper-server-start.sh
和zookeeper-server-stop.sh
shell 文件来启动和停止服务。它还指定如果 Zookeeper 异常退出应该重新启动。
添加此内容后,保存并关闭文件。
接下来,为以下内容创建 systemd 服务文件kafka
:
- sudo nano /etc/systemd/system/kafka.service
在文件中输入以下单位定义:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
该[Unit]
部分指定此单元文件依赖于zookeeper.service
. 这将确保zookeeper
在kafka
服务启动时自动启动。
该[Service]
部分指定 systemd 应使用kafka-server-start.sh
和kafka-server-stop.sh
shell 文件来启动和停止服务。它还指定如果Kafka异常退出应该重新启动。
现在您已经定义了单位,使用以下命令启动 Kafka:
- sudo systemctl start kafka
为确保服务器已成功启动,请检查kafka
单元的日志日志:
- sudo systemctl status kafka
您将收到如下输出:
Output● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
Active: active (running) since Wed 2021-02-10 00:09:38 UTC; 1min 58s ago
Main PID: 55828 (sh)
Tasks: 67 (limit: 4683)
Memory: 315.8M
CGroup: /system.slice/kafka.service
├─55828 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1
└─55829 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=>
Feb 10 00:09:38 cart-67461-1 systemd[1]: Started kafka.service.
您现在有一个 Kafka 服务器侦听端口9092
。
您已启动kafka
服务。但是如果你重启服务器,Kafka 不会自动重启。要kafka
在服务器启动时启用该服务,请运行以下命令:
- sudo systemctl enable zookeeper
- sudo systemctl enable kafka
在此步骤中,您启动并启用了kafka
和zookeeper
服务。在下一步中,您将检查 Kafka 安装。
步骤 5 — 测试 Kafka 安装
在这一步中,您将测试您的 Kafka 安装。具体来说,您将发布和使用“Hello World”消息以确保 Kafka 服务器正常运行。
在 Kafka 中发布消息需要:
- 一个制片人,谁使的记录和数据,以主题出版物。
- 一个消费者,从主题中读取消息和数据。
首先,创建一个名为 的主题TutorialTopic
:
- ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用kafka-console-producer.sh
脚本从命令行创建生产者。它需要 Kafka 服务器的主机名、端口和主题作为参数。
现在将字符串发布"Hello, World"
到TutorialTopic
主题:
- echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,使用kafka-console-consumer.sh
脚本创建一个 Kafka 消费者。它需要 ZooKeeper 服务器的主机名和端口,以及一个主题名称作为参数。
以下命令使用来自TutorialTopic
. 注意--from-beginning
标志的使用,它允许消费在消费者启动之前发布的消息:
- ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您将Hello, World
在终端中看到:
OutputHello, World
脚本将继续运行,等待更多消息发布。要对此进行测试,请打开一个新的终端窗口并登录到您的服务器。
在这个新终端中,启动生产者以发布另一条消息:
- echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
您将在消费者的输出中看到此消息:
OutputHello, World
Hello World from Sammy at DigitalOcean!
完成测试后,按CTRL+C
停止消费者脚本。
您现在已经在 Ubuntu 20.04 上安装并配置了 Kafka 服务器。在下一步中,您将执行一些快速任务来加强 Kafka 服务器的安全性。
第 6 步 – 强化 Kafka 服务器
安装完成后,您可以删除kafka
用户的管理员权限。在执行此操作之前,请注销并以任何其他非 root sudo 用户身份重新登录。如果您仍在运行开始本教程时使用的相同 shell 会话,请键入exit
.
kafka
从 sudo 组中删除用户:
- sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,请kafka
使用该passwd
命令锁定用户的密码。这确保没有人可以使用此帐户直接登录服务器:
- sudo passwd kafka -l
此时,只有 root 或 sudo 用户可以kafka
通过键入以下命令登录:
- sudo su - kafka
将来,如果您想解锁它,请使用passwd
以下-u
选项:
- sudo passwd kafka -u
您现在已成功限制kafka
用户的管理员权限。您已准备好开始使用 Kafka,或者您可以按照下一个可选步骤将 KafkaT 添加到您的系统中。
步骤 7 — 安装 KafkaT(可选)
KafkaT是 Airbnb 开发的一个工具。它可以更轻松地查看有关 Kafka 集群的详细信息并从命令行执行某些管理任务。但是因为它是 Ruby gem,所以您需要 Ruby 才能使用它。您还需要该build-essential
包来构建KafkaT
依赖的其他 gem 。使用apt
以下方法安装它们:
- sudo apt install ruby ruby-dev build-essential
您现在可以使用 gem 命令安装 KafkaT:
- sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
需要“Wno-error=format-overflow”编译标志来抑制 Zookeeper 在kafkat
安装过程中的警告和错误。
KafkaT.kafkatcfg
用作配置文件来确定 Kafka 服务器的安装和日志目录。它还应该有一个入口,将 KafkaT 指向您的 ZooKeeper 实例。
创建一个名为 的新文件.kafkatcfg
:
- nano ~/.kafkatcfg
添加以下行以指定有关 Kafka 服务器和 Zookeeper 实例的必需信息:
{
"kafka_path": "~/kafka",
"log_path": "/home/kafka/logs",
"zk_path": "localhost:2181"
}
您现在已准备好使用 KafkaT。首先,您将如何使用它来查看有关所有 Kafka 分区的详细信息:
- kafkat partitions
您将看到以下输出:
Output[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.
/var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated
...
Topic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
...
您将看到TutorialTopic
以及__consumer_offsets
Kafka 用于存储客户端相关信息的内部主题。您可以安全地忽略以__consumer_offsets
.
要了解有关 KafkaT 的更多信息,请参阅其GitHub 存储库。
结论
您现在可以在 Ubuntu 服务器上安全地运行 Apache Kafka。您现在可以使用Kafka 客户端将 Kafka 集成到您喜欢的编程语言中。
要了解有关 Kafka 的更多信息,您还可以查阅其文档。