kafka生产者消费者举例

news2024/9/28 11:17:13

文章目录

    • kafka介绍
    • 生产者消费者例子
      • 一、生产者
      • 二、消费者
      • 三、效果
    • KafkaTemplate @KafkaListener

kafka介绍

Kafka 是一款分布式流处理平台,它被设计用于高吞吐量、持久性、分布式的数据流处理。

  • Kafka 简介

    • Kafka 是一个高吞吐、分布式、基于发布 订阅的消息系统。
    • Kafka 具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等特性。
  • Kafka 应用场景

    • 日志收集:公司可以使用 Kafka 收集各种服务的日志,然后通过 Kafka 统一接口服务的方式将这些日志开放给各种消费者,例如 Hadoop、Hbase、Solr 等。
    • 消息系统:Kafka 可以解耦生产者和消费者,缓存消息等。
    • 用户活动跟踪:Kafka 经常用于记录 web 用户或 app 用户的各种活动,如浏览网页、搜索、点击等。这些活动信息被各个服务器发布到 Kafka 的 topic 中,然后订阅者通过订阅这些 topic 来实时监控分析,或者装载到 Hadoop、数据仓库中进行离线分析和挖掘。
    • 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据、生产各种操作的集中反馈,比如报警和报告。
    • 流式处理:例如 Spark Streaming 和 Storm。

Kafka 在大规模数据流处理和实时数据传输场景中发挥着重要作用,其发布订阅模型、分区和副本机制以及异步消息传递的特性使其成为分布式系统中的重要组件。

生产者消费者例子

当Docker部署Kafka集群时,需要确保安装了ZooKeeper,因为Kafka依赖于ZooKeeper来实现集群协调与管理。ZooKeeper是一个开源的分布式协调服务,用于维护集群的状态信息、进行领导者选举以及协调分布式应用程序的工作。Kafka利用ZooKeeper来管理集群中的节点、配置信息和分区分配等关键任务,确保集群的稳定运行和可靠性。

先引入依赖:

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.7.0</version>
  </dependency>

一、生产者

public class Producer {
    public static void main(String[] args) {
        // 设置Kafka生产者的配置
        Properties props = new Properties();
        // Kafka集群的地址
        props.put("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094");
        // 确认模式:全部副本确认
        props.put("acks", "all");
        props.put("retries", 2);
        // 键的序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者实例
        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
        // 发送10条消息到主题
        for (int i = 0; i < 10; i++) {
            // send异步发送 ProducerRecord参数: 注意  key value【消息是键值对形式】
            producer.send(new ProducerRecord<String, String>("hac", Integer.toString(i), Integer.toString(i)));
        }
        // 关闭生产者实例
        producer.close();
    }
}

二、消费者

public class Consumer {
    public static void main(String[] args) {
        // 创建消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094");
        // 消费者主
        props.setProperty("group.id", "groupId1"); // 消费者组ID
        // 是否开启自动提交偏移量
        props.setProperty("enable.auto.commit", "true");
        // 自动提交偏移量的间隔时间
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 创建Kafka消费者实例
        consumer.subscribe(Arrays.asList("hac"));// 订阅主题 可以订阅多个主题

        while (true) {
            // 从服务器拉取消息记录
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            // 遍历接收到的消息记录
            for (ConsumerRecord<String, String> record : records) {
                // 输出消息的偏移量、键和值
                System.out.println("接受到的消息: " + record.key() + ":" + record.value());
            }
        }
    }
}

三、效果

在启动Kafka消费者之前,需要确保消费者能够连接到可用的Kafka集群,并正确地订阅了所需的主题。一旦消费者启动并成功订阅了主题,它将持续监听并处理来自Kafka集群的消息。在此期间,消费者将与集群保持连接,并持续从指定的主题中拉取消息进行处理。当生产者向所订阅的主题发送新消息时,消费者将立即收到这些消息,并进行相应的处理。在这里插入图片描述

KafkaTemplate @KafkaListener

KafkaTemplate@KafkaListener是Spring Kafka提供的两个核心组件,用于简化在Spring应用程序中与Apache Kafka集成的过程。

第一步:引入依赖

 <!-- kafkfa -->
 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     <exclusions>
         <exclusion>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </exclusion>
     </exclusions>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
 </dependency>

第二步:配置application.yml文件

spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094
    producer:
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1

    consumer:
      group-id: groupId1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第三步:使用

