业务需求
公司IM服务主要基于netty实现websocket,为保证在线用户channel通道畅通故一直使用单机运行。现由于公司业务增加需要增加IM集群,由于channel通道不能缓存,故急需一套可以完整兼容之前功能的方案。
技术选型
1、采用spring websocket方案,发送进行MQ广播,各个IM服务节点收到广播进行业务处理。
该种方式需要将原始功能重做,而且广播消息会增加系统额外开销,实现难度不大,开发成本较高。
2、gateway自定义负载均衡,当接收到ws消息直接根据用户ID进行路由。
该方式可以完美兼容原始功能,原始功能采用netty进行开发websocket,实现难度简单,开发成本低。
结论:为完美兼容老系统功能,且节省开发成本,本次迭代采用第二种方案。
开发原理
1、websocket 注册、心跳、发消息都会调用ws路径,且路径带有发送人ID、接收人ID,如果是注册消息只有发送人,对聊消息会存在接收人,心跳连接也会有发送人。故可以通过对 人员ID % 服务数量 将满足规则的ws请求路由到指定的服务上。
2、spring gateway 网关大家都应该不陌生,我们查看源码发现底层采用负载均衡过滤器实现的负载。
源码如下:
其中有一个受保护的方法用来选择路由到哪个服务提供者。
源码如下:
查看该方法我们可以看见内部调用了ribbon,ribbon根据配置的策略进行服务选择。
源码如下:
所以,我们可以覆写该选择服务的方法,如果是ws 请求直接进行自定义选择服务,其他请求放行执行默认逻辑。
小试牛刀
我们只是简单模拟ws 请求根据请求人员ID进行负载均衡,当然并没有考虑服务宕机如何切换路由等问题。如有需要建议使用hash环来解决这一问题,感兴趣的同学可以考虑使用hash环保存服务提供者信息。
1、提供自定义负载均衡过滤器类,该类继承默认的负责均衡过滤器
/**
* 自定义负载均衡
* @author senfel
* @version 1.0
* @date 2023/1/9 15:39
*/
public class ILoadBalancerClientFilter extends LoadBalancerClientFilter {
/**
* 发送人ID
*/
public static final String SENDER_ID = "senderId";
/***
* 收件人ID
**/
public static final String RECEIVER_ID = "receiverId";
/**
* 发现客户端
*/
private DiscoveryClient discoveryClient;
public ILoadBalancerClientFilter(LoadBalancerClient loadBalancer,DiscoveryClient discoveryClient) {
super(loadBalancer);
this.discoveryClient = discoveryClient;
}
/**
* 自定义服务实体选择
* @param exchange
* @author senfel
* @date 2023/1/9 15:44
* @return org.springframework.cloud.client.ServiceInstance
*/
protected ServiceInstance choose(ServerWebExchange exchange) {
//获取请求url
URI requestUrl = (URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
//获取host
String serverId = requestUrl.getHost();
//请求path
String path = requestUrl.getPath();
//如果请求包含
if(path.contains("/ws")){
List<ServiceInstance> instances = discoveryClient.getInstances(serverId);
if(!CollectionUtils.isEmpty(instances)){
//获取请求参数
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
Long sendId = null;
Long receiverId = null;
if(Objects.nonNull(queryParams.get(SENDER_ID))){
sendId = Long.valueOf(queryParams.get(SENDER_ID).get(0));
}
if(Objects.nonNull(queryParams.get(RECEIVER_ID))){
receiverId = Long.valueOf(queryParams.get(RECEIVER_ID).get(0));
}
int index = 0;
if(null != sendId){
index = (int) (sendId % instances.size());
}
if(null != receiverId){
index = (int) (receiverId % instances.size());
}
return instances.get(index);
}
}
return loadBalancer.choose(((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}
}
2、将自定义的负载均衡过滤器注入容器
/**
* 负载均衡过滤器载入
* @author senfel
* @version 1.0
* @date 2023/1/9 16:03
*/
@Configuration
public class LoadLalancerConfig {
@Bean
public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient loadBalancer, DiscoveryClient discoveryClient){
return new ILoadBalancerClientFilter(loadBalancer,discoveryClient);
}
}
3、测试不同的用户进入系统注册websocket,查看路由信息
查看IM服务提供信息:
im-10-10-17-126-7009 为0号服务
im-10-10-22-174-7009 为1号服务
测试预计:
530686546882331052 % 2 = 0 应路由至0号服务
530686546882331635 % 2 = 1 应路由至 1号服务
测试结果:
用户 530686546882331052 路由到0号服务:
用户 530686546882331635 路由到1号服务