一文了解kafka消息队列,实现kafka的生产者(Producer)和消费者(Consumer)的代码,消息的持久化和消息的同步发送和异步发送

news2024/11/25 23:02:17

文章目录

  • 1. kafka的介绍
    • 1.2 Kafka适合的应用场景
    • 1.2 Kafka的四个核心API
  • 2. 代码实现kafka的生产者和消费者
    • 2.1 引入加入jar包
    • 2.2 生产者代码
    • 2.3 消费者代码
    • 2.4 介绍kafka生产者和消费者模式
  • 3. 消息持久化
  • 4. 消息的同步和异步发送
  • 5. 参考文档

1. kafka的介绍

最近在学习kafka相关的知识,特将学习成功记录成文章,以供大家共同学习。

Apache Kafka是 一个分布式流处理平台, 这到底意味着什么呢?

  1. 它可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。

  2. 它可以储存流式的记录,并且有较好的容错性。

  3. 它可以在流式记录产生时就进行处理。

1.2 Kafka适合的应用场景

我们首先要了解kafka的一些概念:

  1. Kafka作为一个集群,运行在一台或者多台服务器上.

  2. Kafka通过topic对存储的流数据进行分类。

  3. 每条记录中包含一个key,一个value和一个timestamp(时间戳)。

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当message queue)

  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topictopic之间内部进行变化)

1.2 Kafka的四个核心API

  1. The Producer API允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic

  2. The Consumer API允许一个应用程序订阅一个或多个topic,并且对发布给他们的流式数据进行处理。

  3. The Streams API允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。

  4. The Connector API允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

在Kafka中,客户端和服务器使用一个简单、高性能、支持多语言的TCP协议。此协议版本化并且向下兼容老版本, 为Kafka提供了Java客户端,也支持许多其他语言的客户端。

2. 代码实现kafka的生产者和消费者

2.1 引入加入jar包

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>1.0.0</version>
   	<scope>provided</scope> 
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

2.2 生产者代码

/**
 * @author super先生
 * @date 2023/02/09 11:26
 */
public class KafkaProducerDemo extends Thread {
    /**
     * 消息发送者
     */
    private final KafkaProducer<Integer, String> producer;
 
    /**
     * topic
     */
    private final String topic;
 
    public KafkaProducerDemo(String topic) {
        //构建相关属性
        //@see ProducerConfig
        Properties properties = new Properties();
        //Kafka 地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
        //kafka 客户端 Demo
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        //The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
        /**发送端消息确认模式:
         *  0:消息发送给broker后,不需要确认(性能较高,但是会出现数据丢失,而且风险最大,因为当 server 宕机时,数据将会丢失)
         *  1:只需要获得集群中的 leader节点的确认即可返回
         *  -1/all:需要 ISR 中的所有的 Replica进行确认(集群中的所有节点确认),最安全的,也有可能出现数据丢失(因为 ISR 可能会缩小到仅包含一个 Replica)
          */
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
 
        /**【调优】
         * batch.size 参数(默认 16kb)
         *  public static final String BATCH_SIZE_CONFIG = "batch.size";
         *
         *  producer对于同一个 分区 来说,会按照 batch.size 的大小进行统一收集进行批量发送,相当于消息并不会立即发送,而是会收集整理大小至 16kb.若将该值设为0,则不会进行批处理
         */
 
        /**【调优】
         * linger.ms 参数
         *  public static final String LINGER_MS_CONFIG = "linger.ms";
         *  一个毫秒值。Kafka 默认会把两次请求的时间间隔之内的消息进行搜集。相当于会有一个 delay 操作。比如定义的是1000(1s),消息一秒钟发送5条,那么这 5条消息不会立马发送,而是会有一个 delay操作进行聚合,
         *  delay以后再次批量发送到 broker。默认是 0,就是不延迟(同 TCP Nagle算法),那么 batch.size 也就不生效了
         */
        //linger.ms 参数和batch.size 参数只要满足其中一个都会发送
 
        /**【调优】
         * max.request.size 参数(默认是1M)   设置请求最大字节数
         * public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
         * 如果设置的过大,发送的性能会受到影响,同时写入接收的性能也会受到影响。
         */
 
        //设置 key的序列化,key 是 Integer类型,使用 IntegerSerializer
        //org.apache.kafka.common.serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        //设置 value 的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        //构建 kafka Producer,这里 key 是 Integer 类型,Value 是 String 类型
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }
 
    public static void main(String[] args) {
        new KafkaProducerDemo("test").start();
    }
 
