docker安装和使用kafka

news2025/1/19 2:32:18

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \
	--network app-tier \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
	--restart=always \
    -d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \
	--network app-tier \
    -p 9092:9092 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \
	--restart=always \
    -d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \
	--network app-tier \
    -p 9001:8080 \
    -v /usr/local/kafka-map/data:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=admin \
    --restart=always \
	-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies>
    	<!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:
  #------------------------------------消息队列kafka配置----------------------------------
  kafka:
    #  kafka server的地址,如果有多个,使用逗号分割
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。32MB的批处理缓冲区
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      properties:
        # 自定义拦截器
        interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor
        #自定义分区器
        partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitioner
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 自定义消费者拦截器
        interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor
      # 默认消费者组
      group-id: code-safe-group
      # 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)
      # 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟
      max-poll-interval-ms: 600000
      # 批量一次最大拉取数据量
      max-poll-records: 1000
      batch:
        # 批消费并发量,小于或等于Topic的分区数
        concurrency: 3
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
    topics:
      # 自定义主题名称
      twsm: webSocket_send_message_dev
      group-id: group-id
      topic-name:
        - topic1

测试发送消息到kafka

/**
 * Kafka测试
 *
 * @version 1.0
 * @author: web
 * @date: 2024/1/18 15:07
 */
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * 生产者_推送消息到kafka
     *
     * @param msg
     * @author: web
     * @return: AjaxResult
     * @date: 2024/1/18 15:16
     */
    @PostMapping("/send")
    public AjaxResult send(@RequestBody Map<String, Object> msg)
    {
        try
        {
            String userId = msg.get("userId").toString();
            Object content = msg.get("content");
            Message message = kafkaUtils.setMessage(userId, content);
            kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);
        }
        catch (Exception e)
        {
            log.error("生产者_推送消息到kafka发生异常");
        }
        return success();
    }

    /**
     * 消费者1
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/18 15:07
     */
    @KafkaListener(topics = KafkaUtils.TOPIC_TEST)
    public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent())
        {
            Object msg = message.get();
            log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    /**
     * 消费者2
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/18 15:07
     */
    //    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)
    //    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,
    //                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    //    {
    //
    //        Optional message = Optional.ofNullable(record.value());
    //        if (message.isPresent())
    //        {
    //            Object msg = message.get();
    //            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
    //            ack.acknowledge();
    //        }
    //    }

}

KafkaUtils类

/**
 * 生产者
 *
 * @version: 1.0
 * @author: web
 * @date: 2024/1/18 10:37
 */
@Component
@Slf4j
public class KafkaUtils
{

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 自定义topic
     */
    public static final String TOPIC_TEST = "topic.code-safe";
    /**
     * 自定义消费组
     */
    public static final String TOPIC_GROUP1 = "topic.group1";
    public static final String TOPIC_GROUP2 = "topic.group2";

    // 业务相关topic

    /**
     * 主题: webSocket发送消息到客户端
     */
    public static String TOPIC_WEBSOCKET_SEND_MESSAGE;

    @Autowired
    private String[] kafkaTopicName;

    /**
     * 获取配置文件中的盐值,并设置到静态变量中
     *
     * @param topic 主题
     */
    @Value("${spring.kafka.topics.twsm}")
    private void setTwsmTopic(String topic)
    {
        TOPIC_WEBSOCKET_SEND_MESSAGE = topic;
    }

