Spring Boot集成udp通讯
- 加入依赖
- 编辑配置文件
- 配置相关属性
- 具体业务类
- 客户端
- 调试
加入依赖
<!--加入UDP通信所需依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
编辑配置文件
application.yml
# 和嵌入式udp通信的发送端口和监听端口地址
udp:
listeningPort: 9911
sendingPort: 9911
配置相关属性
@Component
@ConfigurationProperties(prefix = "udp")
public class UdpConfig {
/**
* 与嵌入式通信的udp监听端口
*/
private static Integer listeningPort;
/**
* 与嵌入式通信的udp发送端口
*/
private static Integer sendingPort;
@Value("${udp.listeningPort}")
public void setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
}
@Value("${udp.sendingPort}")
public void setSendingPort(Integer sendingPort) {
this.sendingPort = sendingPort;
}
public static Integer getListeningPort() {
return listeningPort;
}
public static Integer getSendingPort() {
return sendingPort;
}
}
具体业务类
@Configuration
public class UdpServer {
private static final Logger logger = LoggerFactory.getLogger(UdpServer.class);
/**
* UDP消息接收服务
*/
@Bean
public IntegrationFlow integrationFlow() {
logger.info("UDP服务启动成功,端口号为: {}", UdpConfig.getListeningPort());
return IntegrationFlows.from(Udp.inboundAdapter(UdpConfig.getListeningPort())).channel("udpChannel").get();
}
/**
* 转换器
*/
@Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
String message = new String(payload);
// todo 进行数据转换
message = message.toUpperCase();
return message;
}
/**
* 过滤器
*/
@Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
public boolean filter(String message, @Headers Map<String, Object> headers) {
// 获取来源Id
String id = headers.get("id").toString();
// 获取来源IP,可以进行IP过滤
String ip = headers.get("ip_address").toString();
// 获取来源Port
String port = headers.get("ip_port").toString();
// todo 信息数据过滤
// if (true) {
// // 没有-的数据会被过滤
// return false;
// }
return true;
}
/**
* 路由分发处理器:可以进行分发消息被那个处理器进行处理
*/
@Router(inputChannel = "udpRouter")
public String router(String message, @Headers Map<String, Object> headers) {
// 获取来源Id
String id = headers.get("id").toString();
// 获取来源IP,可以进行IP过滤
String ip = headers.get("ip_address").toString();
// 获取来源Port
String port = headers.get("ip_port").toString();
// todo 筛选,走那个处理器
if (true) {
return "udpHandle2";
}
return "udpHandle1";
}
/**
* 最终处理器1
*/
@ServiceActivator(inputChannel = "udpHandle1")
public void udpMessageHandle(String message) throws Exception {
// todo 可以进行异步处理
logger.info("message:" + message);
}
/**
* 最终处理器2
*/
@ServiceActivator(inputChannel = "udpHandle2")
public void udpMessageHandle2(String message) throws Exception {
logger.info("UDP2:" + message);
//接受消息处理业务
}
public void sendMsg(String message) {
byte[] bytes = message.getBytes();
UnicastSendingMessageHandler handler = new UnicastSendingMessageHandler("localhost", UdpConfig.getSendingPort());
logger.info("发送UDP信息: {" + bytes + "}");
handler.handleMessage(MessageBuilder.withPayload(bytes).build());
logger.info("发送成功");
}
}
客户端
@RestController
@RequestMapping("/udp")
public class UdpClient {
private final static Logger logger = LoggerFactory.getLogger(UdpClient.class);
@PostMapping("/send")
public BaseResult send(@RequestParam("message") String message) {
byte[] bytes = message.getBytes();
UnicastSendingMessageHandler handler = new UnicastSendingMessageHandler("localhost", UdpConfig.getSendingPort());
logger.info("发送UDP信息: {" + bytes + "}");
handler.handleMessage(MessageBuilder.withPayload(bytes).build());
logger.info("发送成功");
return new BaseResult<>();
}
}
调试
接口请求客户端地址{ip:host}/udp/send
debug查看 成功接收到消息