Spring Cloud Stream实现数据流处理

news2025/1/20 4:42:54

1.什么是Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

module

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
    container_name: zookeeper-server
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
    container_name: kafka
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
    ports:
      - "9092:9092"
    depends_on:
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
    container_name: kafka-map
    restart: unless-stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:
      - "9080:8080"
    depends_on:                         
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

stream

 实验目标

  1. 生成UUID并将其发送到Kafka主题batch-in
  2. batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  3. 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-cloud-stream-kafaka</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

</project>

处理流

/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author Steven Gantz
 */
@SpringBootApplication
public class CloudStreamsFunctionBatch {

   public static void main(String[] args) {
      SpringApplication.run(CloudStreamsFunctionBatch.class, args);
   }

   @Bean
   public Supplier<UUID> stringSupplier() {
      return () -> {
         var uuid = UUID.randomUUID();
         System.out.println(uuid + " -> batch-in");
         return uuid;
      };
   }

   @Bean
   public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
      return idBatch -> {
         System.out.println("Removed digits from batch of " + idBatch.size());
         return idBatch.stream()
            .map(UUID::toString)
            // Remove all digits from the UUID
            .map(uuid -> uuid.replaceAll("\\d",""))
            .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
            .toList();
      };
   }

   @KafkaListener(id = "batch-out", topics = "batch-out")
   public void listen(String in) {
      System.out.println("batch-out -> " + in);
   }

}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
        digitRemovingConsumer-out-0:
          destination: batch-out
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          digitRemovingConsumer-in-0:
            consumer:
              configuration:
                # Forces consumer to wait 5 seconds before polling for messages
                fetch.max.wait.ms: 5000
                fetch.min.bytes: 1000000000
                max.poll.records: 10000000

参数解释

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
  • spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
  • stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in
  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。
  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。
        digitRemovingConsumer-out-0:
          destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Stream)

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

5.引用

  • https://github.com/spring-cloud/spring-cloud-stream-samples
  • Spring Cloud Stream 3.X - coderZoe的博客
  • Spring Cloud Stream实现数据流处理 | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无人机飞手入门指南

无人机飞手入门指南旨在为初学者提供一份全面的学习路径和实践建议&#xff0c;帮助新手快速掌握无人机飞行技能并了解相关法规知识。以下是一份详细的入门指南&#xff1a; 一、了解无人机基础知识 1. 无人机构造&#xff1a;了解无人机的组成部分&#xff0c;如机身、螺旋桨…

使用Mac下载MySQL修改密码

Mac下载MySQL MySQL官网链接MySQL​​​​​​ 当进入到官网后下滑到community社区&#xff0c;进行下载 然后选择community sever下载 这里就是要下载的界面&#xff0c;如果需要下载之前版本的话可以点击archives&#xff0c; 可能会因为这是外网原因&#xff0c;有时候下…

两大新兴开发语言大比拼:Move PK Rust

了解 Move 和 Rust 的差异有助于开发者根据项目的具体需求选择最合适的语言。选择不恰当的语言可能会导致项目后期出现技术债务。不同语言有其独特的优势。了解 Move 和 Rust 的差异可以帮助开发者拓展技术视野&#xff0c;发现不同语言在不同领域的应用潜力。 咱们直奔主题&a…

three.js 对 模型使用 视频进行贴图修改材质

three.js 对 模型使用 视频进行贴图修改材质 https://threehub.cn/#/codeMirror?navigationThreeJS&classifyapplication&idvideoModel import * as THREE from three import { OrbitControls } from three/examples/jsm/controls/OrbitControls.js import { GLTFLoad…

【论文分享】利用多源大数据衡量街道步行环境的老年友好性:以中国上海为例

本次给大家带来一篇SCI论文的全文翻译&#xff01;该论文考虑了绿化程度、可步行性、安全性、形象性、封闭性和复杂性这六个指标&#xff0c;提出了一种基于多源地理空间大数据的新型定量评价模型&#xff0c;用于从老年人和专家的角度评估街道步行环境的老年友好程度&#xff…

计算机网络安全 —— 对称加密算法 DES (一)

一、对称加密算法概念# ​ 我们通过计算机网络传输数据时&#xff0c;如果无法防止他人窃听&#xff0c; 可以利用密码学技术将发送的数据变换成对任何不知道如何做逆变换的人都不可理解的形式&#xff0c; 从而保证了数据的机密性。这种变换被称为加密&#xff08; encryptio…

6.C操作符详解,深入探索操作符与字符串处理

C操作符详解&#xff0c;深入探索操作符与字符串处理 C语言往期系列文章目录 往期回顾&#xff1a; C语言是什么&#xff1f;编程界的‘常青树’&#xff0c;它的辉煌你不可不知VS 2022 社区版C语言的安装教程&#xff0c;不要再卡在下载0B/s啦C语言入门&#xff1a;解锁基础…

微信小程序 最新获取用户头像以及用户名

