Kafka 创建Producer
用 Java 创建 Kafka Producer
在上一节中,我们学习了创建 Kafka 项目的基本步骤。现在,在用 Java 创建 Kafka 生产者之前,我们需要定义必要的项目依赖项。在我们的项目中,将需要两个依赖项:
- Kafka 依赖项
- 记录依赖项,即 SLF4J 记录器。
设置依赖需要以下步骤:
Step1: 构建工具Maven包含一个'pom.xml ' 文件。 "pom.xml"是一个默认的 XML 文件,它包含有关 GroupID、ArtifactID 以及 Version 值的所有信息。用户需要在"pom.xml"文件中定义所有必要的项目依赖项。转到"pom.xml"文件。
Step2: 首先,我们需要定义 Kafka 依赖项。创建一个" ... "块,我们将在其中定义所需的依赖项。
步骤3: 现在,打开一个网络浏览器并搜索"Kafka Maven",如下所示:
单击突出显示的链接并选择"Apache Kafka, Kafka-Clients"存储库。示例如下图所示:
Step4: 根据系统上下载的Kafka版本选择repository版本。例如,在本教程中,我们使用"Apache Kafka 2.3.0"。因此,我们需要存储库版本 2.3.0(突出显示的版本)。
步骤 5: 单击存储库版本后,将打开一个新窗口。从那里复制依赖代码。
由于我们使用的是 Maven,因此请复制 Maven 代码。如果用户使用 Gradle,请复制 Gradle 编写的代码。
Step6: 将复制的代码粘贴到' ... '块,如下图:
如果版本号显示为红色,则为意味着用户错过了启用"自动导入"选项。如果是这样,请转到查看>工具窗口>Maven。 Maven 项目窗口将出现在屏幕的右侧。单击出现在那里的"刷新"按钮。这将启用错过的自动导入 Maven 项目。如果颜色变为黑色,则表示已下载丢失的依赖项。用户可以继续下一步。
第 7 步: 现在,打开网络浏览器并搜索"SL4J Simple"并打开以下快照中突出显示的链接:
会出现一堆仓库。单击相应的存储库。
要知道合适的仓库,看在 Maven 项目窗口中,并在"依赖项"下查看 slf4j 版本。
点击相应版本,复制代码,在'pom.xml'文件中的Kafka依赖下面粘贴,如下图:
注意: 要么添加注释,要么从代码中删除 test 标记行。因为这个scope标签为依赖定义了一个有限的范围,而我们所有的代码都需要这个依赖,所以范围不应该是有限的。
现在,我们已经设置了所有必需的依赖项。让我们试试"简单的Hello World"示例。
首先,创建一个java 包,例如"com.firstgroupapp.aktutorial",并在其下创建一个java 类。创建 java 包时,请遵循包命名约定。最后,创建"hello world"程序。
执行'producer1.java'文件后,输出成功显示为'Hello World'。这说明了 IntelliJ IDEA 的成功运行。
创建 Java 生产者
创建 Java 生产者基本上有四个步骤,如前所述:
- 创建生产者属性
- 创建生产者
- 创建生产者记录
- 发送数据。
创建生产者属性
Apache Kafka 提供了各种用于创建生产者的 Kafka 属性。要了解每个属性,请访问 Apache 的官方网站,即"https://kafka.apache.org"。转到 Kafka>文档>配置>生产者配置。
用户可以在那里了解 Apache Kafka 提供的所有生产者属性。在这里,我们将讨论所需的属性,例如:
- bootstrap.servers: 它是用于建立到 Kafka 集群的初始连接的端口对列表。用户只能使用引导服务器进行初始连接。该服务器以 host:port, host:port,... 形式存在。
- key.serializer: 是key的一种Serializer类,用于实现'org.apache.kafka.common.serialization.Serializer'接口。
- value.serializer: 它是一种实现"org.apache.kafka.common.serialization.Serializer"接口的Serializer类。\
现在,让我们看看 IntelliJ IDEA 中生产者属性的实现。
当我们创建属性时,它会将"java.util.Properties"导入到代码中。
所以,这样,第一步创建生产者属性就完成了。
创建生产者
要创建一个Kafka生产者,我们只需要创建一个对象KafkaProducer 的成员。
KafkaProducer 的对象可以创建为:
KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);
这里,"first_producer"是我们选择的制作人的名字。用户可以相应地选择。
让我们在下面的快照中看到:
创建Producer Record
为了将数据发送到Kafka,用户需要创建ProducerRecord。这是因为所有生产者都位于生产者记录中。在这里,生产者指定了主题名称以及要传递给 Kafka 的消息。
一个 ProducerRecord 可以创建为:
ProducerRecord<String, String> record=new ProducerRecord<String, String>("my_first", "Hye Kafka");
这里,'record' 是为创建生产者记录选择的名称,'my_first' 是主题名称,'Hye Kafka' 是消息。用户可以相应地选择。
让我们在下面的快照中看到:
发送数据
现在,用户准备将数据发送到 Kafka。生产者只需要调用 ProducerRecord 的对象:
first_producer.send(record);
让我们看看下面的快照:
要了解上述代码的输出,请使用以下命令在 CLI 上打开 'kafka-console-consumer':
'kafka-console-consumer-bootstrap-server 127.0.0.1:9092-topic my_first-group first_app'
生产者产生的数据是异步的。因此,需要两个附加函数,即 flush() 和 close()(如上图所示)。 flush() 将强制生成所有数据,close() 将停止生产者。如果不执行这些函数,数据将永远不会发送到Kafka,消费者将无法读取。
下面显示了消费者控制台上的代码输出:
在终端上,用户可以看到各种日志文件。终端上的最后一行表示 Kafka 生产者已关闭。因此,消息会异步显示在消费者控制台上。
Producer without Keys在上一节中,我们看到了生产者如何向Kafka发送数据。为了更深入地了解,即数据是否正确产生,产生于何处,其偏移量和分区值等。让我们了解更多。为了执行回调,用户需要实现回调函数 ...