系列文章目录
…TODO
spring integration开篇:说明
…TODO
spring integration使用:消息路由
spring integration使用:消息路由
- 系列文章目录
- 前言
- 消息路由的概念
- 二、路由的分类
- 基于内容的路由器
- spring integration中的实现
- RecipientListRouter
- ...TODO
- 动态路由器
- 三、基于内容的路由器使用示例
- 目标
- 1.引入库
- 2.码代码
- 2.1.消息源
- 2.2.定义渠道
- 2.3.定义集成流
- 2.4.定义用于处理分流过来消息(前缀为a的消息)集成流
- 总结
前言
本系列文章主要是通过一些实际项目场景举例,展开讲解spring integration对enterprise integration patterns的实现。个人能力所限,文中有不妥当或者错误的点还希望大家担待和指正。
关于文章中使用的一些环境依赖和代码风格、约定,请看系列文章的开篇说明。
消息路由的概念
如何分离各个处理步骤,以便可以根据一组条件将消息传递到不同的筛选器?
插入一个特殊的过滤器,即消息路由器,该过滤器使用来自一个消息通道的消息,并根据一组条件将其重新发布到另一个消息通道通道。
消息路由器与管道和过滤器的最基本概念不同,因为它连接到多个输出通道。由于管道和过滤器体系结构,消息路由器周围的组件完全不知道消息路由器的存在。消息路由器的一个关键属性是它不修改消息内容。它只关心消息的目的地。
二、路由的分类
基于内容的路由器
spring integration中的实现
RecipientListRouter
通过自定义规则将收到的每条消息发送到静态定义的消息通道列表。
…TODO
动态路由器
三、基于内容的路由器使用示例
目标
通过对消息内容做判断将消息分流到不同的渠道中进行后续处理。
1.引入库
gradle
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-http'
implementation 'org.springframework.integration:spring-integration-file'
2.码代码
2.1.消息源
public String getFeed() {
RestTemplate restTemplate = new RestTemplate();
String forObject = restTemplate.getForObject("https://spring.io/blog.atom", String.class);
// String forObject = restTemplate.getForObject("https://tuna.moe/feed.xml", String.class);
// System.out.println(forObject);
return forObject;
}
2.2.定义渠道
@Bean
public MessageChannel prefixa(){
return new DirectChannel();
}
2.3.定义集成流
@Bean
public IntegrationFlow httpOutboundFlow() {
return IntegrationFlows.fromSupplier(this::getFeed, c -> c.poller(Pollers.fixedRate(10000)))
.channel(MessageChannels.direct())
.transform(Transformers.objectToString("UTF-8"))
.split(s -> s.applySequence(false).delimiters(" "))
.<String>filter((p) -> p.length() < 10 && p.matches("\\b[\\w]{3,}\\b"))
.channel(MessageChannels.direct())
.routeToRecipients(r->r
.applySequence(true)
.ignoreSendFailures(true)
.defaultOutputChannel("nullChannel")
.recipient("prefixa", "payload.startsWith('a')")
)
.get();
}
2.4.定义用于处理分流过来消息(前缀为a的消息)集成流
@Bean
public IntegrationFlow printAFlow(){
return IntegrationFlows.from("prefixa")
.handle(p->{
System.out.println("^^^^^^^^^^^^^^^" + p.getPayload());
})
.get();
}
总结
…TODO。