    @Override
    public void run() {
        int num = 0;
        while (num < 100) {
            String message = "message--->" + num;
            System.out.println("start to send message 【 " + message + " 】");
            producer.send(new ProducerRecord<Integer, String>(topic, message));
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2.3 消费者代码

/**
 * @author super先生
 * @date 2023/02/09 15:26
 */
public class KafkaConsumerDemo extends Thread {
 
    private final KafkaConsumer<Integer, String> kafkaConsumer;
 
    public KafkaConsumerDemo(String topic) {
        //构建相关属性
        //@see ConsumerConfig
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
        //消费组
        /**
         * consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是
         一个组,那么组内必然可以有多个消费者或消费者实例((consumer instance),
         它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订
         阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一
         个消费组内的一个consumer来消费.后面会进一步介绍。
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
 
        /** auto.offset.reset 参数  从什么时候开始消费
         *  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
         *
         *  这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义
         *  auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费topic下的消息
         *  auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费
            auto.offset.reset=none情况下,新的消费组加入以后,由于之前不存在 offset,则会直接抛出异常。说白了,新的消费组不要设置这个值
         */
 
        //enable.auto.commit
        //消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到(如果没有 commit,消息可以重复消费,也没有 offset),还可以配合auto.commit.interval.ms控制自动提交的频率。
        //当然,我们也可以通过consumer.commitSync()的方式实现手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 
        /**max.poll.records
         *此参数设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔
         要处理的最大值。通过调整此值,可以减少poll间隔
         */
 
        //间隔时间
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //反序列化 key
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //反序列化 value
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //构建 KafkaConsumer
        kafkaConsumer = new KafkaConsumer<>(properties);
        //设置 topic
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }
 
 
    /**
     * 接收消息
      */
    @Override
    public void run() {
        while (true) {
            //拉取消息
            ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(100000000);
            for (ConsumerRecord<Integer, String> record : consumerRecord) {
                System.out.println("message receive 【" + record.value() + "】");
            }
        }
    }
 
    public static void main(String[] args) {
        new KafkaConsumerDemo("test").start();
    }
}

2.4 介绍kafka生产者和消费者模式

如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic 来说,这两个组的消费者都能同时消费这个topic中的消息,对于此事的架构来说,这个firstTopic就类似于 ActiveMQ中的topic概念。

(组内是竞争的,不同组之间是不竞争的)

Producer产生一个hello消息,group=1 和 group=2都能消费,但是每个组里面只有一个Consumer可以消费。

在这里插入图片描述

但如果3个消费者都属于同一个group,那么此时firstTopic就是一个Queue的概念。Producer产生一个hello消息,group=1组里面只有一个Consumer可以消费,如下图所示:

在这里插入图片描述

Kafka通过Group就能够实现p2p和发布订阅。一个Group只能消费一次消息。

3. 消息持久化

Kafka的消息都会持久化到磁盘上。

一个Group只能消费一次消息。然后再换一个GroupId,消费者又能够再次消费消息 (只要消息在磁盘上,Kafka默认保存2天)

  1. 启动Producer

在这里插入图片描述

  1. 启动Consumer

在这里插入图片描述

4. 消息的同步和异步发送

修改Producer增加异步发送参数,如下代码所示:

/**
 * @author super先生
 * @date 2023/02/09 15:00
 */
public class KafkaProducerDemo extends Thread {
    /**
     * 消息发送者
     */
    private final KafkaProducer<Integer, String> producer;
 
    /**
     * topic
     */
    private final String topic;
 
    private final Boolean isAsync;
 
    public KafkaProducerDemo(String topic, Boolean isAsync) {
        this.isAsync = isAsync;
        //构建相关属性
        //@see ProducerConfig
        Properties properties = new Properties();
        //Kafka 地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
        //kafka 客户端 Demo
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        //The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
           /**发送端消息确认模式:
         *  0:消息发送给broker后,不需要确认(性能较高,但是会出现数据丢失,而且风险最大,因为当 server 宕机时,数据将会丢失)
         *  1:只需要获得集群中的 leader节点的确认即可返回
         *  -1/all:需要 ISR 中的所有的 Replica进行确认(集群中的所有节点确认),最安全的,也有可能出现数据丢失(因为 ISR 可能会缩小到仅包含一个 Replica)
          */
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
 
        /**【调优】
         * batch.size 参数(默认 16kb)
         *  public static final String BATCH_SIZE_CONFIG = "batch.size";
         *
         *  producer对于同一个 分区 来说,会按照 batch.size 的大小进行统一收集进行批量发送,相当于消息并不会立即发送,而是会收集整理大小至 16kb.若将该值设为0,则不会进行批处理
         */
 
        /**【调优】
         * linger.ms 参数
         *  public static final String LINGER_MS_CONFIG = "linger.ms";
         *  一个毫秒值。Kafka 默认会把两次请求的时间间隔之内的消息进行搜集。相当于会有一个 delay 操作。比如定义的是1000(1s),消息一秒钟发送5条,那么这 5条消息不会立马发送,而是会有一个 delay操作进行聚合,
         *  delay以后再次批量发送到 broker。默认是 0,就是不延迟(同 TCP Nagle算法),那么 batch.size 也就不生效了
         */
        //linger.ms 参数和batch.size 参数只要满足其中一个都会发送
 
        /**【调优】
         * max.request.size 参数(默认是1M)   设置请求最大字节数
         * public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
         * 如果设置的过大,发送的性能会受到影响,同时写入接收的性能也会受到影响。
         */
 
        //设置 key的序列化,key 是 Integer类型,使用 IntegerSerializer
        //org.apache.kafka.common.serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        //设置 value 的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        //构建 kafka Producer,这里 key 是 Integer 类型,Value 是 String 类型
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }
 
    public static void main(String[] args) {
        new KafkaProducerDemo("test",true).start();
    }
 
    @Override
    public void run() {
        int num = 0;
        while (num < 100) {
            String message = "message--->" + num;
            System.out.println("start to send message 【 " + message + " 】");
            if (isAsync) {  //如果是异步发送
                producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (metadata!=null){
                            System.out.println("async-offset:"+metadata.offset()+"-> partition"+metadata.partition());
                        }
                    }
                });
            } else {   //同步发送
                try {
                    RecordMetadata metadata = producer.send(new ProducerRecord<Integer, String>(topic, message)).get();
                    System.out.println("sync-offset:"+metadata.offset()+"-> partition"+metadata.partition());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Kafka 1.0 以后,客户端默认发送都是异步发送,简单说就是都是会发到一个队列中,然后在一个线程中不断的从队列中发送数据。

异步发送发送成功后会有一个回调,执行相应的操作。

同步发送也只是基于Future#get而已,相当于是同步获取结果,这个就很好理解了。

5. 参考文档

  1. https://blog.csdn.net/Dongguabai/article/details/86520617

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/336545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu20.04+cuda11.2+cudnn8.1+Anaconda3安装tensorflow-GPU环境,亲测可用

(1)安装nvidia显卡驱动注意Ubuntu20.04和Ubuntu16.04版本的安装方法不同,安装驱动前一定要更新软件列表和安装必要软件、依赖&#xff08;必须&#xff09;sudo apt-get update #更新软件列表sudo apt-get install gsudo apt-get install gccsudo apt-get install make查看GP…

4.5.1 泛型

文章目录1.概述2.泛型的具体表现形式3.泛型的作用4.泛型示例5.练习&#xff1a;泛型测试一6.练习:泛型测试二1.概述 泛型不是指一种具体的类型&#xff0c;而是说&#xff0c;这里有个类型需要设置&#xff0c;那么具体设置成什么类型&#xff0c;得看具体的使用&#xff1b; …

RabbitMQ-持久化

一、介绍如何保证RabbitMQ服务停掉以后生产者发送过来的消息不丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时&#xff0c;他将忽视队列和消息&#xff0c;除非告知它不要这样做。确保消息不丢失需要做两件事情&#xff1a;将队列和消息都标记为持久化二、队列持久化再声明…

(1分钟速通面试) SLAM中的最小二乘问题

最小二乘拟合问题 求解超定方程首先写这篇博客之前说一个背景&#xff0c;这个最小二乘拟合问题是我在去年面试实习的时候被问到的&#xff0c;然后当时是非常的尴尬&#xff0c;没有回答上来里面的问题。Hhh 所以这篇博客来进行一个补充学习一下下。感觉这个最小二乘问题还是比…

根据报告20%的白领在一年内做过副业,你有做副业吗?

现在大部分人收入单一&#xff0c;收入都是来源于本职工作&#xff0c;当没有了工作就没有了收入的来源&#xff0c;而生活压力又很大&#xff0c;各种开支&#xff0c;各种消费。所以很多人想要增加收入来源&#xff0c;增加被动收入&#xff0c;同时通过副业提升自己的价值和…

LeetCode·每日一题·1223.掷骰子模拟·记忆化搜索

作者&#xff1a;小迅链接&#xff1a;https://leetcode.cn/problems/dice-roll-simulation/solutions/2103471/ji-yi-hua-sou-suo-zhu-shi-chao-ji-xiang-xlfcs/来源&#xff1a;力扣&#xff08;LeetCode&#xff09;著作权归作者所有。商业转载请联系作者获得授权&#xff0…

libxlsxwriter中文报错问题

libxlsxwriter库在windows系统下VS中存在中文输入报错问题。这在小白关于libxlsxwriter的第一篇博客libxlsxwriter初体验里有所阐述。当时小白给出的解决方案是将文件编码修改成不带签名的utf-8。后来在使用中&#xff0c;小白发现这样并没有完全解决问题。有的中文可以正常写入…

VHDL语言基础-时序逻辑电路-触发器

目录 触发器&#xff1a; D触发器&#xff1a; 触发器的VHDL描述&#xff1a; 触发器的仿真波形如下&#xff1a;​编辑 时钟边沿检测的三种方法&#xff1a; 方法一: 方法二&#xff1a; 方法三&#xff1a; 带有Q非的D触发器&#xff1a; 带有Q非的D触发器的描述&am…

微信小程序 Springboot高校课堂教学管理系统-java

小程序端 学生在小程序端进行注册并且进行登录。 填写自己的个人信息进行注册 登录成功后可以看到有首页、课程资源、测试、互动论坛、我的功能模块。 课程资源学生可以点击想要查看的资源进行观看。 课程分类学生可以按照自己想要的分类进行搜索并且进行观看。 互动论坛可以查…

四种方式的MySQL安装过程 数据库(2)

目录 1. 仓库安装&#xff1a; 1.1 卸载数据库软件&#xff1a; 2. 本地安装&#xff1a; 2.1 卸载数据库软件&#xff1a; 3. 容器安装&#xff1a; 4. 源码安装&#xff1a; 4.1 使用systemctl命令启动进程 1. 仓库安装&#xff1a; &#xff08;1&#xff09;查看版本…

超级详细GitBook和GitLab集成步骤【linux环境】

介绍 本文主要是在 gitlab 上集成 gitbook 实现提交时 gitbook 自动刷新部署 &#xff0c;以及在 linux 环境上搭建 gitlab gitbook,集成 GitLab CI 实现一个企业级或个人的 Wiki 系统 环境准备 1.一台 linux 服务器 2.安装 node 以及 npm 环境 (这里注意 node 环境不要过高 不…

CS反制之批量伪装上线

分析原理&#xff1a; 我们利用Wireshark抓包工具分析一下Cobalt Strike的上线过程是怎么样的 点击木马&#xff0c;主机上线并抓包 查看数据包 可以看到cookie是一串非对称RSA加密类型&#xff0c;需要一个私钥Private Key才能对其进行解密 我们对Cookie解密看看&#xff…

Django框架之系列二

为什么要搭建虚拟环境? 在开发过程中, 当需要使用python的某些工具包/框架时需要联网安装 比如联网安装Django框架django的1.11.11版本 sudo pip install django1.11.11提示&#xff1a;使用如上命令, 会将Django安装到/usr/local/lib/python2.7/dist-packages路径下问题&…

常见的10种网络安全攻击类型

1. DoS 和 DDoS 攻击DoS 是 Denial of Service 的简称&#xff0c;即拒绝服务。单一的 DoS 攻击一般是采用一对一方式的&#xff0c;通过制造并发送大流量无用数据&#xff0c;造成通往被攻击主机的网络拥塞&#xff0c;耗尽其服务资源&#xff0c;致使被攻击主机无法正常和外界…

57 长短期记忆网络(LSTM)【动手学深度学习v2】

57 长短期记忆网络&#xff08;LSTM&#xff09;【动手学深度学习v2】 深度学习学习笔记 学习视频&#xff1a;https://www.bilibili.com/video/BV1JU4y1H7PC/?spm_id_fromautoNext&vd_source75dce036dc8244310435eaf03de4e330 长短期记忆网络&#xff08;LSTM&#xff09…

Element UI框架学习篇(四)

Element UI框架学习篇(四) 1 准备工作 1.0 创建Emp表并插入相应数据的sql语句 /*MySQL数据库*/SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS 0;-- ---------------------------- -- Table structure for emp -- ---------------------------- DROP TABLE IF EXISTS emp; CRE…

为什么需要内存对齐

内存对齐 为什么需要内存对齐&#xff1f; 平台原因&#xff1a;不是所有的硬件平台都能访问任意内存地址上的任意数据&#xff0c;某些硬件平台只能在某些地址处取某些特定类型的数据&#xff0c;否则抛出硬件异常。为了同一个程序可以在多平台运行&#xff0c;需要内存对齐…

阻塞式队列-生产者消费者模型

1.阻塞队列是什么 阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则. 阻塞队列能是一种线程安全的数据结构, 并且具有以下特性: 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队…

HLNet代码debug记录

昨天跑HLNet的代码&#xff0c;配环境的时候又双叒叕遇到了一些问题&#xff0c;记录一下&#xff1a; 1.error: identifier “AT_CHECK“ is undefined 出现在python setup.py build develop的时候 参照https://blog.csdn.net/sinat_29957455/article/details/113334944 根据报…

如何在腾讯云服务器上安装Jupyter Notebook示例?

Jupyter简介及服务器端安装 首先&#xff0c;服务器端安装Jupyter。 sudo pip3 install jupyterlab&#xff1a; 启动Jupyter服务 # 设置jupyter web的密码jupyter-notebook password# 创建jupyter工作目录mkdir ~/jupyter_workspace# 启动jupyter (两次ctrlc停止服务)jup…