一.在小程序改版为了安全起见 使用用户填写来获取头像以及用户名 二.代码实现 <view class"login_box"><!-- 头像 --><view class"avator_box"><button wx:if"{{ !userInfo.avatarUrl }}" class"avatorbtn" op…

Uni-APP+Vue3+鸿蒙 开发菜鸟流程

参考文档 文档中心 运行和发行 | uni-app官网 AppGallery Connect DCloud开发者中心 环境要求 Vue3jdk 17 Java Downloads | Oracle 中国 【鸿蒙开发工具内置jdk17&#xff0c;本地不使用17会报jdk版本不一致问题】 开发工具 HBuilderDevEco Studio【目前只下载这一个就…

【Android、IOS、Flutter、鸿蒙、ReactNative 】屏幕适配

Android Java 屏幕适配 参考 今日头条适配依赖配置 添加设计屏幕尺寸 设置字体大小 通过切换不同屏幕尺寸查看字体大小 设置文本宽高 通过切换不同屏幕尺寸查看文本宽高 Android Compose 屏幕适配 <

从视频帧生成点云数据、使用PointNet++模型提取特征,并将特征保存下来的完整实现。

文件地址 https://github.com/yanx27/Pointnet_Pointnet2_pytorch?spm5176.28103460.0.0.21a95d27ollfze Pointnet_Pointnet2_pytorch\log\classification\pointnet2_ssg_wo_normals文件夹改名为Pointnet_Pointnet2_pytorch\log\classification\pointnet2_cls_ssg "E:…

Websocket如何分块处理数据量超大的消息体

若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据&#xff0c;此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输&#xff0c;而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理…

论文复现_How Machine Learning Is Solving the Binary Function Similarity Problem

1. 内容概述 前言&#xff1a;此代码库支持 USENIX Security 22 论文 《How Machine Learning Is Solving the Binary Function Similarity Problem》&#xff0c;作者包括 Andrea Marcelli 等人&#xff0c;提供了相关代码、数据集和技术细节。 关键内容&#xff1a;技术报告…

【视觉SLAM】2-三维空间刚体运动的数学表示

读书笔记&#xff1a;学习空间变换的三种数学表达形式。 文章目录 1. 旋转矩阵1.1 向量运算1.2 坐标系空间变换1.3 变换矩阵与齐次坐标 2. 旋转向量和欧拉角2.1 旋转向量2.2 欧拉角 3. 四元数 1. 旋转矩阵 1.1 向量运算 对于三维空间中的两个向量 a , b ∈ R 3 a,b \in \R^3 …

【WPF】Prism学习(六)

Prism Dependency Injection 1.依赖注入&#xff08;Dependency Injection&#xff09; 1.1. Prism与依赖注入的关系&#xff1a; Prism框架一直围绕依赖注入构建&#xff0c;这有助于构建可维护和可测试的应用程序&#xff0c;并减少或消除对静态和循环引用的依赖。 1.2. P…

多账号登录管理器(淘宝、京东、拼多多等)

目录 下载安装与运行 解决什么问题 功能说明 目前支持的平台 功能演示 登录后能保持多久 下载安装与运行 下载、安装与运行 语雀 解决什么问题 多个账号的快捷登录与切换 功能说明 支持多个电商平台支持多个账号的登录保持支持快捷切换支持导入导出支持批量删除支持…

UniAPP快速入门教程(一)

一、下载HBuilder 首先需要下载HBuilder开发工具&#xff0c;下载地址:https://www.dcloud.io/hbuilderx.htmlhttps://www.dcloud.io/hbuilder.html 选择Windows正式版.zip文件下载。下载解压后直接运行解压目录里的HBuilderX.exe就可以启动HBuilder。 UniApp的插件市场网址…

PyAEDT:Ansys Electronics Desktop API 简介

在本文中&#xff0c;我将向您介绍 PyAEDT&#xff0c;这是一个 Python 库&#xff0c;旨在增强您对 Ansys Electronics Desktop 或 AEDT 的体验。PyAEDT 通过直接与 AEDT API 交互来简化脚本编写&#xff0c;从而允许在 Ansys 的电磁、热和机械求解器套件之间无缝集成。通过利…

SpringBoot源码解析(四):解析应用参数args

SpringBoot源码系列文章 SpringBoot源码解析(一)&#xff1a;SpringApplication构造方法 SpringBoot源码解析(二)&#xff1a;引导上下文DefaultBootstrapContext SpringBoot源码解析(三)&#xff1a;启动开始阶段 SpringBoot源码解析(四)&#xff1a;解析应用参数args 目录…

【Linux】指令 + 重定向操作

Linux基本指令 一.Linux基本指令1.mv&#xff08;重要&#xff09;2.cat3.more和less&#xff08;重要&#xff09;4.head和tail5.date6.cal7.find&#xff08;重要&#xff09; 二.Linux相关知识点1. Linux系统中&#xff1a;一切皆文件2. 重定向操作1. 输出重定向2. 追加重定…