Kafka 消费者组
通常,Kafka 消费者属于特定的消费者组。消费者组基本上代表应用程序的名称。为了消费消费者组中的消息,使用'-group'命令。
让我们看看消费者如何消费来自Kafka主题的消息:
步骤 1: 打开 Windows 命令提示符。
步骤 2: 使用"-group"命令作为: 'kafka-console-consumer-bootstrap-server 本地主机: 9092-topic <group_name>-group <group_name>'。为该组命名。
在上面的快照中,组是"first_app"。可以看到没有显示任何消息,因为没有向该主题生成新消息。如果使用"-from-beginning"命令,将显示所有以前的消息。
步骤3: 要查看一些新消息,请生成来自生产者控制台的一些即时消息(如上一节中所做的那样)。
因此,生产者产生的新消息可以在消费者的控制台中看到。
Step4: 但是,它是单个消费者读取组中的数据。让我们创造更多的消费者来理解一个消费群体的力量。为此,打开一个新终端并输入与以下完全相同的消费者命令:
'kafka-console-consumer.bat--bootstrap-server 127.0.0.1:9092--topic <topic_name>--group <group_name>'.
在上面的快照中,很明显生产者正在将数据发送到卡夫卡话题。这两个消费者正在消费这些消息。查看消息的顺序。由于为"myfirst"主题创建了三个分区(之前讨论过),因此消息仅按该顺序拆分。
我们可以在同一组下进一步创建更多消费者,每个消费者都会消费消息根据分区的数量。尝试自己更好地理解。
注意: group id 必须相同,这样才会在消费者之间拆分消息。
但是,如果任何消费者被终止,分区将重新分配给活动消费者,这些活动消费者将收到消息。
Consumer with Keys
当生产者为数据附加了一个键值时,它将被存储到指定的分区。如果未指定键值,则数据将移动到任何分区。因此,当消费者使用键读取消息时,如果未指定键,它将显示为空。使用来自 Kafka 主题的消息需要一个 'print.key' 和一个 'key.seperator' sre。使用的命令是:
'kafka-console-consumer-bootstrap-server localhost:9092-topic <topic_name>--from-beginning-property print.key=true-property key.seperator =,'
使用上面的命令,消费者可以通过指定的键读取数据。
更多关于消费者组
'--from-beginning' 命令
该命令用于从开始(前面讨论过)读取消息。因此,在消费者组中使用它会得到以下输出:
可以注意到,新的消费者组'second_app'用于从头开始读取消息。如果再次运行相同的命令,它将不会显示任何输出。这是因为偏移量是在 Apache Kafka 中提交的。因此,一旦消费者组读取了所有直到写入的消息,下一次,它将只读取新消息。
例如,在下面的快照中,当 '-from-beginning' 命令再次使用,只读取新消息。
'kafka-consumer-groups' 命令
此命令提供了完整的文档,用于列出所有组、描述组、删除消费者信息或重置消费者组偏移量。
它需要一个引导服务器让客户端执行不同的功能消费者组。
列出消费者组
'-list'命令用于列出可用的消费者组数量在 Kafka 集群中。该命令用作:
'kafka-consumer-groups.bat-bootstrap-server localhost:9092-list'。
显示快照
描述消费者组
"--describe"命令用于描述消费者组。该命令用作:
'kafka-consumer-groups.bat-bootstrap-server localhost:9092-describe group <group_name>'
该命令描述是否存在任何活跃的消费者,当前偏移值,滞后值为0-表示消费者已经读取了所有数据。
重置偏移量
偏移量在 Apache Kafka 中提交。因此,如果用户想要再次读取消息,则需要重新设置偏移值。 "Kafka-consumer-groups"命令提供了一个重置偏移量的选项。重置偏移值意味着定义用户想要再次读取消息的点。它一次只支持一个消费者组,并且该组应该没有活动实例。
重置偏移量时,用户需要选择三个参数:
- 一个执行选项
- 重置规范
- 范围
有两个可用的执行选项:
'-dry-run': 这是默认的执行选项。此选项用于规划那些需要重置的偏移量。
-execute': 此选项用于更新偏移量值。
有以下可用的重置规范:
'-to-datetime': 它根据日期时间的偏移量重置偏移量。使用的格式是: 'YYYY-MM-DDTHH:mm:SS.sss'。
'--to-earliest': 它将偏移量重置为最早的偏移量。
'--to-latest': 将偏移量重置为最新的偏移量。
'--shift-by': 它通过将当前偏移值移动"n"来重置偏移。 'n' 的值可以是正数或负数。
'--from-file': 它将偏移量重置为 CSV 文件中定义的值。
'--to-current': 它将偏移量重置为当前偏移量。
有两个范围可以定义:
'-all-topics': 重置组内所有可用主题的偏移值。
'-topics': 它仅重置指定主题的偏移值。用户需要指定主题名称以重置偏移值。
让我们尝试看看:
1) 使用 '-to-最早'命令
在上面的快照中,偏移量被重置为新的偏移量为 0。这是因为使用了 '-to-earliest' 命令,它已将偏移量值重置为 0。
2) 使用'-shift-by' 命令
在第一个快照中,偏移值从'0'偏移到"+2"。在第二个中,偏移值从"2"移到"-1"。
注意: 要将偏移值移动到正数,不必使用"+"符号。默认情况下,它只会被视为正数。
Kafka 编程简介在上一节中,我们学习了创建主题,写入一个topic ,并使用命令行界面阅读该主题。生产者和消费者用于从/向Kafka主题读取/写入消息的命令。在本节中,用户将再次学习通过java代码读取和写入K ...