此方法不是唯一,只是自己对于Springboot中关于Websocket处理思路比较清晰的一种,在此记录下来。总共不过就四个文件而已。
一、创建Springboot项目,添加不可或缺的pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
由于需要跟http请求对接,所以一定要有web的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
二、配置文件WebsocketConfiguration
package com.chris.modules.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author Chris Chan
* Create On 2022/11/23 13:22
* Use for:
* Explain:
*/
@EnableWebSocket
@Configuration
public class WebsocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
这是最简单的内容了,不过就是创建了一个bean而已。
三、抽象定义文件
package com.chris.modules.ws;
import javax.websocket.Session;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author Chris Chan
* Create On 2022/11/23 14:37
* Use for:
* Explain:
*/
public interface Websocket {
/**
* @author Chris Chan
* Create On 2022/11/23 13:09
* Use for: 规范连接回调
* Explain:
*/
interface LinkPoint {
//连接时回调
void onOpen(Session session, String auth);
//收到消息时回调
void onMessage(String message);
//连接关闭时回调
void onClose();
//发生错误时回调
void onError(Session session, Throwable throwable);
}
/**
* @author Chris Chan
* Create On 2022/11/23 13:15
* Use for: 客户端
* Explain:
*/
interface Client {
//获取会话
Session getSession();
//获取标记
String getTag();
//发送文本
void sendText(String text);
//发送对象
void send(Object object);
}
/**
* @author Chris Chan
* Create On 2022/11/23 13:12
* Use for: 管理行为
* Explain:
*/
interface Manager<T extends Client> {
//向指定客户端发送文本
void sendText(String text, T... clients);
//向所有客户端发送文本
void sendTextYoAll(String text);
//向指定客户端发送对象
void send(Object object, T... clients);
//向所有客户端发送对象
void sendToAll(Object object);
//向其他客户端发送文本
void sendTextToOther(String text, T... clients);
//向其他客户端发送对象
void sendToOther(Object object, T... clients);
//添加客户端
void addClients(T... clients);
//获取所有客户端
CopyOnWriteArraySet<T> all();
//移除客户端
void removeClients(T... clients);
//根据标记获取客户端
T getClientByTag(String tag);
//根据标记获取多个客户端
T[] getClientsByTags(String... tags);
}
}
这里本来是三个接口,后来我把他们放到一个文件里面了。其中LinkPoint规范了四个回调方法。Client声明了客户端需要用到的几个行为,Manager为管理类定义了一系列行为。
四、实现客户端
package com.chris.modules.impl;
import com.chris.modules.ws.Websocket.Client;
import com.chris.modules.ws.Websocket.LinkPoint;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/**
* @author Chris Chan
* Create On 2022/11/23 13:26
* Use for: 实现一个客户端
* Explain:
*/
@Component
@ServerEndpoint("/chat/{auth}")
public class DemoClient implements LinkPoint, Client {
private static DemoManager demoManager = DemoManager.getInstance();
private String tag;
private Session session;
@OnOpen
@Override
public void onOpen(Session session, @PathParam("auth") String auth) {
this.session = session;
this.tag = auth;
demoManager.addClients(this);
String msg = tag + " 上线了" + "目前在线 " + demoManager.all().size() + " 人";
System.out.println(msg);
demoManager.sendTextToOther(msg, this);
}
@OnMessage
@Override
public void onMessage(String message) {
String msg = tag + ": " + message;
System.out.println(msg);
demoManager.sendTextToOther(msg, this);
}
@OnClose
@Override
public void onClose() {
demoManager.removeClients(this);
String msg = tag + " 下线了,目前在线 " + demoManager.all().size() + " 人";
System.out.println(msg);
demoManager.sendTextToOther(msg);
}
@OnError
@Override
public void onError(Session session, Throwable throwable) {
System.out.println("出错");
try {
session.close();
demoManager.removeClients(this);
String msg = tag + " 离线了,目前在线 " + demoManager.all().size() + " 人";
System.out.println(msg);
demoManager.sendTextToOther(msg);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public Session getSession() {
return this.session;
}
@Override
public String getTag() {
return this.tag;
}
@Override
public void sendText(String text) {
try {
session.getBasicRemote().sendText(text);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void send(Object object) {
try {
session.getBasicRemote().sendObject(object);
} catch (IOException | EncodeException e) {
e.printStackTrace();
}
}
}
其中的auth原本的设计是token,使用来识别客户端身份的,可以增加校验逻辑。目前整体逻辑是按照聊天的业务来实现的。
五、实现管理类
package com.chris.modules.impl;
import com.chris.modules.ws.Websocket.Manager;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
/**
* @author Chris Chan
* Create On 2022/11/23 13:32
* Use for:
* Explain:
*/
public class DemoManager implements Manager<DemoClient> {
private static CopyOnWriteArraySet<DemoClient> linkSet = new CopyOnWriteArraySet<>();
//单例
private static DemoManager instance = new DemoManager();
private DemoManager() {
}
public static DemoManager getInstance() {
return instance;
}
@Override
public void sendText(String text, DemoClient... clients) {
for (DemoClient client : clients) {
client.sendText(text);
}
}
@Override
public void sendTextYoAll(String text) {
for (DemoClient demoClient : linkSet) {
demoClient.sendText(text);
}
}
@Override
public void send(Object object, DemoClient... clients) {
for (DemoClient client : clients) {
client.send(object);
}
}
@Override
public void sendToAll(Object object) {
for (DemoClient demoClient : linkSet) {
demoClient.send(object);
}
}
@Override
public void sendTextToOther(String text, DemoClient... clients) {
Set<String> tagSet = Arrays.stream(clients).map(DemoClient::getTag).collect(Collectors.toSet());
for (DemoClient demoClient : linkSet) {
if (tagSet.contains(demoClient.getTag())) {
continue;
}
demoClient.sendText(text);
}
}
@Override
public void sendToOther(Object object, DemoClient... clients) {
Set<String> tagSet = Arrays.stream(clients).map(DemoClient::getTag).collect(Collectors.toSet());
for (DemoClient demoClient : linkSet) {
if (tagSet.contains(demoClient.getTag())) {
continue;
}
demoClient.send(object);
}
}
@Override
public void addClients(DemoClient... clients) {
linkSet.addAll(Arrays.asList(clients));
}
@Override
public CopyOnWriteArraySet<DemoClient> all() {
return linkSet;
}
@Override
public void removeClients(DemoClient... clients) {
for (DemoClient client : clients) {
linkSet.remove(client);
}
}
@Override
public DemoClient getClientByTag(String tag) {
for (DemoClient demoClient : linkSet) {
if (demoClient.getTag().equals(tag)) {
return demoClient;
}
}
return null;
}
@Override
public DemoClient[] getClientsByTags(String... tags) {
if (null == tags || tags.length == 0) {
return null;
}
Set<String> tagSet = Arrays.stream(tags).collect(Collectors.toSet());
List<DemoClient> clientList = linkSet.stream().filter(c -> tagSet.contains(c.getTag())).collect(Collectors.toList());
DemoClient[] clients = new DemoClient[clientList.size()];
clientList.toArray(clients);
return clients;
}
}
本来管理类可以使用一个静态类就够了,但是为了用一个接口来规范行为,所以设计成了单例模式。其中对客户端的管理使用了并发包下面的set集合。
到现在,一个拥有实时聊天的逻辑就具备了。运行起来,用一个网络客户端就可以模拟聊天了。
一般在设计使用长连接时,主要负责下行数据传输,上行需求一般还是倚靠http接口来解决。我们写两个接口。
package com.chris.modules.web;
import com.chris.modules.impl.DemoClient;
import com.chris.modules.impl.DemoManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Chris Chan
* Create On 2022/11/23 14:21
* Use for:
* Explain:
*/
@RestController
@RequestMapping("api/ws")
public class WsController {
private static DemoManager demoManager = DemoManager.getInstance();
/**
* 向指定客户端发送消息
*
* @param auth
* @param tags
* @param msg
* @return
*/
@GetMapping("send")
public String send(String auth, String tags, String msg) {
//模拟auth发送者,这里可根据http请求携带的token来分析
DemoClient authCli = demoManager.getClientByTag(auth);
//获取目标客户端
DemoClient[] clients = demoManager.getClientsByTags(tags.split(","));
//发送消息
demoManager.sendText(authCli.getTag() + ":" + msg, clients);
return "send success.";
}
/**
* 向指定客户端发送消息
*
* @param auth
* @param tags
* @param msg
* @return
*/
@GetMapping("sendToOther")
public String sendToOther(String auth, String tags, String msg) {
DemoClient authCli = demoManager.getClientByTag(auth);
DemoClient[] clients = demoManager.getClientsByTags(tags.split(","));
demoManager.sendTextToOther(authCli.getTag() + ":" + msg, clients);
return "send success.";
}
}
尝试调用:
http://localhost:8088/api/ws/send?auth=chris&tags=chris,bill&msg=hello,chris and bill
正式使用的时候,还需要为下行数据封装一个数据协议,以适应不同的也无需求。
补充:
上述确定上行用http,websocket尽在下行的时候使用。所以有Springboot的服务调用Websocket客户端,没有反向的调用。
加入我们的业务允许上行,比如实时聊天,需要将数据写入数据库,这是就需要调用Springboot的服务。但是直接在ServerEndpoint中装载服务是不行的。我们需要做一点修改。
首先创建一个service,模拟一个简单的逻辑。
package com.chris.modules.service;
import org.springframework.stereotype.Service;
/**
* @author Chris Chan
* Create On 2022/11/23 15:20
* Use for:
* Explain:
*/
@Service
public class DemoService {
public void show(String msg){
System.out.println(msg+" --Springboot服务调用");
}
}
修改DemoClient,把服务作为一个静态成员加入。
创建一个监听器,在spring上下文装载完成之后,把服务的bean传给client。
package com.chris.modules.config;
import com.chris.modules.impl.DemoClient;
import com.chris.modules.service.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
/**
* @author Chris Chan
* Create On 2022/11/23 15:22
* Use for:
* Explain:
*/
@Configuration
public class AppStartedListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
DemoService demoService;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
DemoClient.setDemoService(demoService);
}
}
这样就可以在有上行数据之后,调用Springboot的服务了。