Spring Cloud Stream 3.x+kafka 3.8整合

news2024/10/10 5:53:59

Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接

  • 前言
  • 一、如何看官方文档(有深入了解需求的人)
  • 二、kafka的安装
    • tar包安装
    • docker安装
  • 三、代码中集成
    • 创建一个测试topic:test
    • producer代码
    • producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
    • Consumer代码
    • Consumer 配置
    • Consumer 2的代码和配置
  • 四、测试
  • 五、结语

前言

上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。

由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。

官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。

一、如何看官方文档(有深入了解需求的人)

1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
在这里插入图片描述
2.配置建议看中文版本
在这里插入图片描述

二、kafka的安装

tar包安装

  1. 下载链接:kafka_2.13-3.8.0.tgz

  2. 选择一个合适的位置解压

    tar -zxvf kafka_2.12-3.8.0.tgz
    
  3. 启动自带的zookeeper(后台启动)

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  4. 修改kafka server的配置文件,便于外网能够访问
    找到bin\config目录下的server.properties文件
    修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://172.16.72.133:9092
    
  5. 启动kafka server(后台启动)

    nohup bin/kafka-server-start.sh config/server.properties &
    
  6. 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
    首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    编辑这些新文件并设置如下属性:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dir=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dir=/tmp/kafka-logs-2
    

    broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
    然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。

docker安装

  1. 拉取镜像

    docker pull apache/kafka:3.8.0
    
  2. 启动

    docker run -p -d 9092:9092 apache/kafka:3.8.0
    

三、代码中集成

创建一个测试topic:test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。

producer代码

@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);
    @Autowired
    private StreamBridge streamBridge;

    @RequestMapping("/test1")
    public void testOne() {
        Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));
        streamBridge.send("broadcastMessage-out-0", msg);
    }
}

自定义消息体SimpleMsg,此类不需要序列化

@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{
    private String msg;
    private String time;
}

producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)

key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String

spring:
  cloud:
    stream:
      kafka:
        binder:
          ##kafka的server地址
          brokers: 172.16.72.133:9092
          ##如果topic不存在则创建
          auto-create-topics: true
          auto-add-partitions: true #自动分区
          min-partition-count: 1 #最小分区
          ##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误
          configuration:
            key:
              serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      bindings:
        broadcastMessage-out-0:
          destination: test
          content-type: application/json

Consumer代码

 @Bean
 public Consumer<Message<SimpleMsg>> broadcastMessage() {
     return msg -> {
         log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());
     };
 }

Consumer 配置

项目中有更详细的配置,这里为了测试用的简化版

spring:
  cloud:
    stream:
      function:
        definition: broadcastMessage
      kafka:
        binder:
          brokers: 172.16.72.133:9092
          auto-create-topics: true
          configuration:
            key:
              serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      bindings:
        broadcastMessage-in-0:
          destination: test
          group: test-topic-account
          content-type: application/json

Consumer 2的代码和配置

项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。

四、测试

生产者发送两个消息
在这里插入图片描述
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
在这里插入图片描述
friend模块消费者:
在这里插入图片描述

五、结语

到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。

本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2201232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP中的HTTP请求:简化你的网络通信

在当今的网络应用开发中&#xff0c;PHP作为一种流行的服务器端脚本语言&#xff0c;经常需要与外部服务进行通信。这通常涉及到发送HTTP请求来获取或提交数据。幸运的是&#xff0c;PHP提供了多种方式来简化HTTP请求的过程&#xff0c;使得网络通信变得轻而易举。 PHP中的HTTP…

stm32h743 threadx + filex(SD卡读写) + CubeMX + CubeIDE

今天整了一下正点原子阿波罗h743的filex,按部就班的使用CubeMX去搭建环境,再用CubeIDE去编写程序,里面也有几个小坑,问题不大。 Step1. 创建CubeMX 首先设置RCC晶振,SYS为tim6 然后勾选sdmmc1 选择4bits,分频系数为4。这个分频系数选4对应一般的sd卡都可以用了,如果好…

单片机死机后在不破坏现场的情况下连接调试器进入调试模式

经常遇到程序在现场卡死了&#xff0c;但是这个时候没有连接调试器&#xff0c;不好找死机原因。 下面说下在程序卡死的时候&#xff0c;在不破坏死机现场的情况下&#xff0c;连接上调试器进行程序调试的方法。 把调试器连接电脑&#xff0c;打开keil做下面的配置&#xff0c…

Llama系列上新多模态!3.2版本开源超闭源,还和Arm联手搞了手机优化版,Meta首款多模态Llama 3.2开源!1B羊驼宝宝,跑在手机上了

Llama系列上新多模态&#xff01;3.2版本开源超闭源&#xff0c;还和Arm联手搞了手机优化版&#xff0c;Meta首款多模态Llama 3.2开源&#xff01;1B羊驼宝宝&#xff0c;跑在手机上了&#xff01; 在多模态领域&#xff0c;开源模型也超闭源了&#xff01; 就在刚刚结束的Met…

Web自动化Demo-PHP+Selenium

1.新建工程 打开PhpStorm新建工程如下&#xff1a; 打开终端输入如下命令安装selenium&#xff1a; composer require php-webdriver/webdriver 2.编写代码 <?php require vendor/autoload.php;use Facebook\WebDriver\Remote\RemoteWebDriver; use Facebook\WebDriver…

分治算法(8)_归并排序_翻转对

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 分治算法(8)_归并排序_翻转对 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 温…

云原生日志ELK( logstash安装部署)

