Spring Cloud Stream与Kafka(一)

news2025/1/22 15:02:24

Spring Cloud Stream与Kafka(一)

​ 在实际开发过程中,消息中间件用于解决应用解耦,异步消息,流量削峰等问题,实现高可用、高性能、可伸缩和最终一致性架构。不同的消息中间件实现方式不同,内部结构是不一样的。比如常见的RabbitMQ和Kafka,RabbitMQ有exchange,kafka有topic、partition,这些中间件的差异性导致我们在实际项目开发过程中造成了一定的干扰。如果采用了其中的一种,后面的业务需求,我想往另一种消息队列迁移,有一堆东西需要重做。Spring Cloud Stream是一种解耦的方式。

文章目录

    • Spring Cloud Stream与Kafka(一)
      • 简单介绍
      • Kafka实例
        • 生产者
        • 消费者

简单介绍

  1. Spring Cloud Stream是由一个中间件中立的核心组成,应用通过Spring Cloud Stream插入的input(相当于消费者)和output(相当于生产者)通道与外界交流。通道通过指定中间件的Binder与外部代理连接,业务开发者不需要关注具体的消息中间件,只需要关注Binder对应程序提供的抽象概念来使用中间件实现业务就可以了。Spring Cloud Stream许多抽象和原语,简化了消息驱动微服务应用程序的开发。

SCSt-with-binder

  • 最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消费者,顶层可以向绑定层生产消费、获取消息。
  1. Binder绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要考虑具体的中间件实现。当需要升级或更换中间件产品时,我们要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑。

  2. 在Spring Cloud Stream中的消息通信方式遵循发布订阅模式,当一条消息被投递到消息中间件后,它会通过共享的主题进行广播,消费者在订阅的主题收到消息后触发自身的业务逻辑处理。这里的主题是抽象概念,代表发布共享消息给消费者的地方。在不同的消息中间中,主题可能对应着不同的概念。

  3. Destination Binders是负责提供与外部消息系统集成的组件。Destination Bindings是外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。Message是生产者和消费者用于与目标绑定器通信的规范化数据结构。

Kafka实例

  1. 分别创建生产者kafka-producer和消费者kafka-consumer,引入依赖。
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
生产者
  1. 添加配置文件application.yml
spring:
  cloud:
    stream:
      kafka:
        binder: # 绑定器
          brokers: 192.168.182.171:9092   # broker的IP和端口
  application:
    name: kafka-provider

server:
  port: 8301
  1. 添加启动类
package org.lxx.stream.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

}
  1. 添加配置文件
package org.lxx.stream.kafka.producer.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface MyProcessor {

    String MESSAGE_OUTPUT = "log_output";

    //在Kafka中创建主题log_output
    @Output(MESSAGE_OUTPUT)
    SubscribableChannel logOutput();
}
  1. 创建实体类
package org.lxx.stream.kafka.producer.entity;

import lombok.Data;

import java.util.Date;

@Data
public class LogInfo {
    private String clientVersion;
    private String userId;
    private String clientIP;
    private Date time;
}
  1. 创建控制器
package org.lxx.stream.kafka.producer.controller;

import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.producer.config.MyProcessor;
import org.lxx.stream.kafka.producer.entity.LogInfo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.sql.Date;
import java.time.Instant;

@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MessageController {

    @Resource
    private MyProcessor myProcessor;

    @GetMapping("sendLogMessage")
    public void sendLogMessage(String message) {
        Message<String> strMessage = 
            MessageBuilder.withPayload(message).build();
        myProcessor.logOutput().send(strMessage);
    }

    @GetMapping("sendObjLogMessage")
    public void sendObjLogMessage() {
        LogInfo logInfo = new LogInfo();
        logInfo.setClientIP("192.168.1.111");
        logInfo.setClientVersion("1.0");
        logInfo.setUserId("198663383837434");
        logInfo.setTime(Date.from(Instant.now()));
        Message<LogInfo> strMessage = 
            MessageBuilder.withPayload(logInfo).build();
        myProcessor.logOutput().send(strMessage);
    }
}
消费者
  1. 添加配置文件application.yml
spring:
  cloud:
    stream:
      kafka:
        binder: # 绑定器
          brokers: 192.168.182.171:9092   # broker的IP和端口
  application:
    name: kafka-consumer

server:
  port: 8302
  1. 添加启动类
package org.lxx.stream.kafka.consumer;

import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(value = MyProcessor.class)
@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}
  1. 添加配置类
