docker安装kafka,并集成springboot进行测试

news2025/1/15 21:00:49

大家好,今天我们开始学习kafka中间件,今天我们改变一下策略,不刷视频学习,改为实践学习,在网上找一些案例功能去做,来达到学习实践的目的。

首先,是安装相关组件。

1. docker安装安装

1.1 yum-utils软件包

yum install -y yum-utils

1.2 设置阿里云镜像


yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3 安装docker

yum install docker-ce docker-ce-cli containerd.io 

1.4 启动docker

systemctl start docker

1.5 测试

docker version
docker run hello-world
docker images

至此,docker就安装完毕了。接下来就是安装zookeeper和kafka了,我这里用的是kafka2.x的版本,因此需要结合zookeeper去是使用。现在最新的kafka3.x已经可以抛弃zookeeper去单独使用了,小伙伴们有兴趣的话可以自己去动手安装实践下。

2. 安装zookeeper和kafka

2.1 docker安装zookeeper

docker pull wurstmeister/zookeeper

2.2 启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper 

2.3 docker查看zookeeper容器是否启动

docker ps

 出现以上信息,就代表zookeeper已经安装并启动成功。

2.4 安装kafka

docker pull wurstmeister/kafka

2.5 启动kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka 

2.6 用docker ps查看kafka是否启动

出现以上信息,就代表kafka启动成功了。

下来就测试一下

3. 发送消息和消费消息

3.1 进入kafka容器

docker exec -it 容器id /bin/bash

cd /opt/kafka_2.13-2.8.1/bin/

 3.2 连接生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下来就可以发送消息了。

 3.3 另起一个窗口,重复3.1的动作进入kafka容器,然后连接消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

这是就能就收消息了。

 到达这里,我们的kafka就安装并测试成功了。

4. 接下来我们就创建Springboot工程来连接kafka进行消息的生产和消费

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.volga</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4.2 我们创建一个订单的实体类

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单id
     */
    private long orderId;
    /**
     * 订单号
     */
    private String orderNum;
    /**
     * 订单创建时间
     */
    private LocalDateTime createTime;
}

4.3 创建生产者

@Component
@Slf4j
public class KafkaProvider {
    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "shopping";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 构建一个订单类
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 发送消息,订单类的 json 作为消息体
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("生产者产生消息 失败 ## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("生产者产生消息 成功 ## Send message success ...");
            }
        });
    }
}

4.4 创建消费者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的
    public void consumer(String message) {
        log.info("消费者消费信息 ## consumer message: {}", message);
    }
}

4.5 创建测试类

@SpringBootTest
public class SpringBootKafakaApplicationTests {
    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        System.out.println("是否为空??+"+kafkaProvider);
        // 发送 10 个消息
        for (int i = 0; i < 10; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }
        TimeUnit.MINUTES.sleep(1);
    }
}

4.6 要创建一个Application方法,不然项目会启动报错

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class,args);
    }
}

4.7 配置application.yml

spring:
  kafka:
    # 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)
    bootstrap-servers: 服务器ip:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

以上就创建项目成功了,我们运行测试方法,就能获取kafka中的消息了。

### 生产消息

 ### 消费消息

这里就是简单实现了kafka的消息生产和消费,后续的kafka复杂场景的实现会持续更新。

我是空谷有来人,谢谢支持。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/443878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习中的各种不变性

不变性 平移不变性&#xff08;Translation Invariance&#xff09;旋转不变性&#xff08;Ratation Invariance&#xff09;尺度不变性&#xff08;Size Invariance&#xff09;光照不变性&#xff08;Illumination Invariance&#xff09;仿射不变性&#xff08;Affine Invar…

PS学习记录-图像【像素】与【分辨率】的说明

我们经常能在图片的属性中看到 1920像素x1080像素 &#xff08;老司机在视频文件中也经常看到~&#xff09; 这就是我们常说的图片分辨率&#xff0c;以下是我学习整理的关于像素、分辨率的资料。 注意&#xff1a; 图像分辨率是针对【位图】的&#xff0c;图片分辨率决定了…

记录-JS简单实现购物车图片局部放大预览效果

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 一、实现效果 二、代码实现 代码不多&#xff0c;先看一下 HTML 里面结构很简单&#xff0c;初始化 MagnifyingGlass 对象来关联一个 IMG 标签来实现放大。 <!DOCTYPE html> <html> <h…

做一个网站需要多少个技术人员?

作为互联网从业者&#xff0c;这么多年来经常会碰到一个灵魂拷问&#xff0c;那就是“为什么一个网站需要那么多技术人员&#xff1f;”&#xff0c;尤其是提问者如果再追问一下“听说几个相关专业的学生一个课程的作业就是开发一个网站或者app&#xff0c;那为什么现在主流的网…

C++ | 认识标准库string和vector

本文概要 本篇文章主要介绍C的标准库类型string和vector&#xff0c;文中描述和代码示例很详细&#xff0c;看完即可掌握&#xff0c;感兴趣的小伙伴快来一起学习吧。 &#x1f31f;&#x1f31f;&#x1f31f;个人简介 &#x1f31f;&#x1f31f;&#x1f31f; ☀️大家好&a…

stable diffusion安装从0到1总结:包括遇到的坑和步骤

注&#xff1a;最低电脑配置&#xff1a;8G Vram16G RAM30G磁盘空间以上&#xff0c;20系列显卡及以上&#xff0c;windows>linux>macos。 文件可以不放在系统盘。举个例子&#xff1a;安装在D盘&#xff0c;在D盘创建一个StableDiffusion文件夹。下载下面文件: 1.下载…

