springCloud之Stream

news2024/11/15 21:52:00
1、简介

Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m ,可以有效简化开发人员对消息中间件的使用复杂度,降低代码与消息中间件间的耦合度,屏蔽消息中间件 之 间的差异性,让开发人员可以有更多的精力关注于核心业务逻辑的处理。

主要有以下几个组件:

1)、目的地绑定器(Destination Binders):负责提供与外部消息系统集成的组件。

2)、固定器(Bindings):介于外部消息系统与应用程序间的桥梁 ,这个应用程序提供了生产者和消费者的消息 (由 Destination Binders 创建)。

3)、输入管道(Input Bindings):消费者通过Input Bindings 连接 Binder ,而 Binder 与 MQ 连接,即消费者通过 Input Bindings 从 MQ 读取数据。

4)、输出管道(Output Bindings):生产者通过Output Bindings 连接 Binder ,而 Binder 与 MQ 连接,即生产者通过 Output Bindings 向 MQ 写入数据。

5)、消息(Message):生产者和消费者使用的规范数据结构,用于与 Binders 通信(从而通过外部消息系统与其他应用程序通信)。

2、具体应用示例1(MQ使用kafka)

引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.1、生产者

配置文件

server:
  port: 8090
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.30.88:9092,192.168.30.89:9092
      
      bindings:
        producer-out-0:
          destination: topic1
          content-type: application/json

代码实现

@Autowired
private StreamBridge streamBridge;

@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
    Map<String , Object> map = new HashMap<>();
    map.put("tag", "tags");
    MessageHeaders headers = new MessageHeaders(map);
    // 封装消息
    Message<String> message = MessageBuilder.createMessage(msg, headers);
    //发送消息
    streamBridge.send("producer-out-0", message);
    return msg;
}
2.2、消费者

配置文件

server:
  port: 8091
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.30.88:9092,192.168.30.89:9092
      function:
        definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样

      bindings:
        consumer-in-0:
          destination: topic1
          content-type: application/json

代码实现

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
    return msg -> {
        System.out.println("接收到消息:" + msg.getPayload());
    };
}
3、具体应用示例2(MQ使用Rocketmq)

引入依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

代码实现

@Autowired
private StreamBridge streamBridge;

@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
    Map<String , Object> map = new HashMap<>();
    map.put(MessageConst.PROPERTY_TAGS, "tags");
    MessageHeaders headers = new MessageHeaders(map);
    // 封装消息
    Message<String> message = MessageBuilder.createMessage(msg, headers);
    //发送消息
    streamBridge.send("producer-out-0", message);
    return JSON.toJSONString(message);
}
3.2、消费者

配置文件:

server:
  port: 8091
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.30.88:9876
      function:
        definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样

      bindings:
        consumer-in-0:
          destination: topic1
          content-type: application/json

代码实现:

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
    return msg -> {
        System.out.println("接收到消息:" + msg.getPayload());
    };
}

注:

1、在spring-cloud-stream 3.1.0之前的版本,还有采用定义Source、Sink等方式编写消息生产者和消费者,在3.1.0以后的版本中弃用@StreamListener的方式,而采用函数式编程的方式接入,使用StreamBrige来进行发送。

2、注意binding的名称命名规则

例如:上面的代码中定义的consumer。

# 输入:    <方法名> + -in- + <index>
# 输出:    <方法名> + -out- + <index>

总结:本文介绍Stream统一消息中间件的模型,给出基于kafka和Rocketmq两种消息中间件模型下的使用案例,以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。

       本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1353052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解 BEM:前端开发中的命名约定革命

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

定时器PWM控制RGB彩灯案例

1.脉冲宽度调制PWM PWM&#xff08;Pulse Width Modulation&#xff09;简称脉宽调制&#xff0c;是利用微处理器的数字输出来对模拟电路进行控制的一种非常有效的技术&#xff0c;广泛应用在测量、通信、工控等方面。   PWM的一个优点是从处理器到​​ ​被控系统​​​信号…

【React】class组件生命周期函数的梳理和总结(第一篇)

1. 前言 本篇梳理和总结一下React的生命周期函数&#xff0c;方便使用class组件的同学查阅&#xff0c;先上生命周期图谱。 2. 生命周期函数 生命周期函数说明constructor(props) 功能&#xff1a;如果不需要初始化state或不进行方法绑定&#xff0c;class组件可以不用实现构造…

Cesium加载大规模三维数据渲染性能优化方案

根据实际项目经验和近期的论文&#xff0c;总结一下Cesium加载大规模三维数据性能优化方法。个人认为在实际的GIS数字孪生项目中,其可行的优化手段主要有三种&#xff1a; &#xff08;1&#xff09;通过专业的转换工具CesiumLab等对原始的三维模型进行轻量化处理&#xff0c;包…

【Linux Shell】2. Shell 变量

文章目录 【 1. 变量命名规则 】【 2. 变量的使用 】【 3. 只读变量 】【 4. 删除变量 】【 5. 变量类型 】【 6. Shell 字符串 】6.1 字符串的分类6.2 字符串操作 【 7. Shell 数组 】7.1 定义数组7.2 读取数组7.3 获取数组的长度 【 8. Shell 注释 】8.1 单行注释8.2 多行注释…

华为交换机入门(六):VLAN的配置

VLAN&#xff08;Virtual Local Area Network&#xff09;即虚拟局域网&#xff0c;是将一个物理的LAN在逻辑上划分成多个广播域的通信技术。VLAN内的主机间可以直接通信&#xff0c;而VLAN间不能直接互通&#xff0c;从而将广播报文限制在一个VLAN内。 VLAN 主要用来解决如何…

