Message Processing With Spring Integration高级应用:自定义消息通道与端点

news2024/12/19 21:41:31

一、Spring Integration 简介

Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。

核心目标:
  1. 提供简单的模型来实现复杂的企业集成。
  2. 支持与外部系统的集成。
  3. 提供模块化、松耦合的消息处理架构。

二、Spring Integration 核心组件

1. 消息(Message)
  • 定义:消息是 Spring Integration 的核心,包含 payload(负载)和 header(头部)。
  • 创建消息:通过 MessageBuilder 创建消息。

代码示例

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

Message<String> message = MessageBuilder
    .withPayload("Message Payload")
    .setHeader("Message_Header1", "Header1_Value")
    .setHeader("Message_Header2", "Header2_Value")
    .build();

2. 消息通道(Message Channel)
  • 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
  • 类型
    • 点对点(Point-to-Point):每条消息最多被一个消费者接收。
    • 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
  • 常见实现
    • DirectChannel:默认点对点通道。
    • NullChannel:虚拟通道,用于测试和调试。
    • 其他:PublishSubscribeChannelQueueChannelPriorityChannel 等。

3. 消息端点(Message Endpoint)

消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:

  • Transformer:转换消息内容或结构。
  • Filter:过滤不符合条件的消息。
  • Router:根据条件将消息路由到不同的通道。
  • Splitter:将消息拆分为多个子消息。
  • Aggregator:将多个消息聚合为一个消息。
  • Service Activator:连接服务实例到消息系统。
  • Channel Adapter:连接消息通道与外部系统。

三、货物处理系统示例

1. 需求

实现一个货物处理系统,功能包括:

  1. 接收货物消息。
  2. 拆分货物列表为单个货物消息。
  3. 基于重量过滤货物。
  4. 根据运输类型(国内/国际)路由货物。
  5. 转换货物消息。
  6. 最终处理并记录货物信息。

2. 项目环境
  • JDK:1.8
  • Spring:4.1.2
  • Spring Integration:4.1.0
  • Maven:3.2.2
  • 操作系统:Ubuntu 14.04

3. 完整代码实现
Step 1:添加依赖

pom.xml 中添加 Spring 和 Spring Integration 的依赖:

<properties>
    <spring.version>4.1.2.RELEASE</spring.version>
    <spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties>

<dependencies>
    <!-- Spring 核心依赖 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- Spring Integration 核心依赖 -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${spring.integration.version}</version>
    </dependency>
</dependencies>

Step 2:配置类

创建 AppConfiguration 类,配置消息通道和启用 Spring Integration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {

    @Bean
    public MessageChannel cargoGWDefaultRequestChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoSplitterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoFilterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoTransformerOutputChannel() {
        return new DirectChannel();
    }
}

Step 3:消息网关

定义 CargoGateway 接口,作为消息系统的入口:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

import java.util.List;

@MessagingGateway
public interface CargoGateway {

    @Gateway(requestChannel = "cargoGWDefaultRequestChannel")
    void processCargoRequest(Message<List<Cargo>> message);
}

Step 4:消息拆分器

实现 CargoSplitter,将货物列表拆分为单个货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;

import java.util.List;

@MessageEndpoint
public class CargoSplitter {

    @Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")
    public List<Cargo> splitCargoList(Message<List<Cargo>> message) {
        return message.getPayload();
    }
}

Step 5:消息过滤器

实现 CargoFilter,过滤重量超过限制的货物:

import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;

@MessageEndpoint
public class CargoFilter {

    private static final double CARGO_WEIGHT_LIMIT = 1000.0;

    @Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")
    public boolean filterCargo(Cargo cargo) {
        return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;
    }
}

Step 6:服务激活器

实现 CargoServiceActivator,处理最终的货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;

@MessageEndpoint
public class CargoServiceActivator {

    @ServiceActivator(inputChannel = "cargoTransformerOutputChannel")
    public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {
        System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);
    }
}

Step 7:运行主程序

创建 Application 类,初始化 Spring 容器并发送货物请求:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;

public class Application {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);
        CargoGateway gateway = context.getBean(CargoGateway.class);

        List<Cargo> cargos = Arrays.asList(
            new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),
            new Cargo(2, "Receiver2", "Address2", 1500, "International")
        );

        gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());
    }
}

