kafka快速上手

news2025/1/7 10:27:29

一、kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

kafka官网:http://kafka.apach e.org/

 

二、kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

步骤如下: 

(1)创建kafka-demo项目,导入依赖 

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
 </dependency>

(2)生产者发送消息

public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1.kafka链接配置信息
        Properties prop = new Properties();

        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        //key和value的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.创建kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);

        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");

        //同步发送消息
        producer.send(kvProducerRecord);

        //4.关闭消息通道  必须要关闭,否则消息发送不成功
        producer.close();
    }
}

  (3)消费者接收消息

public class ConsumerQuickStart {
    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties prop = new Properties();

        //链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //设置消费者组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //4.拉取消息
        while (true) {
            // 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }
}

使用情景:

生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)

生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多) 

三、springboot集成kafka

1.导入spring-kafka依赖信息

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <!-- Spring Kafka的依赖,排除了kafka-clients -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- 显式声明kafka-clients的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
     </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

2.在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 10 #重试的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("test-topic","HelloWorld");
//        User user=new User();
//        user.setUsername("lili");
//        user.setAge(18);
//        kafkaTemplate.send("test-topic", JSON.toJSONString(user));

        return "ok";
    }
}

4.消息消费者

@Component
public class HelloListener {
    
    @KafkaListener(topics = "test-topic")
    public void onMessage(String message){
        if (!StringUtils.isEmpty(message)){
            System.out.println(message);
//            User user = JSON.parseObject(message, User.class);
//            System.out.println(user);
        }
    }
}

传递消息为对象:

目前springboot整合后的kafka,因为序列化器是StringSerializer,可以把要传递的对象进行转json字符串,接收消息后再转为对象即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2098142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HarmonyOS开发实战( Beta5版)多线程能力场景化示例最佳实践

在介绍Worker和TaskPool的详细使用方法前&#xff0c;我们先简单介绍并发模型的相关概念&#xff0c;以便于大家的理解。 并发模型概述 并发的意思是多个任务同时执行。并发模型分为两大类&#xff1a;基于内存共享的并发模型和基于消息传递的并发模型。 在基于内存共享的并…

手撕Python之序列类型

1.列表---list 索引的使用 当我们有一个数据的时候&#xff0c;我们怎么将这个数据存储到程序呢&#xff1f; 我们定义一个变量&#xff0c;将数据存储在变量中 那么如果有100个数据呢&#xff1f;要定义100个变量吗&#xff1f; 我们是可以用列表这个东西进行多个数据的存…

单元测试 Mock不Mock?

文章目录 前言单元测试没必要?Mock不Mock?什么是Mock?Mock的意义何在? 如何Mock&#xff1f;应该Mock什么&#xff1f;Mock 编写示例 总结 前言 前段时间&#xff0c;我们团队就单元测试是否采用 Mock 进行了一番交流&#xff0c;各有各的说法。本文就单元测试 Mock不Mock…

11 Java 方法引用、异常处理、Java接口之函数式编程(接口知识补充Function<T,R>、BiFunction<T, U, R>和自定义泛型接口)

文章目录 前言一、Java接口之函数式编程 --- 接口知识补充1 Function<T,R>泛型接口2 BiFunction<T, U, R>泛型接口3 自定义泛型函数式编程接口4 使用lambda表达式、方法引用进行函数式编程 二、方法引用1 方法引用初体验&#xff08;以Array.sort()方法为例&#x…

当了中层才发现:领导根本不在意,你干了多少活

在职场的棋盘上&#xff0c;每个人都是一枚棋子&#xff0c;而中层领导则像是那些走在前线的骑士——既要冲锋陷阵&#xff0c;又要运筹帷幄。但你有没有想过&#xff0c;领导真的在意你加班到深夜&#xff0c;或是周末还在回复邮件吗&#xff1f; 当你从基层一步步爬到中层&a…

Unity(2022.3.41LTS) - UI详细介绍-Slider(滑动条)

目录 零.简介 一、基本功能与用途 二、组件介绍 零.简介 在 Unity 中&#xff0c;Slider&#xff08;滑动条&#xff09;是一个可以滑动的 UI 组件. 一、基本功能与用途 数值调节&#xff1a;主要功能是让用户在一个特定的数值范围内进行选择。例如&#xff0c;可以用于调…

深度学习之张量(Tensor)的创建、常见属性及数据转换

基本概念 PyTorch会将数据封装成张量&#xff08;Tensor&#xff09;进行计算&#xff0c;所谓张量就是元素为相同类型的多维矩阵。 张量是一个多维数组&#xff0c;通俗来说可以看作是扩展了标量、向量、矩阵的更高维度的数组。张量的维度决定了它的形状&#xff08;Shape&a…

同城搭子系统小程序开发产品分析

1. 市场调研与需求分析 目标用户定位&#xff1a;定义核心用户群&#xff0c;例如上班族、学生、旅游爱好者等。需求收集&#xff1a;运用问卷调查、用户访谈等方法收集用户对功能的具体需求&#xff0c;特别是对安全、便捷性和费用分摊的关注点。竞品分析&#xff1a;分析同类…