    /**
     * 发送消息
     *
     * @param topic   主题
     * @param message 消息内容
     * @author: web
     * @return: void
     * @date: 2024/1/18 10:42
     */
    public void send(String topic, Object message)
    {
        if (StringUtils.isEmpty(topic) || StringUtils.isNull(message))
        {
            throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");
        }
        String obj2String = JsonUtils.toJsonString(message);
        //        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);
        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
        {
            @Override
            public void onFailure(Throwable throwable)
            {
                //发送失败的处理
                log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult)
            {
                //成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

    /**
     * 设置websocket发送的消息体
     *
     * @param userId 用户ID
     * @param msg    消息内容
     * @author: web
     * @return: Message 消息对象
     * @date: 2024/1/19 11:36
     */
    public Message setMessage(String userId, Object msg)
    {
        Message message = new Message();
        message.setSendUserId(userId);
        message.setSendTime(DateUtils.getTime());
        message.setSendContent(String.valueOf(msg));
        return message;
    }
}

Message类

@Data
public class Message implements Serializable
{

    private static final long serialVersionUID = -118L;

    /**
     * 发送人ID
     */
    private String sendUserId;

    /**
     * 发送人
     */
    //    private String sendUserName;

    /**
     * 发送时间
     */
    private String sendTime;

    /**
     * 发送内容
     */
    private String sendContent;
}

监听消息

/**
 * 消息接收监听器【分布式系统】
 *
 * @version: 1.0
 * @author: web
 * @date: 2024/1/19 13:44
 */
@Component
@Slf4j
public class MessageListener
{
    /**
     * 根据用户id发送消息到客户端
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/20 22:05
     */
    @KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")
    public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    {
        Optional<String> optional = Optional.ofNullable(record.value());
        if (optional.isPresent())
        {
            Message message = JsonUtils.parseObject(optional.get(), Message.class);
            if (StringUtils.isNull(message))
            {
                log.error("消费者收到kafka消息的内容为空!");
                return;
            }
//            log.info("消费者收到kafka消息");
            String sendUserId = message.getSendUserId();
            String sendContent = message.getSendContent();
            // 确认收到消息
			ack.acknowledge();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1496331.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vscode 使用SSH远程连接树莓派的教程(解决卡在Downloading with wget)

配置Vscode Remote SSH 安装OpenSSH 打开Windows开始页面&#xff0c;直接进行搜索PowerShell&#xff0c;打开第一个Windows PowerShell&#xff0c;点击以管理员身份运行 输入指令 Get-WindowsCapability -Online | ? Name -like OpenSSH* 我是已经安装好了&#xff0c;…

基于springboot的车辆充电桩管理系统(系统+数据库+文档)

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#xff0c;希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一、研究背景…

彻底搞清楚CUDA和cuDNN版本问题

彻底搞清楚CUDA和cuDNN版本问题 1. 缘起 我的机器上以下三条指令输出的版本不相同。 nvcc -V # 这个输出11.7 nvidia-smi # 右上角显示12.3 import torch; torch.version.cuda # 这个输出12.1我想以此为契机&#xff0c;彻底搞清楚CUDA、cuDNN和torch之间的关系。 环境&a…

Wireshark——获取捕获流量的前N个数据包

1、问题 使用Wireshark捕获了大量的消息&#xff0c;但是只想要前面一部分。 2、方法 使用Wireshark捕获了近18w条消息&#xff0c;但只需要前5w条。 选择文件&#xff0c;导出特定分组。 输入需要保存的消息范围。如&#xff1a;1-50000。 保存即可。

阿里云2核4G服务器支持人数并发测试,2核4G主机测评

阿里云2核4G服务器多少钱一年&#xff1f;2核4G配置1个月多少钱&#xff1f;2核4G服务器30元3个月、轻量应用服务器2核4G4M带宽165元一年、企业用户2核4G5M带宽199元一年。可以在阿里云CLUB中心查看 aliyun.club 当前最新2核4G服务器精准报价、优惠券和活动信息。 阿里云官方2…

Docker部署ruoyi前后端分离项目

目录 一. 介绍前后端项目 二. 搭建局域网 2.1 创建网络 2.2 注意点 三. Redis 3.1 安装 3.2 配置redis.conf文件 3.3 测试 四. 安装MySQL 4.1 安装 4.2 配置my2.cnf文件 4.3 充许远程连接 五. 若依部署后端服务 5.1 数据导入 5.2 使用Dockerfile自定义镜像 5.3 运行…

青少年如何从零开始学习Python编程?有它就够了!

文章目录 写在前面青少年为什么要学习编程 推荐图书图书特色内容简介 推荐理由粉丝福利写在最后 写在前面 本期博主给大家带来一本非常适合青少年学习编程的图书&#xff0c;快来看看吧~ 青少年为什么要学习编程 青少年学习编程&#xff0c;就好比在他们年轻时就开始掌握一种…

[C语言]——分支和循环(2)

目录 一.逻辑操作符&#xff1a;&& , || , &#xff01; 1.逻辑取反运算符! 2.与运算符&& 3.或运算符 4.练习&#xff1a;闰年的判断 5.短路 二.switch语句 1.if语句和switch语句的对比 2.switch语句中的break 3.switch语句中的default 4.switch…

docker 数据卷 详解与实践

常见的数据卷命令 命令 说明 文档地址 docker volume create 创建数据卷 docker volume create docker volume ls 查看所有数据卷 docs.docker.com docker volume rm 删除指定数据卷 docs.docker.com docker volume inspect 查看某个数据卷的详情 docs.docker.co…

MIT 6.S081---Lab: Multithreading

Uthread: switching between threads (moderate) 修改uthread.c&#xff0c;在thread中新增context字段&#xff1a; 修改uthread.c&#xff0c;在thread_create函数中新增以下逻辑&#xff1a; 修改uthread.c中的thread_switch函数定义&#xff1a; 修改uthread.c中的th…

代码随想录算法训练营第三十八天|509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯

509. 斐波那契数 刷题https://leetcode.cn/problems/fibonacci-number/description/文章讲解https://programmercarl.com/0509.%E6%96%90%E6%B3%A2%E9%82%A3%E5%A5%91%E6%95%B0.html#%E7%AE%97%E6%B3%95%E5%85%AC%E5%BC%80%E8%AF%BE视频讲解https://www.bilibili.com/video/BV…

框架漏洞--->Log4j2 Shiro1.2.4反序列化基础

上次讲了thinkphp 那么这次我们就来讲一下log4j2 1.关于log4j2的原理 ----> CVE-2021-44228 当时这个漏洞出来的时候&#xff0c;可以说是轰动了全球~!!!!! 当时基本上是用这个框架的都爆出了这个漏洞 于是&#xff0c;它就在框架漏洞中占有了一席重要之地&#xff01;&am…

【C语言 数据结构】堆与二叉树(下)

接着上次的&#xff0c;这里主要介绍的是堆排序&#xff0c;二叉树的遍历&#xff0c;以及之前讲题时答应过的简单二叉树问题求解 堆排序 给一组数据&#xff0c;升序&#xff08;降序&#xff09;排列 思路 思考&#xff1a;如果排列升序&#xff0c;我们应该建什么堆&#x…

Redis面试题(答案版)2024

基础内容 1、简单介绍以下你了解的Redis &#xff08;1&#xff09;高性能&#xff1a;Redis是基于内存的&#xff0c;读写速度非常快&#xff0c;可以支持10w的QPS。 &#xff08;2&#xff09;用途多样&#xff1a;缓存、消息队列、分布式锁等 &#xff08;3&#xff09;支持…

物联网电气融合实训室建设方案

1 教学实训总体设计 1.1 建设背景 &#xff08;一&#xff09;政策推动与战略部署 近年来&#xff0c;物联网技术在全球范围内得到了广泛的关注和应用。作为信息技术的重要组成部分&#xff0c;物联网在推动经济转型升级、提升社会管理水平、改善民生福祉等方面发挥着重要作…

ChatGPT高效提问——说明提示技巧

ChatGPT高效提问——说明提示技巧 现在&#xff0c;让我们开始体验“说明提示技巧”&#xff08;IPT, Instructions Prompt Technique&#xff09;和如何用它生成来自ChatGPT的高质量的文本。说明提示技巧是一个通过向ChatGPT提供需要依据的具体的模型的说明来指导ChatGPT输出…

FPGA-串口接收图像写入RAM并读出在TFT显示屏上显示

系统框图&#xff1a; 需要用到的模块有&#xff1a; 1&#xff0c;UART_RX(串口接收模块)&#xff1b; 2&#xff0c;串口接受的数据存放到RAM模块&#xff1b; 3&#xff0c;RAM IP核&#xff1b; 4&#xff0c;时钟IP核 &#xff08;TFT显示屏驱动时钟的产生&#xff09…

理解循环神经网络(RNN)

文章目录 1. 引言&#xff1a;什么是RNN以及它的重要性RNN简介RNN在机器学习中的作用和应用场景 2. RNN的工作原理神经网络基础RNN的结构和运作方式循环单元的作用 3. RNN的关键特点与挑战参数共享长期依赖问题门控机制&#xff08;例如LSTM和GRU&#xff09;代码示例&#xff…

【Vue】vue3 在图片上渲染 OCR 识别后的文本框、可复制文本组件

需求 后面返回解析后的文本和四角坐标&#xff0c;在图片上渲染成框&#xff0c;并且可复制。图片还可以缩放、拖拽 实现 这里要重点讲下关于OCR文本框的处理&#xff1a; 因为一些文字可能是斜着放的&#xff0c;所有我们要特殊处理&#xff0c;根据三角函数来计算出它的偏…

openEuler学习——部署MGR集群

本文介绍如何利用GreatSQL 8.0.25构建一个三节点的MGR集群。 1.安装准备 IP端口角色192.168.20.1103306mgr1192.168.20.1113306mgr2192.168.20.1123306mgr3 配置hosts解析 [rootMGR1 ~]# cat >> /etc/hosts << EOF > 192.168.20.110 MGR1 > 192.168.20.1…