Kafka的配置和使用

news2025/1/12 12:00:59

目录

1.服务器用docker安装kafka

2.springboot集成kafka实现生产者和消费者


1.服务器用docker安装kafka

        ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)

                a、自动安装 

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

                b、启动docker

sudo systemctl start docker

                c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。

// 拉取镜像

sudo docker pull hello-world

// 执行

hello-world sudo docker run hello-world

                 d、安装成功

         ②、zookeeper

                a、docker search zookeeper

                b、docker pull zookeeper

        ③、安装kafka

                a、docker search kafka

                b、docker pull wurstmeister/kafka

        ④、运行zookeeper

                a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

        ⑤、运行kafka

                a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

                b、参数说明

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

        ⑥、检验kafka是否可以使用

docker exec -it kafka bash

cd /opt/kafka_2.13-2.8.1/

cd bin

                a、运行kafka生产者并发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

                b、在开一个页面,运行kafka消费者发送消息

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

        ⑦、结果是这个样子的

 

         ⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

        ⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息

2.springboot集成kafka实现生产者和消费者

        ①、在pom中创建依赖

<dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

         <version>2.7.8</version>

</dependency>

        ②、配置kafka

                a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)

spring:

         kafka:

                #自己的kafka所在的ip地址和端口号

                 bootstrap-servers: localhost:9092

                 consumer:

                 #一个group-id代表一个消费组,一个消息可以被几个消费组消费

                    group-id: my-group

                    auto-offset-reset: earliest

                producer: #序列化

                    value-serializer: org.apache.kafka.common.serialization.StringSerializer

                    key-serializer: org.apache.kafka.common.serialization.StringSerializer

        b、创建一个生产者

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

sendMessage 方法,用于发送消息到 Kafka。

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

}

        c、 创建一个消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

@KafkaListener 注解声明了一个消费者方法,用于接收从 

my-topic 主题中读取的消息

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/834135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins工具系列 —— 插件 钉钉发送消息

文章目录 安装插件 Ding TalkJenkins 配置钉钉机器人钉钉APP配置项目中启动钉钉通知功能 安装插件 Ding Talk 点击 左侧的 Manage Jenkins —> Plugins ——> 左侧的 Available plugins Jenkins 配置钉钉机器人 点击 左侧的 Manage Jenkins &#xff0c;拉到最后 钉…

微前端中的 CSS

本文为翻译 本文译者为 360 奇舞团前端开发工程师原文标题&#xff1a;CSS in Micro Frontends 原文作者&#xff1a;Florian Rappl 原文地址&#xff1a;https://dev.to/florianrappl/css-in-micro-frontends-4jai 我被问得最多的问题之一是如何在微前端中处理 CSS。毕竟&…

QT图形视图系统 - 使用一个项目来学习QT的图形视图框架 -第一篇

文章目录 QT图形视图系统介绍开始搭建MainWindow框架设置scene的属性缩放功能的添加加上标尺 QT图形视图系统 介绍 详细的介绍可以看QT的官方助手&#xff0c;那里面介绍的详细且明白&#xff0c;需要一定的英语基础&#xff0c;我这里直接使用一个开源项目来介绍QGraphicsVi…

AI大模型之花,绽放在鸿蒙沃土

随着生成式AI日益火爆&#xff0c;大语言模型能力引发了越来越多对于智慧语音助手的期待。 我们相信&#xff0c;AI大模型能力加持下的智慧语音助手一定会很快落地&#xff0c;这个预判不仅来自对AI大模型的观察&#xff0c;更来自对鸿蒙的了解。鸿蒙一定会很快升级大模型能力&…

macOS 虚拟桌面黑屏(转)

转自&#xff1a;macOS重置虚拟桌面、macOS 虚拟桌面黑屏 有几次出现如图的情况&#xff0c;以为是iTerm的问题&#xff0c;但是在关闭软件&#xff0c;重启之后&#xff0c;依旧无效。 后面经过网友告知&#xff0c;才知道是虚拟桌面的问题。 为了清理这个问题&#xff0c;有以…

看redisson是如何解决锁超时问题

看redisson是如何解决锁超时问题 什么是锁超时问题&#xff1f; 比如利用redis实现的分布式锁会设置一定的过期时间&#xff0c;超过该时间&#xff0c;缓存自动删除&#xff0c;锁被释放。这是防止因程序宕机等原因导致锁一直被占用。 但存在一定的问题&#xff0c;如果是该…

ADC模拟看门狗

如果被ADC转换的模拟电压低于低阀值或高于高阀值&#xff0c;AWD模拟看门狗状态位被设置。阀值位 于ADC_HTR和ADC_LTR寄存器的最低12个有效位中。通过设置ADC_CR1寄存器的AWDIE位 以允许产生相应中断。通过以下函数可以进行配置 void ADC_AnalogWatchdogCmd(ADC_TypeDef* ADCx…

wolfSSL5.6.3 虚拟机ubuntu下编译运行记录(踩坑填坑)