KafkaTemplate:KafkaTemplate是Spring Kafka提供的一个工具类,用于简化向Kafka发送消息的过程。通过KafkaTemplate,可以方便地将消息发送到指定的Kafka主题。它封装了Kafka的Producer API,提供了一系列发送消息的方法,包括同步发送、异步发送、带回调函数的发送等。使用KafkaTemplate,你可以在Spring应用程序中轻松地发送消息到Kafka集群中。

@KafkaListener:@KafkaListener注解用于标记一个方法,表示这个方法是一个Kafka消息监听器。通过在方法上使用@KafkaListener注解,可以让Spring容器自动创建Kafka消息监听器并订阅指定的主题,当有消息到达时,自动调用标记了@KafkaListener注解的方法进行消息处理。

生产者:

@RestController
@RequestMapping(value = "/kafka")
public class SendController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping(value = "/send")
    public String send() {
        String msg = "hello"; //这里写固定的测试一下
        String topic = "hac";
        kafkaTemplate.send(topic, msg);
        return "OK";
    }
}

消费者:

@Component 
public class KafkaListenerMessage {

    /***
     * 监听新消息
     */
    @KafkaListener(topics = "hac", groupId = "groupId1") 
    public void listener(ConsumerRecord<String, String> record) {
        String value = record.value();
        int partition = record.partition();
        long offset = record.offset();
        System.out.println("value:" + value + ",partition:" + partition + ",offset:" + offset);
    }
}

效果:
在这里插入图片描述


❤觉得有用的可以留个关注❤

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1663199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NSS刷题

[SWPUCTF 2021 新生赛]jicao 类型&#xff1a;PHP、代码审计、RCE 主要知识点&#xff1a;json_decode()函数 json_decode()&#xff1a;对JSON字符串解码&#xff0c;转换为php变量 用法&#xff1a; <?php $json {"ctf":"web","question"…

2024年数维杯B题完整代码和思路论文讲解与分析

2024数维杯数学建模完整代码和成品论文已更新&#xff0c;获取↓↓↓↓↓ https://www.yuque.com/u42168770/qv6z0d/bgic2nbxs2h41pvt?singleDoc# 2024数维杯数学建模B题45页论文和代码已完成&#xff0c;代码为全部问题的代码 论文包括摘要、问题重述、问题分析、模型假设、…

怎么让电脑耳机和音响都有声音

电脑耳机音响不能同时用没声音怎么办 一般来说&#xff0c;重新开机后问题能够得到解决。右击“我的电脑”---“属性”---“硬件”---“设备管理器”&#xff0c;打开“声音、视频和游戏控制器”有无问题&#xff0c;即看前面有没有出现黄色的“”。 如果您的 电脑 耳机能正常…

SQL注入(sqli-labs第一关)

sqli-labs第一关 方法一&#xff1a;手工注入 来到第一关&#xff0c;图上说我们需要一个数字的参数 于是我们先手工注入?id1 and 11 跟?id1 and 12发现页面没有报错 每张截图上面页面中有select查询语句&#xff0c;这是我在第一关的源码中加上了echo "$sql ";…

基于UDP协议Python通信网络程序(服务器端+客户端)及通信协议在自动驾驶场景应用示例

一、UDP协议 UDP&#xff08;用户数据报协议&#xff09;是一种无连接的传输层协议&#xff0c;具有简单、高效的特点&#xff0c;适用于一些对数据可靠性要求不高的应用场景。UDP的主要特点包括无连接、不可靠和面向数据报。这意味着在发送数据之前不需要建立连接&#xff0c…

【Spring之依赖注入】2. Spring处理@Async导致的循环依赖失败问题

使用异步Async注解后导致的循环依赖失败详解 1 问题复现1.1 配置类1.2 定义Service1.3 定义Controller1.4 启动springboot报错 2.原因分析&#xff1a;看Async标记的bean注入时机2.1 循环依赖生成过程2.2 自检程序 doCreateBean方法 3.解决方案3.1 懒加载Lazy3.1.1 将Lazy写到A…

根据部门id删除该部门下的员工(事务)

application.properties&#xff1a; 或&#xff1a; application.yml&#xff1a; 新表&#xff1a; 日志对象类&#xff1a; 日志service类&#xff1a; 日志service接口&#xff1a; 日志mapper类&#xff1a; 部门service类&#xff1a; 员工mapper类&#xff1a;

某大型集团SAP数字化转型方案(95页PPT)

一、资料介绍 《某大型集团SAP数字化转型方案》是一份详尽的95页PPT资料&#xff0c;旨在为某大型集团提供一套全面而深入的SAP数字化转型方案。该方案紧密结合了集团的业务特点和发展需求&#xff0c;以SAP系统为核心&#xff0c;通过数字化技术的运用&#xff0c;实现业务流…