开心第一课:健康坐姿

文章目录 引言保持脊柱的自然伸展选择一把合适的座椅引言 建议坐位时间超过 30 分钟,就起身活动一下,促进血液循环,预防久坐带来的各种健康问题。 保持脊柱的自然伸展 正确的骨盆位置是使坐位时身体重量都作用在双侧的坐骨结节上,在结节的顶端有滑囊,滑囊分泌液体减少组…

由“为什么VO不能继承PO?” 引出的为什么组合优于继承?

简述VO、DTO、PO的概念。 如下概念是我个人的理解&#xff1a; VO&#xff08;View Object&#xff09;&#xff1a; 视图对象&#xff0c;用于展示&#xff0c;这很好理解&#xff0c;就是前端页面所需数据封装&#xff0c;一般所需要的属性比 PO 多并且。DTO&#xff08;Da…

二手电脑配置给你不一样的成就感之四

今天测了e3 1220 v1 没有想象中的好&#xff0c;鲁大师才评20多万分&#xff0c;比fm2平台6600k强一点点有限。 6600k fm2平台是20万出头。cpuz的评分还是比较高&#xff0c;实际使用效果比6600k稍好一些。 单盒能力确实比apu强一些。同样拿world frvr 测试&#xff0c;调到最低…

微分方程_by小崔说数

可降解的微分方程 不显含x&#xff1a;y两撇dp/dxdp/dy*dy/dx 不显含y&#xff1a;dp/dx 都是y撇等于p 自变量与因变量呼唤 讲解为一阶线性微风方程 &#xff0c;变成可分离得 公式得 高阶可降解得微分方程 通解非齐次特解齐次通解 非齐次特解&#xff1b;解得叠加原…

旗晟智能助推浙大海创人形机器人创新中心发展!

8月27日上午&#xff0c;余杭区成功举办机器人产业高质量发展大会。会上&#xff0c;由地方政府与浙江大学共同建设的人形机器人产业创新中心成立并揭牌。该中心目标明确&#xff0c;致力于打造机器人领域国内、国际的高等级创新高地。浙江大学校领导、院士专家、机器人企业代表…

js调试--本地替换

js调试--本地替换 一、本地替换的作用二、操作方法(以百度首页为例)1、选中目标资源地址二、替换为本地资源三、修改本地内容一、本地替换的作用 本地替换的作用就是将原本访问服务器的数据改成访问本地。 二、操作方法(以百度首页为例) 1、选中目标资源地址 以百度首页…

智能工厂MES实施规划

智能工厂MES&#xff08;制造执行系统&#xff09;实施规划是一个复杂而系统的过程&#xff0c;旨在通过数字化手段提升工厂的生产效率、降低成本并提高产品质量。以下是一个全面的智能工厂MES实施规划方案&#xff0c;涵盖主要步骤和关键点&#xff1a; 一、前期准备与需求分析…

手撸瀑布流

一、需求&#xff1f; 要求实现 一排两列 瀑布流样式&#xff0c;样式如下&#xff1a;其中A为容器&#xff0c;B为组件样式&#xff0c;卡片高度会因为标题的多少来自适应。 二、解法 1.使用CSS的column&#xff08;⚠️不推荐&#xff09; 使用CSS 属性 column 用来设置…

朋友们!pinterest视频保存教学给你们做出来啦

有没有在为pinterest视频无法下载而烦恼的小伙伴呀&#xff1f;我反正是被这个困扰好久了&#xff0c;每每看到喜欢的视频素材不能下载&#xff0c;真的很捉急&#xff01;最近发现了这款神器&#xff0c;就是之前给大家推荐【水印抹布】app可以完美解决pinterest视频保存的问题…

Vue2与Vue3区别以及兼容那些浏览器酌情使用

一、vue2与vue3区别 vuejs/vue: 是一个用于构建用户界面的 JavaScript 框架,具有简洁的语法和丰富的组件库,可以用于开发单页面应用程序和多页面应用程序。 项目地址:https://gitcode.com/gh_mirrors/vu/vue 免费下载资源 1.根节点不同 vue2中必须要有根标签vue3中可以没…

pod进阶:

pod进阶&#xff1a; pod的生命周期当中的状态&#xff1a; 1、Running 运行中&#xff0c;pod已经分配到节点上&#xff0c;且pod内的容器正常运行。正常状态&#xff08;ready 1/1&#xff09; 2、complete 完成之后退出&#xff0c;容器内的返回码是0 echo $? 表示容器是…

基于Easy-Wav2lip-Gradio的AI数字人

数字人技术近年来在多个领域得到广泛应用,从虚拟主播到电影特效,都需要精确的音视频同步技术来实现逼真的效果。传统的嘴型同步技术往往面临着准确性不高、处理速度慢等问题。为了解决这些问题,原有的Wav2Lip项目应运而生。Wav2Lip通过人工智能技术,实现了音频和视频的高精…