为什么实现 API 最佳实践需要重新考虑安全性

随着应用程序编程接口 (API) 的使用与日俱增&#xff0c;实现和维护有效安全性的挑战从未像现在这样大。 由于缺乏管理 API 的单一标准&#xff0c;这意味着团队不能仅依靠工具来解决安全问题&#xff0c;因此这一挑战变得更加严峻。没有任何一种产品可以解决 API 环境的每种…

pg编码相关问题梳理

Lightdb/PG 编码相关问题梳理 之前在通过SQL文件导入数据时&#xff0c;报&#xff1a;ERROR: invalid byte sequence for encoding "EUC_CN"错误。然后就梳理了一下编码相关问题&#xff0c;这边记录一下。涉及到如下两种类型的报错&#xff1a; ERROR: invalid b…

电脑如何还原系统?这样做可以快速解决!

案例&#xff1a;我的电脑系统出问题了&#xff0c;怎么还原&#xff1f; 【我的电脑用了好几年了&#xff0c;最近它的系统出现了一些问题&#xff0c;我想还原电脑系统。有没有知道电脑系统如何还原&#xff1f;蹲一个简单的解决方法&#xff01;】 随着电脑使用时间的增加…

面试华为,花了2个月才上岸,真的难呀····

花2个月时间面试一家公司&#xff0c;你们觉得值吗&#xff1f; 背景介绍 美本计算机专业&#xff0c;代码能力一般&#xff0c;之前有过两段实习以及一个学校项目经历。第一份实习是大二暑期在深圳的一家互联网公司做前端开发&#xff0c;第二份实习由于大三暑假回国的时间比…

Linux中的git命令行

Linux中的git命令行 目录 Linux中的git命令行引入1、Linux下的git工具起源2、gitee的使用.gitignore.git 3、git三板斧3.1 git add3.2 git commit3.3 git push 4、git操作4.1 查看提交日志4.2 查看状态4.3 远端同步4.4 删除文件4.5 修改文件名 引入 当多个开发者同时参与同一个…

(一)Jhipster的基本介绍及入门安装

目录 1、为什么要使用 Jhipster &#xff1f; 2、安装配置 3、安装Jhipster 4、基本使用 5、介绍一下JDL Studio 6、启动 Jhipster 搭建项目 1、为什么要使用 Jhipster &#xff1f; JHipster是一个开发平台&#xff0c;可以快速生成、开发和部署现代Web应用程序和微服务…

分享几个自动化测试的练手项目

学习自动化测试最难的是没有合适的项目练习。 测试本身既要讲究科学&#xff0c;又有艺术成分&#xff0c;单单学几个 api 的调用很难应付工作中具体的问题。 你得知道什么场景下需要添加显性等待&#xff0c;什么时候元素定位需要写得更加优雅&#xff0c;为什么需要断言这个…

CANoe使用记录(四):CANoe Graphics图形窗口

目录 1、概述 2、Graphics图形窗口 2.1、打开测量窗口&#xff08;回放Log&#xff09; 2.2、输出Log 2.3、添加解析DBC文件 2.4、窗口排列 2.5、添加Graphics窗口 2.6、 信号栏选择 2.7、添加信号 2.8、波形样式 2.9、单Y轴测量尺 2.10、多Y轴测量尺 2.11、数据隐…

10行Python代码,助你整理杂乱无章的文件

朋友们好&#xff0c;今天是周五&#xff0c;又到了快放假的时间&#xff0c;激不激动&#xff1f;高不高兴&#xff1f;但是我还是要继续分享 Python 小工具给大家&#xff0c;嘻嘻~~ 今天的小程序&#xff0c;可以一键完成文件整理&#xff0c;一起来看看吧&#xff01; 按…

API 接口主流协议有哪些? 如何创建不同协议?

API 接口协议繁多&#xff0c;不同的协议有着不同的使用场景。70% 互联网应用开发者日常仅会接触到最通用的 HTTP 协议&#xff0c;相信大家希望了解更多其他协议的信息。我们今天会给大家介绍各种 API 接口主流协议和他们之间的关系。 1、API 接口主流协议有哪些? 接口协议分…

java环境安装 以jdk1.8 tomcat8为例

1、选择相应版本下载 官网地址&#xff1a;http://www.oracle.com/technetwork/java/javase/downloads/index.html 2、下载后一直点击下一步就好了ps&#xff1a;路径不喜欢安装在C盘的可以选择其他盘符&#xff0c;但是一定要自己找的到安装路径。 3、选择路径安装jdk1.8 4、…

Java基础(十六):String的常用API

Java基础系列文章 Java基础(一)&#xff1a;语言概述 Java基础(二)&#xff1a;原码、反码、补码及进制之间的运算 Java基础(三)&#xff1a;数据类型与进制 Java基础(四)&#xff1a;逻辑运算符和位运算符 Java基础(五)&#xff1a;流程控制语句 Java基础(六)&#xff1…

Serilog介绍

SerilogSerilogSerilog是.net 下的新兴的日志框架&#xff0c;本文这里简单的介绍一下它的用法。 首先安装Nuget包&#xff1a; Install-Package SerilogInstall-Package Serilog.Sinks.Console 其中包Serilog是Log核心库&#xff0c;Serilog.Sinks.Console是Log的控制台输出…

云安全问题及其解决方案

随着云计算技术的快速发展&#xff0c;云计算已经成为了企业和个人的首选。云计算在提高了企业的效率和降低了成本的同时&#xff0c;也带来了一系列的安全问题。本篇博客将深入讨论云安全问题&#xff0c;并提出相应的解决方案&#xff0c;以帮助企业和个人更好地保护自己的云…