Kafka 生产者回调
Producer without Keys
在上一节中,我们看到了生产者如何向Kafka发送数据。为了更深入地了解,即数据是否正确产生,产生于何处,其偏移量和分区值等。让我们了解更多。
为了执行回调,用户需要实现回调函数。该函数用于异步处理请求完成。这就是为什么它的返回类型将为空的原因。该功能将在生产者向 Kafka 发送数据的区块中实现。其他代码块不需要改动。
生产者使用的回调函数是onCompletion()。基本上,此方法需要两个参数:
记录的元数据: 记录的元数据意味着获取有关分区及其偏移量的信息。如果它不为空,则会抛出错误。
异常: 处理时可能会抛出以下异常:
1 ) 可重试异常: 此异常表示消息可能被发送。
2) 不可重试异常: 此异常会抛出永远不会发送消息的错误。
让我们在下面的快照中看看 Producer 回调的实现:
first_producer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { Logger logger=LoggerFactory.getLogger(producer1call.class); if (e== null) { logger.info("Successfully received the details as: \n" + "Topic:" + recordMetadata.topic() + "\n" + "Partition:" + recordMetadata.partition() + "\n" + "Offset" + recordMetadata.offset() + "\n" + "Timestamp" + recordMetadata.timestamp()); } else { logger.error("Can't produce,getting error",e); } } });
'Logger' 的对象已经创建,允许导入 'slf4j.Logger' 和 'slf4j.LoggerFactory' .这个记录器对象将记录有关分区、偏移量以及时间戳的信息。如果异常值等于空,记录器将显示信息,否则将显示错误。当上面的代码被执行时,用户就会知道消息发送到的主题名称、分区号、时间戳、偏移值。
输出的快照如下所示:
在上面的输出中,可以看到消息是生产到'my_first' ,存储在具有"偏移值 9"的"分区 0"。
注意: 我们到现在发送的消息都是没有key的,因此没有key的消息会被存储在随机分区中并且是异步的。
带密钥的生产者
当用户想要将消息发送到同一分区时,密钥变得有用。为了发送数据,用户需要指定一个密钥。该键将唯一地从其他分区中识别该分区。用户需要向Kafka发送同步消息。
实现key的一种方式如下所示:
在在快照上面,我们已经指定了主题名称、它的值和键。在创建 ProducerRecord 时,将其中三个作为参数传递。如果异常 'e' 将等于 null,则记录器将获取有关密钥的信息。最后,在将数据发送到 Kafka 时使用 get() 函数。此方法同步且强制发送数据。用户可以尝试自己的方式来实现这些键。
注意: 使用get(),会出现红色下划线。按 alt+enter,它会说"向方法签名添加例外",选择它。这将为 main() 添加两个异常,如上所示。此外,它会将"java.util.concurrent.ExecutionException"导入到代码中。
执行上述代码时,输出显示为:
输出中突出显示的部分告诉键值、主题名称、分区号、偏移值以及时间戳。消息"OneTwo"现在将始终发送到指定的分区。
因此,通过这种方式,生产者可以使用和不使用密钥将数据发送到 Kafka。
用Java创建Kafka消费者在上一节中,我们学习了用Java创建一个生产者。在本节中,我们将学习在 Java 中实现一个 Kafka 消费者。创建消费者需要采取以下步骤:创建记录器创建消费者属性。创建消费者。为消费 ...