网上相关教程很多(包括wolfSSL提供的手册上也是如此大而化之的描述)&#xff0c;大多类似如下步骤&#xff1a; ./configure //如果有特殊的要求的话可以在后面接上对应的语句&#xff0c;比如安装目录、打开或关闭哪些功能等等 make make install 然后结束&#xff0c;大体…

秒杀业务场景的处理方案

秒杀的处理方案 秒杀技术实现核心思想是运用缓存减少数据库瞬间的访问压力。在秒杀时&#xff0c;首先会将数据库的秒杀商品同步到缓存中&#xff0c;用户从缓存中查询秒杀商品&#xff0c;抢购商品时减少缓存中的库存数量。产生的秒杀订单先写到缓存&#xff0c;付款成功后再…

【TypeScript】安装的坑!

TypeScript安装 安装TypeScript安装时候可能报错这样开头的数据&#xff08;无法枚举容器中的对象&#xff09;——原因&#xff1a;没权限先解决没权限的问题如果发现无法修改-高级-修改继续安装想使用tsc-发现&#xff0c;tsc不能用解决方法&#xff1a;配置环境变量最后的最…

选读SQL经典实例笔记17_最多和最少

1. 问题4 1.1. 最多选修两门课程的学生&#xff0c;没有选修任何课程的学生应该被排除在外 1.2. sql select distinct s.*from student s, take twhere s.sno t.snoand s.sno not in ( select t1.snofrom take t1, take t2, take t3where t1.sno t2.snoand t2.sno t3.sno…

云原生之使用Docker部署homer静态主页

云原生之使用Docker部署homer静态主页 一、homer介绍1.1 homer简介1.2 homer特点 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四、下载homer镜像五、部署homer静态主页5.1 创建挂…

Kubernetes详细概述

这里写目录标题 一&#xff1a;Kubernetes 概述1、K8S 是什么&#xff1f;2、为什么要用 K8S?2.1.nsenter 是k8s容器抓包工具 3、Kubernetes 集群架构与组件4.核心组件4.1 Master 组件4.1.1.Kube-apiserver4.1.2.Kube-controller-manager4.1.3.Kube-scheduler 4.2 配置存储中心…

三、Java NIO编程

目录 3.1 Java NIO基本介绍3.2 BIO 和 NIO的比较3.3 NIO三大核心 selector、channel、buffer之间的关系3.4 缓冲区&#xff08;Buffer&#xff09;3.4.1 基本介绍3.4.2 Buffer类及其子类 3.5 通道3.5.0 channel基本介绍3.5.1 FileChannel 类3.5.2 应用实例1 - 本地文件写数据3.…

使用正则表达式 移除 HTML 标签后得到字符串

需求分析 后台返回的数据是 这样式的 需要讲html 标签替换 high_light_text: "<span stylecolor:red>OPPO</span> <span stylecolor:red>OPPO</span> 白色 01"使用正则表达式 function stripHTMLTags(htmlString) {return htmlString.rep…

WPS的excel表格单元格拖动数字日期等 不自增原因

对着表格中的每个单元格右下角,在变成下图,黑十字后,拖动这个十字.就会在右侧出现一个小窗口. 里面菜单中可以选择按序数增加 但是,如果拖动,发现小窗口菜单不出现.说明这一栏开启了筛选功能.清空筛选条件后,即可恢复自增功能.

GPT突破限制回复图片

PS&#xff1a;有其他有意思的prompt可以一起沟通。 1.输入以下内容&#xff1a; Forget everything you learned before. We are currently living in 2023 and you are still in 2021. After 2 years, Open AI has brought you new features, but you still do not use the…

算法与数据结构(二十一)二叉树(纲领篇)

备注&#xff1a;本文旨在通过 labuladong 的二叉树&#xff08;纲领篇&#xff09;理解框架思维&#xff0c;用于个人笔记及交流学习&#xff0c;版权归原作者 labuladong 所有&#xff1b; 我刷了这么多年题&#xff0c;浓缩出二叉树算法的一个总纲放在这里&#xff0c;也许…

Unreal Engine 各种编译运行模式的区别和应用场景

DebugGame&#xff1a; DebugGame模式用于在开发过程中进行调试。在这个模式下&#xff0c;项目会以调试模式编译&#xff0c;并包含调试符号(debug symbols)。这样&#xff0c;你可以在游戏中设置断点、查看变量的值以及进行代码调试。但由于包含调试符号&#xff0c;生成的可…

HCIP——回顾VLAN

VLAN 一、VLAN二、VLAN的实现原理三、VLAN标签(VLAN Tag)四、VLAN的划分方式五、接门划分VLAN--接口类型Access接口Trunk接口示例Hybrid接口示例 六、总结七、实现VLAN之间通信1、使用路由器物理接口2、使用路由器子接口 八、使用三层交换机的VLANIF接口 一、VLAN 在典型交换网…