[Vulnhub靶机] DriftingBlues: 2

[Vulnhub靶机] DriftingBlues: 2靶机渗透思路及方法&#xff08;个人分享&#xff09; 靶机下载地址&#xff1a; https://download.vulnhub.com/driftingblues/driftingblues2.ova 靶机地址&#xff1a;192.168.67.21 攻击机地址&#xff1a;192.168.67.3 一、信息收集 1.…

std::setlocale详解

头文件 #include <clocale>作用 std::setlocale是C标准库中的一个函数&#xff0c;用于设置当前程序的本地化&#xff08;locale&#xff09;环境。 setlocale 函数安装指定的系统本地环境或其一部分&#xff0c;作为新的 C 本地环境。 修改保持效果&#xff0c;并影…

3dmax全景图用什么渲染 全景图云渲染使用教程

在给客户展示设计概念时&#xff0c;应用3ds Max创建的全景图是一个高效直观的方法。这种方式不仅可以全方位地呈现整个空间&#xff0c;让客户沉浸式地感受设计师的创意&#xff0c;而且在展现大型空间设计&#xff0c;如展览馆或者会议室等&#xff0c;效果尤其显著&#xff…

使用Python进行用户参与度分析

用户参与度分析是一种数据驱动的方法&#xff0c;用于评估和了解用户对产品&#xff0c;服务或平台的参与&#xff0c;互动和满意度。它涉及分析各种指标和行为模式&#xff0c;以深入了解用户行为和偏好。它帮助企业做出明智的决策&#xff0c;以增强用户体验&#xff0c;优化…

广州怎么找工作哪里工作机会多

广州找工作上 吉鹿力招聘网 打开 吉鹿力招聘网 “注册账号”&#xff0c;然后输入个人基本信息&#xff0c;进行注册&#xff08;可使用手机号注册&#xff0c;也可以使用邮箱注册&#xff09;。 填写求职意向&#xff0c;基本信息点击“下一步”。 填写工作经历点击“下一步”…

算法巡练day04Leetcode24交换节点19删除倒数节点142环形链表

今天学习的文章和视频链接 https://www.bilibili.com/video/BV1YT411g7br/?vd_source8272bd48fee17396a4a1746c256ab0ae https://www.bilibili.com/video/BV1if4y1d7ob/?vd_source8272bd48fee17396a4a1746c256ab0ae 24两两交换链表中的节点 给你一个链表&#xff0c;两两…

yolov5 损失函数

yolov5有三个损失函数分别是回归损失、置信度损失、分类损失 回归损失用的是CIOU loss 置信度和分类损失用的是BCE loss &#xff08;1&#xff09;对比L2损失&#xff0c;Iou和GIou具有尺度不变性&#xff0c;不会说输入的框子大loss就越大。 &#xff08;2&#xff09;对比…

Vue2 - 数据响应式原理

目录 1&#xff0c;总览2&#xff0c;Observer3&#xff0c;Dep4&#xff0c;Watcher5&#xff0c;Schedule 1&#xff0c;总览 vue2官网参考 简单介绍下上图流程&#xff1a;以 Data 为中心来说&#xff0c; Vue 会将传递给 Vue 实例的 data 选项&#xff08;普通 js 对象&a…

SSM养老院综合服务系统----计算机毕业设计

项目介绍 该项目为后台管理项目&#xff0c;分为管理员与护工两种角色&#xff1b; 管理员角色包含以下功能&#xff1a; 管理员登录,个人资料密码管理,用户管理,床位类型管理,床位管理,护工管理,老人管理,咨询登记管理,预约登记管理,老人健康信息管理,费用管理等功能。 护…

【LeetCode每日一题】2487. 从链表中移除节点(调用栈+递归+翻转链表)

2024-1-3 文章目录 [2487. 从链表中移除节点](https://leetcode.cn/problems/remove-nodes-from-linked-list/)方法一&#xff1a;调用栈方法二&#xff1a;递归方法三&#xff1a;翻转链表 2487. 从链表中移除节点 方法一&#xff1a;调用栈 1.将所有节点按顺序压入栈中 2.从…

浅谈接口自动化测试

昨晚在某个测试交流群&#xff0c;听了一个测试老司机分享接口自动化测试的内容&#xff0c;对接口自动化有了更深的一些认识&#xff0c;也为接下来公司的接口自动化实施&#xff0c;提供了更多的思路。 这篇博客&#xff0c;就说说功能测试到接口自动化的进阶&#xff0c;以及…

商品推荐系统+可视化+2种协同过滤推荐算法 Django框架 大数据毕业设计(附源码+论文)✅

毕业设计&#xff1a;2023-2024年计算机专业毕业设计选题汇总&#xff08;建议收藏&#xff09; 毕业设计&#xff1a;2023-2024年最新最全计算机专业毕设选题推荐汇总 &#x1f345;感兴趣的可以先收藏起来&#xff0c;点赞、关注不迷路&#xff0c;大家在毕设选题&#xff…

【论文笔记】An Extractive-and-Abstractive Framework for Source Code Summarization

An Extractive-and-Abstractive Framework for Source Code Summarization 1. Introduction2. Model2.1 Overview2.2 Training of EACS2.2.1 Part i : Training of Extractor2.2.2 Part ii : Training of Abstracter 3. Evaluation 1. Introduction 代码摘要可以细分为抽取式代…