【redis】Redis五种常用数据类型和内部编码,以及对String字符串类型的总结

˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN 如…

ESP32引脚入门指南(七):从理论到实践(IIC)

引言 IIC&#xff08;Inter-Integrated Circuit&#xff09;&#xff0c;又称为IC&#xff0c;是一种简单而高效的多主控器串行通信协议&#xff0c;常用于微控制器和各种外围设备之间的通信。在ESP32系列芯片中&#xff0c;IIC协议被广泛应用于连接各种传感器、存储器和其他支…

计算机网络实验1:交换机基本配置管理

实验目的和要求 安装Packer Tracer&#xff0c;了解Packer Tracer的基本操作掌握交换机基本命令集实验项目内容 认识Packet Tracer软件 交换机的基本配置与管理 交换机的端口配置与管理 交换机的端口聚合配置 交换机划分Vlan配置 实验环境 硬件&#xff1a;PC机&#x…

HTML5 + CSS3实现卖茶女与水果男的巅峰微信聊天对决,看完后笑一整天

记得之前看过一段卖茶女与水果男的聊天视频&#xff0c;当时觉得真有意思&#xff0c;竟然还可以这样热爱自己的事业。我就想&#xff0c;用HTML5 CSS3实现一下这个过程&#xff0c;锻炼了技术&#xff0c;也娱乐了开发人员&#xff0c;多有意思的一件事啊。 目录 1 实现思路…

Dbeaver连接一段时间不操作后断开的问题

右键数据库连接点击编辑连接点击初始化将连接保持改成60s

杰理AC632N 通过写flash修改蓝牙名字

杰理修改蓝牙名字一般有他自己的一个工具的,如下图,在编译前修改 现在用写flash的方式更改: 主要看杰理CFG_BT_NAME这个宏里面的.c文件 杰理给这里的flash都定义好每个宏的信息,你需要读写就行,修改后记得重启上电才能生效.如果你要自定义数据写进去flash断电不丢失,估计就是在…

智慧公厕的技术基础、保障技术和应用价值

近年来&#xff0c;随着信息技术的快速发展&#xff0c;智慧公厕逐渐成为城市管理的热点项目。智慧公厕利用物联网技术与大数据、云计算、网络通信、自动化控制等先进技术相结合&#xff0c;公共厕所的管理变得更加快捷高效&#xff0c;实现了真正的智能化使用和智慧化管理。下…

盘点自动驾驶的技术发展趋势

自动驾驶技术在不断发展变快&#xff0c;我们之前提过算法岗如今越来越卷&#xff0c;从今年的就业局势看&#xff0c;前年还属于蓝海行业的自动驾驶&#xff0c;今年就已经满满关上了招揽之门——呈红海之势。作为在这个行业中摸爬滚打的一以子&#xff0c;我们到底该如何纵观…

Goland GC

Goland GC 引用Go 1.3 mark and sweep 标记法Go 1.5 三色标记法屏障机制插入屏障删除写屏障总结 Go 1.8 混合写屏障(hybrid write barrier)机制总结 引用 https://zhuanlan.zhihu.com/p/675127867 Garbage Collection&#xff0c;缩写为GC&#xff0c;一种内存管理回收的机制…

ABAP 直连sqlserver或oracle数据库

1、事务码DBCO,配置链接 2、测试链接&#xff1a;sm38 执行ADBC_TEST_CONNECTION 3、运行时会报驱动找不到的错误&#xff0c;解决方法&#xff1a; S4 HANA连接其他数据库&#xff08;oracle,sqlserver)

代码审计平台sonarqube的安装及使用

docker搭建代码审计平台sonarqube 一、代码审计关注的质量指标二、静态分析技术分类三、使用sonarqube的目的四、sonarqube流程五、docker快速搭建sonarqube六、sonarqube scanner的安装和使用七、sonarqube对maven项目进行分析八、sonarqube分析报告解析九、代码扫描规则定制十…

mysql--join

Join 两个表都有一个主键索引 id 和一个索引 a&#xff0c;字段 b 上无索引&#xff0c;表 t2 里插入了 1000 行数据&#xff0c;在表 t1 里插入的是 100 行数据 CREATE TABLE t2 (id int(11) NOT NULL,a int(11) DEFAULT NULL,b int(11) DEFAULT NULL,PRIMARY KEY (id),KEY …