Apache Kafka – 简单的生产者示例

Apache Kafka – 简单的生产者示例


让我们创建一个使用 Java 客户端发布和使用消息的应用程序。Kafka 生产者客户端由以下 API 组成。

Kafka生产者API

让我们了解本节中最重要的一组 Kafka 生产者 API。KafkaProducer API 的核心部分是KafkaProducer类。KafkaProducer 类提供了一个选项,可以使用以下方法在其构造函数中连接 Kafka 代理。

  • KafkaProducer 类提供了 send 方法来将消息异步发送到主题。send()的签名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord – 生产者管理等待发送的记录缓冲区。

  • 回调– 当服务器确认记录时执行的用户提供的回调(null 表示没有回调)。

  • KafkaProducer 类提供了一个flush 方法来确保之前发送的所有消息实际上已经完成。刷新方法的语法如下 –

public void flush()
  • KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下 –

public Map metrics()

它返回由生产者维护的内部指标的映射。

  • public void close() – KafkaProducer 类提供关闭方法块,直到所有先前发送的请求完成。

生产者 API

Producer API 的核心部分是Producer类。Producer 类提供了通过以下方法在其构造函数中连接 Kafka broker 的选项。

生产者类

生产者类提供了使用以下签名消息发送到单个或多个主题的send 方法


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的生产者——同步异步

相同的 API 配置也适用于同步生产者。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,异步生产者是首选。在之前的版本(如 0.8)中,异步生产者没有 send() 回调来注册错误处理程序。这仅在 0.9 的当前版本中可用。

公共无效关闭()

Producer 类提供close方法来关闭与所有 Kafka 代理的生产者池连接。

配置设置

Producer API 的主要配置设置列在下表中,以便更好地理解 –

S.No 配置设置和说明
1

client.id

识别生产者应用程序

2

producer.type

同步或异步

3

acks

acks 配置控制生产者请求下的标准被认为是完整的。

4

retries

如果生产者请求失败,则使用特定值自动重试。

5

bootstrap.servers

经纪人的引导列表。

6

linger.ms

如果你想减少请求的数量,你可以将 linger.ms 设置为大于某个值的值。

7

key.serializer

串行器接口的键。

8

value.serializer

串行器接口的值。

9

batch.size

缓冲区大小。

10

buffer.memory

控制生产者可用于缓冲的内存总量。

生产者记录 API

ProducerRecord 是发送到 Kafka 集群的键/值对。ProducerRecord 类构造函数用于使用以下签名创建带有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主题– 将附加到记录的用户定义的主题名称。

  • 分区– 分区计数

  • Key – 将包含在记录中的键。

  • – 记录内容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键值对且没有分区的记录。

  • 主题– 创建一个主题来分配记录。

  • – 记录的键。

  • – 记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建一个没有分区和键的记录。

  • 主题– 创建一个主题。

  • – 记录内容。

ProducerRecord 类方法列在下表中 –

S.No 类方法和描述
1

public string topic()

主题将附加到记录中。

2

public K key()

将包含在记录中的键。如果没有这样的键,这里将重新返回 null。

3

public V value()

记录内容。

4

partition()

记录的分区计数

SimpleProducer 应用程序

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka broker,然后使用 create topic 命令在 Kafka broker 中创建您自己的主题。然后创建一个名为Sim-pleProducer.java的 java 类并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

编译– 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行– 可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单的消费者示例

到目前为止,我们已经创建了一个生产者来向 Kafka 集群发送消息。现在让我们创建一个消费者来消费来自 Kafka 集群的消息。KafkaConsumer API 用于消费来自 Kafka 集群的消息。KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs – 返回消费者配置的映射。

KafkaConsumer 类具有下表中列出的以下重要方法。

S.No 方法和说明
1

public java.util.Set<TopicPar-tition> assignment()

获取消费者当前分配的一组分区。

2

public string subscription()

订阅给定的主题列表以获取动态分配的分区。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

订阅给定的主题列表以获取动态分配的分区。

4

public void unsubscribe()

从给定的分区列表中取消订阅主题。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

参数模式引用正则表达式格式的订阅模式,侦听器参数从订阅模式获取通知。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

手动将分区列表分配给客户。

8

poll()

获取使用订阅/分配 API 之一指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,这将返回错误。

9

public void commitSync()

在最后一次 poll() 上为所有订阅的主题和分区列表返回的提交偏移量。相同的操作适用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset)

获取消费者将在下一个 poll() 方法中使用的当前偏移值。

11

public void resume()

恢复暂停的分区。

12

public void wakeup()

唤醒消费者。

消费者记录 API

ConsumerRecord API 用于从 Kafka 集群接收记录。该 API 由主题名称、分区编号(从中接收记录)和指向 Kafka 分区中记录的偏移量组成。ConsumerRecord 类用于创建具有特定主题名称、分区计数和 <key, value> 对的消费者记录。它具有以下签名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主题– 从 Kafka 集群收到的消费者记录的主题名称。

  • 分区– 主题的分区。

  • Key – 记录的键,如果不存在键,则返回 null。

  • – 记录内容。

消费者记录 API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于为特定主题保留每个分区的 ConsumerRecord 列表。它的构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition – 返回特定主题的分区图。

  • Records – ConsumerRecord 的返回列表。

ConsumerRecords 类定义了以下方法。

S.No 方法和说明
1

public int count()

所有主题的记录数。

2

public Set partitions()

此记录集中具有数据的分区集(如果没有返回数据,则该集为空)。

3

public Iterator iterator()

迭代器使您能够循环访问集合、获取或重新移动元素。

4

public List records()

获取给定分区的记录列表。

配置设置

下面列出了消费者客户端 API 主要配置设置的配置设置 –

S.No 设置和说明
1

bootstrap.servers

经纪人的引导列表。

2

group.id

将单个消费者分配给一个组。

3

enable.auto.commit

如果值为 true,则为偏移量启用自动提交,否则不提交。

4

auto.commit.interval.ms

返回更新消耗的偏移量写入 ZooKeeper 的频率。

5

session.timeout.ms

表示在放弃并继续消费消息之前,Kafka 将等待 ZooKeeper 响应请求(读取或写入)的毫秒数。

简单消费者应用

生产者申请步骤在这里保持不变。首先,启动您的 ZooKeeper 和 Kafka 代理。然后使用名为SimpleCon-sumer.java的 java 类创建一个SimpleConsumer应用程序并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " &plus topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

编译– 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 –可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入– 打开生产者 CLI 并向主题发送一些消息。您可以将简单的输入设为“Hello Consumer”。

输出– 以下将是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

觉得文章有用?

点个广告表达一下你的爱意吧 !😁