四、运行过程

  1. 启动 Application 类。
  2. 系统会根据配置:
    • 拆分货物列表。
    • 过滤重量超过限制的货物。
    • 路由货物到不同的通道。
    • 最终处理并记录货物信息。
  3. 控制台输出处理结果。

五、适用场景

Spring Integration 非常适合以下场景:

  1. 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
  2. 消息驱动架构:如基于事件的微服务通信。
  3. 复杂消息处理:如批量处理、过滤、路由、转换等。
  4. 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。

通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2262373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux文件属性 --- 硬链接、所有者、所属组

三、硬链接数 1.目录 使用“ll”命令查看&#xff0c;在文件权限的后面有一列数字&#xff0c;这是文件的硬链接数。 对于目录&#xff0c;硬链接的数量是它具有的直接子目录的数量加上其父目录和自身。 下图的“qwe”目录就是“abc”目录的直接子目录。 2.文件 对于文件可…

Centos7 部署ZLMediakit

1、拉取代码 #国内用户推荐从同步镜像网站gitee下载 git clone --depth 1 https://gitee.com/xia-chu/ZLMediaKit cd ZLMediaKit #千万不要忘记执行这句命令 git submodule update --init 2、安装编译器 sudo yum -y install gcc 3、安装cmake sudo yum -y install cmake 4…

无管理员权限 LCU auth-token、port 获取(全网首发 go)

一&#xff1a; 提要&#xff1a; 参考项目&#xff1a; https://github.com/Zzaphkiel/Seraphine 想做一个 lol 查战绩的软件&#xff0c;并且满足自己的需求&#xff08;把混子和大爹都表示出来&#xff09;&#xff0c;做的第一步就是获取 lcu token &#xff0c;网上清一色…

《云原生安全攻防》-- K8s安全框架:认证、鉴权与准入控制

从本节课程开始&#xff0c;我们将来介绍K8s安全框架&#xff0c;这是保障K8s集群安全比较关键的安全机制。接下来&#xff0c;让我们一起来探索K8s安全框架的运行机制。 在这个课程中&#xff0c;我们将学习以下内容&#xff1a; K8s安全框架&#xff1a;由认证、鉴权和准入控…

spring\strust\springboot\isp前后端那些事儿

后端 一. 插入\更新一条数据&#xff08;老&#xff09; Map<String, Object> parameterMap MybatisUtil.initParameterSave("Send_ProjectFrozenLog", sendProjectFrozenLog); commonMapper.insert(parameterMap);parameterMap MybatisUtil.initParameter…

UE5安装Fab插件

今天才知道原来Fab也有类似Quixel Bridge的插件&#xff0c;于是立马就安装上了&#xff0c;这里分享一下安装方法 在Epic客户端 - 库 - Fab Library 搜索 Fab 即可安装Fab插件 然后重启引擎&#xff0c;在插件面板勾选即可 然后在窗口这就有了 引擎左下角也会多出一个Fab图标…

对于使用exe4j打包,出现“NoClassDefFoundError: BOOT-INF/classes”的解决方案

jar使用exe4j打包exe&#xff0c;出现NoClassDefFoundError: BOOT-INF/classes 注意选取的jar包是使用build&#xff0c;而不是maven中的install 本文介绍解决这个方法的方案 点击Project Structure 按照如图所示选择 选择main class&#xff0c;选择你要打的main 如果遇到/M…

文件上传之文件内容检测

一.基本概念 介绍&#xff1a;文件内容检测就是检测上传的文件里的内容。 文件幻数检测 通常情况下&#xff0c;通过判断前10个字节&#xff0c;基本就能判断出一个文件的真实类型。 文件加载检测 一般是调用API或函数对文件进行加载测试。常见的是图像渲染测试&#xff0c;再…

WebSpoon9.0(KETTLE的WEB版本)编译 + tomcatdocker部署 + 远程调试教程

前言 Kettle简介 Kettle是一款国外开源的ETL工具&#xff0c;纯Java编写&#xff0c;可以在Window、Linux、Unix上运行&#xff0c;绿色无需安装&#xff0c;数据抽取高效稳定 WebSpoon是Kettle的Web版本&#xff0c;由Kettle社区维护&#xff0c;不受Pentaho支持&#xff0c;…