package org.lxx.stream.kafka.consumer.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface MyProcessor {

    String MESSAGE_INPUT = "log_input";
    String MESSAGE_OUTPUT = "log_output";
    String LOG_FORMAT_INPUT = "log_format_input";
    String LOG_FORMAT_OUTPUT = "log_format_output";

    @Input(MESSAGE_INPUT)
    SubscribableChannel logInput();

    @Output(MESSAGE_OUTPUT)
    SubscribableChannel logOutput();

    @Input(LOG_FORMAT_INPUT)
    SubscribableChannel logFormatInput();

    @Output(LOG_FORMAT_OUTPUT)
    SubscribableChannel logFormatOutput();

}
  1. 添加监听器
package org.lxx.stream.kafka.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    @StreamListener(MyProcessor.MESSAGE_INPUT)
    @SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
    public String processLogMessage(String message) {
        //通过MyProcessor.MESSAGE_INPUT接收消息
        //然后通过SendTo把处理后的消息发送到MyProcessor.LOG_FORMAT_OUTPUT
        log.info("GET Message:" + message);
        return message;
    }

    @StreamListener(MyProcessor.LOG_FORMAT_INPUT)
    public void processFormatLogMessage(String message) {
        //接收来自MyProcessor.LOG_FORMAT_INPUT 的消息
        //也就是SendTo发送的加工后的消息
        log.info("接收到格式化后的消息:" + message);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2076977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

遥感反演保姆级教程:SPSS筛选因子之后如何采用python建模和反演整个研究区?(以反演生物量为例)

SPSS筛选因子之后如何采用python建模和反演整个研究区?&#xff08;以反演生物量为例&#xff09; 引言 在遥感数据分析中&#xff0c;因子筛选和建模是关键步骤。筛选出与目标变量&#xff08;如生物量&#xff09;显著相关的因子&#xff0c;不仅可以提高模型的预测精度&a…

编程世界的平衡术:日常编码与自我提升的和谐共生

前言 在快速迭代的编程世界中&#xff0c;程序员的角色日益复杂且充满挑战&#xff0c;他们不仅是代码的编织者&#xff0c;更是技术进步的推动者。面对日常编码工作的繁重与个人成长的迫切需求&#xff0c;寻找两者之间的平衡点成为了每位程序员必须深思的问题。以下是我的详细…

C++初学者指南-5.标准库(第二部分)–特殊容器

C初学者指南-5.标准库(第二部分)–特殊容器 pair<A , B> 包含两个相同或不同类型的值 tuple<A , B> C11 包含许多相同或不同类型的值 optional C17 包含一个类型为 T 的值或没有值 variant<A,B,C,…> C17 包含一个类型为A、B或C的值…… any C17 包含任…

《花100块做个摸鱼小网站! 》第四篇—前端应用搭建和完成第一个热搜组件

⭐️基础链接导航⭐️ 服务器 → ☁️ 阿里云活动地址 看样例 → &#x1f41f; 摸鱼小网站地址 学代码 → &#x1f4bb; 源码库地址 一、前言 在本系列文章的早期章节中&#xff0c;我们已经成功地购买了服务器并配置了MySQL、Redis等核心中间件。紧接着&#xff0c;我们不仅…

用AI来学习英语口语(白嫖,所以稍微麻烦些)

写在前面 本文看下如何使用AI来免费学习英语口语。 1&#xff1a;正文 首先&#xff0c;我们点击这里来到一个对话窗口&#xff0c;你可以在这个窗口中使用英语来询问你的问题&#xff0c;比如what can i do when i am not happy&#xff1a; 接着复制机器人回答内容&#…

datawhale AI夏令营第五期 深度学习入门 Task1 了解机器学习

机器学习基础 定义 学习一般是只有人才具备的能力&#xff0c;机器学习就是通过某种方式让机器具备人才有的学习能力&#xff0c;这里的某种方式是机器具备找一个函数的能力 比如说证件照背景颜色更换&#xff0c;那么机器需要找到图片中的背景在哪里&#xff0c;再替换成目…

pandas操作Excel文件

pandas操作Excel文件 一、前言二、指定读取的工作表与header设置2.1指定工作表2.2header设置 三、读取Excel数据3.1iloc读取数据3.2read_excel读取数据3.3loc读取数据 四、DataFrame数据筛选4.1根据列标签对整列进行筛选4.2使用iloc对区域进行筛选4.3自定义筛选 五、DataFrame类…

2024年8月26日(线上考试系统,虚拟化技术部署,使用link)

[rootdocker ~]# systemctl start docker [rootdocker ~]# docker pull mysql 一、线上考试系统 虚拟化技术部署 1、部署前端服务器 project_exam_system.sql数据库文件 dist网络资源 1.将资源上传到服务器 C:\Users\89765>scp -r "D:\青岛实训\project_exam_system\d…

C语言典型例题52

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 题目&#xff1a; 例题4.4 相传古代印度国王舍罕要褒奖他聪明能干的宰相达依尔&#xff08;国际象棋发明者&#xff09;&#xff0c;问他需要什么&#xff0c;达依尔回答说&#xff1a;“国王只要在国际象棋的棋盘的…

Shader 中的渲染顺序

1、深度测试和深度写入 有了深度测试和深度写入发挥作用让我们不需要关心不透明物体的渲染顺序比如一个物体A 挡住了 物体B&#xff0c;即使底层逻辑中 先渲染A&#xff0c;后渲染B&#xff0c;我们也不用担心 B的颜色会把A覆盖&#xff0c;因为在进行深度测试时&#xff0c;远…

电池管理系统SOX算法资料优化目录2024.8.26

这篇文章主要写一下这一次更新的几个地方&#xff0c;有对原来的代码及模型进行优化的部分&#xff0c;也有新增加的代码和模型&#xff0c;我就把几个比较典型的给列了出来。但是还有好多的更新没有在下面展示出来&#xff0c;因为一个个展示出来太复杂了。如果你对更新的内容…

如何使用ssm实现基于 SSM 框架的宠物用品电子商务平台设计与实现+vue

TOC ssm258基于 SSM 框架的宠物用品电子商务平台设计与实现vue 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科…

黑神话悟空妖怪平生录

黑神话悟空是一部特别好玩的单机游戏。上一个我这么喜欢的国产单机还是古剑奇谭三。 虽然黑神话的战斗系统和地图系统尚不完善&#xff0c;但是这里面的游记是真的做得很认真。 203个妖怪&#xff0c;203首小诗&#xff0c;203个妖生故事&#xff0c;带你去看妖怪的喜怒哀乐。…

freemarker模版注入

Freemarker模版注入漏洞 模版注入漏洞根因&#xff08;SSTI&#xff0c;服务器端模版注入&#xff09;freemarker介绍Freemarker模版注入漏洞关键点漏洞复现环境引入依赖poc 修复方案完整代码&#xff08;包含修复&#xff09;参考 模版注入漏洞根因&#xff08;SSTI&#xff0…

乾坤大挪移--将一个混乱的excel分类整理的辅助VBA代码

excel 乾坤大挪移 你不需要将工作表手动分类&#xff1b; 只需要在”已整理“的标题行增加标题列&#xff0c; listbox会自动获取”已整理“sheet中的标题列&#xff0c;并列出来 你只需要选中同一列中的单元格&#xff0c;点击想移动到的列表的类别&#xff0c;双击或者点…

【云故事探索】NO.7:「越用越上瘾」,中华财险 60% 研发人员用通义灵码提效

云布道师 中华联合财产保险股份有限公司运用“云大模型”技术革新业务&#xff0c;通过阿里云的通义灵码大幅提升编码效率&#xff0c;近60%的研发人员采用&#xff0c;采纳的生成代码占比约20%&#xff0c;显著提升了团队创新能力与代码质量&#xff0c;并积极探索大模型在更多…

基于SpringBoot的智慧党建系统+uniapp移动端+LW示例参考

1.项目介绍 技术栈环境&#xff1a;SpringBootthymeleafuniappIDEA NavicatMySQL 功能介绍&#xff1a; 后端功能&#xff1a;首页管理&#xff08;轮播图、通知公告、新闻管理&#xff09;、用户管理&#xff08;用户信息、用户反馈、用户订单、用户动态&#xff09;、其他…

一文搞定MybatisPlus

Mybatis简介 MyBatis-Plus&#xff08;简称 MP&#xff09;是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 &#xff08;来自官网&#xff09; 体验Mybatisplus 1.创建SpringBoot工程&#xff0c;导入m…

HarmonyOS--认证服务-操作步骤

HarmonyOS–认证服务 文章目录 一、注册华为账号开通认证服务二、添加项目&#xff1a;*包名要与项目的包名保持一致三、获取需要的文件四、创建项目&#xff1a;*包名要与项目的包名保持一致五、添加json文件六、加入请求权限七、加入依赖八、修改构建配置文件&#xff1a;bui…

【UDS诊断】——0x34、0x36、0x37服务

&#x1f64b;‍♂️【UDS诊断服务介绍合集】系列&#x1f481;‍♂️点击跳转 文章目录 一、服务概述1.0x34服务——请求下载数据1.1.0x34格式 2.0x36服务——数据传输2.1.0x36格式 3.0x37服务——退出上传下载3.1.0x37格式 一、服务概述 Client端使用Routine Control服务来…