主页 > 新闻资讯 > 大数据学习:Kafka生产者实现步骤

大数据学习:Kafka生产者实现步骤

作者:张老师 浏览次数: 2021-03-15 17:53
Kafka当中,生产者和消费者两个环节,是确保其高吞吐量的消息传递的重要组件,生产者引入数据,消费者处理数据,各司其职,保证消息队列的通畅运行。今天的大数据学习分享,我们就主要来讲讲Kafka生产者是如何工作的。

大数据学习:Kafka生产者实现步骤

Kafka生产者实现步骤

实现一个简单的Kafka生产者一般步骤如下:

1、创建Properties对象,设置生产者级别配置

以下3个配置是必须指定的:

bootstrap.servers:配置连接Kafka代理列表,不必包含Kafka集群所有的代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。为了保证能够成功连上Kafka集群,建议至少配置两个代理。

key.serializer:配置用于序列化消息Key的类

value.serializer:配置用于序列化消息实际数据的类

2、根据Properties对象实例化一个KafkaProducer对象

3、实例化ProducerRecord对象,每条消息对应一个ProducerRecord对象

4、调用KafkaProducer发送消息的方法将ProducerRecord发送到Kafka相应节点

Kafka提供了两个发送消息的方法,即send(ProducerRecord<String,String>record)方法和send(ProducerRecord<String,String>record,Callbackcallback)方法,如果消息发送发生异常,Callback接口的onCompletion会捕获到相应异常。KafkaProducer默认是异步发送消息,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量后作为一个RecordBatch再发送。

生产者发送消息实质分两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是一个Sender线程负责将缓冲区的消息发送到代理,执行真正的I/O操作,而在第一阶段执行完后就返回一个Future对象,根据对Future对象处理方式的不同,KafkaProducer支持两种发送消息方式:

异步方式

两个send方法都返回一个Future<RecordMetadata>对象,只负责将消息发送到消息缓冲区,并不等待Sender线程处理结果,若希望了解异步方式消息发送成功与否,可以在回调函数中进行相应处理,当消息被Sender线程处理后会回调Callback。

同步方式

通过调用send方法返回的Future对象的get()方法以阻塞式获取执行结果,等待Sender线程处理的最终结果。

5、关闭KafkaProducer,释放连接的资源

为了提升Kafka发送消息的吞吐量,在数据量比较大同时对消息顺序也没有严格要求的情况下,可以采用多线程的方式。由于KafkaProducer是线程安全,只实例化一个KafkaProducer对象运行多个线程共享该生产者发送消息。

关于大数据学习,Kafka生产者实现步骤,以上就为大家做了基本的讲解了。Kafka在生产者这一端,其架构设计是称得上考虑严谨的,值得大家去深究学习。成都加米谷大数据,专业大数据培训机构,大数据开发、数据分析与挖掘,零基础班本月正在招生中,课程大纲及学习视频,可联系客服领取!
热点排行
推荐文章
立即申请>>