logstash 介绍 LogStash由JRuby语言编写&#xff0c;基于消息&#xff08;message-based&#xff09;的简单架构&#xff0c;并运行在Java虚拟机 &#xff08;JVM&#xff09;上。不同于分离的代理端&#xff08;agent&#xff09;或主机端&#xff08;server&#xff09;&…

【python】:pycharm2024.2.2使用

参考链接&#xff1a; 文件连接&#xff1a; 方法1&#xff1a;临时有效&#xff0c;后期官方更新提示激活无效 输入激活码&#xff1a; X9MQ8M5LBM-eyJsaWNlbnNlSWQiOiJYOU1ROE01TEJNIiwibGljZW5zZWVOYW1lIjoiZ3VyZ2xlcyB0dW1ibGVzIiwiYXNzaWduZWVOYW1lIjoiIiwiYXNzaWduZWVF…

Chainlit集成Dashscope实现语音交互网页对话AI应用

前言 本篇文章讲解和实战&#xff0c;如何使用Chainlit集成Dashscope实现语音交互网页对话AI应用。实现方案是对接阿里云提供的语音识别SenseVoice大模型接口和语音合成CosyVoice大模型接口使用。针对SenseVoice大模型和CosyVoice大模型&#xff0c;阿里巴巴在github提供的有开…

视频流媒体融合与视频监控汇聚管理系统集成方案

流媒体视频融合与汇聚管理系统可以实现对各类模块化服务进行统一管理和配置等操作&#xff0c;可实现对应用服务的整合、管理及共享&#xff0c;以标准接口的方式&#xff0c;业务平台及其他第三方业务平台可以方便地调用各类数据&#xff0c;具有开放性和可扩展性。在流媒体视…

opencascade鼠标拖拽框选功能

1.首先在OccView中添加用于显示矩形框的类 //! rubber rectangle for the mouse selection.Handle(AIS_RubberBand) mRectBand; 2.设置框选的属性 mRectBand new AIS_RubberBand(); //设置属性 mRectBand->SetLineType(Aspect_TOL_SOLID); //设置变宽线型为实线 mRe…

scanMiR:使用R语言预测 miRNA 结合位点

生信碱移 scanMiR 结合预测 scanMiR&#xff0c;一款R语言工具包&#xff0c;能够高效地扫描任何自定义序列上的典型和非典型miRNA结合位点&#xff0c;估计解离常数并预测聚合的转录物抑制。 最近&#xff0c;几项高通量研究试图阐明miRNA–mRNA靶向的生化决定因素。在一项发…

Unity 克隆Timeline并保留引用

Timeline的资源是.playable文件&#xff0c;简单的复制不会保留引用关系。 下面的脚本可以复制引用关系。 using System.Collections.Generic; using System.Linq; using System.Reflection; using UnityEngine; using UnityEngine.Playables; using UnityEngine.Timeline; u…

Node.js入门——fs、path模块、URL端口号、模块化导入导出、包、npm软件包管理器

Node.js入门 1.介绍 定义&#xff1a;跨平台的JS运行环境&#xff0c;使开发者可以搭建服务器端的JS应用程序作用&#xff1a;使用Node.Js编写服务器端代码Node.js是基于Chrome V8引擎进行封装&#xff0c;Node中没有BOM和DOM 2.fs模块-读写文件 定义&#xff1a;封装了与…

(03)python-opencv图像处理——图像的几何变换

前言 1、变换 2、缩放 3、平移变换 4、旋转 5、仿射变换 6、翻转 参考文献 前言 在本教程中&#xff1a; 你将会学到将不同的几何变换应用于图像&#xff0c;如平移、旋转、仿射变换等。你会学到如下函数&#xff1a;cv.getPerspectiveTransform 图像的几何变换是图像…

Ping32引领数据防泄漏新潮流:智能、高效、安全

在当今数字化迅猛发展的时代&#xff0c;企业面临着日益严峻的数据安全挑战。数据泄漏事件频发&#xff0c;不仅损害企业声誉&#xff0c;还可能导致巨额的经济损失。为此&#xff0c;Ping32以其创新的数据防泄漏解决方案&#xff0c;正在引领行业新潮流。其技术特点可概括为“…

017 平台属性[属性分组、规格参数、销售属性]

文章目录 获取指定分类的属性列表AttrController.javaAttrServiceImpl.java 获取属性分组所关联的所有属性AttrGroupControllerAttrServiceImpl.java 移除AttrGroupController.javaAttrServiceImpl.javaAttrAttrgroupRelationDao.javaAttrAttrgroupRelationDao.xml 获取属性分组…

动态规划算法专题(六):回文串问题

目录 1、回文子串&#xff08;"引子题"&#xff09; 1.1 算法原理 1.2 算法代码 2、最长回文子串 2.1 算法原理 2.2 算法代码 3、分割回文串 IV&#xff08;hard&#xff09; 3.1 算法原理 3.2 算法代码 4、分割字符串 II&#xff08;hard&#xff09; 4…

甲虫身体图像分割系统源码&数据集分享

甲虫身体图像分割系统源码&#xff06;数据集分享 [yolov8-seg-EfficientRepBiPAN&#xff06;yolov8-seg-C2f-FocusedLinearAttention等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challen…

C语言之扫雷小游戏(完整代码版)

说起扫雷游戏&#xff0c;这应该是很多人童年的回忆吧&#xff0c;中小学电脑课最常玩的必有扫雷游戏&#xff0c;那么大家知道它是如何开发出来的吗&#xff0c;扫雷游戏背后的原理是什么呢&#xff1f;今天就让我们一探究竟&#xff01; 扫雷游戏介绍 如下图&#xff0c;简…