搭建Tomcat(三)---重写service方法

目录 引入 一、在Java中创建一个新的空项目&#xff08;初步搭建&#xff09; 问题&#xff1a; 要求在tomcat软件包下的MyTomcat类中编写main文件&#xff0c;实现在MyTomcat中扫描myweb软件包中的所有Java文件&#xff0c;并返回“WebServlet(url"myFirst")”中…

CAN配置---波特率中断引脚等---autochips-AC7811-ARM-M3内核

1、配置工具 虽然不怎么好用&#xff0c;但比没有强多了。具体看图&#xff1a; 时钟选着 NVIC配置 GPIO配置 2、生成的具体配置信息 NXP的配置工具里面&#xff0c;具体的波特率可以直接显示&#xff0c;这个工具没有&#xff0c;怎么办&#xff1f; 它放到了生成的代码里面…

matlab Patten的使用(重要)(Matlab处理字符串一)

原文连接&#xff1a;https://www.mathworks.com/help/releases/R2022b/matlab/ref/pattern.html?browserF1help 能使用的搜索函数&#xff1a; contains确定字符串中是否有模式matches确定模式是否与字符串匹配count计算字符串中模式的出现次数endsWith确定字符串是否以模式…

Docker创建一个mongodb实例,并用springboot连接 mongodb进行读写文件

一、通过Docker 进行运行一个 mongodb实例 1、拉取镜像 docker pull mongo:5.0.5 2、创建 mongodb容器实例 docker run -d --name mongodb2 \-e MONGO_INITDB_ROOT_USERNAMEsalaryMongo \-e MONGO_INITDB_ROOT_PASSWORD123456 \-p 27017:27017 \mongo:5.0.5 3、进入容器&am…

#渗透测试#漏洞挖掘#红蓝攻防#护网#sql注入介绍02-基于错误消息的SQL注入(Error-Based SQL Injection)

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

OpenCVE:一款自动收集NVD、MITRE等多源知名漏洞库的开源工具,累计收录CVE 27万+

漏洞库在企业中扮演着至关重要的角色&#xff0c;不仅提升了企业的安全防护能力&#xff0c;还支持了安全决策、合规性要求的满足以及智能化管理的发展。前期博文《业界十大知名权威安全漏洞库介绍》介绍了主流漏洞库&#xff0c;今天给大家介绍一款集成了多款漏洞库的开源漏洞…

Spring Boot 3.X:Unable to connect to Redis错误记录

一.背景 最近在搭建一个新项目&#xff0c;本着有新用新的原则&#xff0c;项目选择到了jdk17SpringBoot3.4。但是在测试Redis连接的时候却遇到了以下问题&#xff1a; redis连不上了。于是我先去检查了配置文件的连接信息&#xff0c;发现没问题&#xff1b;再去检查配置类&am…

MinT: 第一个能够生成顺序事件并控制其时间戳的文本转视频模型。

MinT 是第一个能够生成顺序事件并控制其时间戳的文本转视频模型。使用 MinT 生成时间控制的多事件视频。给定一系列事件文本提示及其所需的开始和结束时间戳&#xff0c;MinT 可以合成具有一致主题和背景的平滑连接事件。此外&#xff0c;它可以灵活地控制每个事件的时间跨度。…

C语言实验 结构体2

时间:2024.12.18 6-5 评委打分-t-CalcuScore 代码 // 定义结构体 struct Score {int id;char name[10];int value[17];double finalScore;int rank; };// 计算最终成绩 void CalcuScore(struct Score grade[], int n) {for (int i = 0; i < n; i++) {int max = grade[i].…

第6章 第一组重构

最常用到的重构就是用提炼函数&#xff08;106&#xff09;将代码提炼到函数中&#xff0c;或者用提炼变量&#xff08;119&#xff09;来提炼变量。既然重构的作用就是应对变化&#xff0c;你应该不会感到惊讶&#xff0c;我也经常使用这两个重构的反向重构——内联函数&#…

基于python对网页进行爬虫简单教程

python对网页进行爬虫 基于BeautifulSoup的爬虫—源码 """ 基于BeautifulSoup的爬虫### 一、BeautifulSoup简介1. Beautiful Soup提供一些简单的、python式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具箱&#xff0c;通过解析文档为用户提供需要…