问题
最近业务上面需要使用到WebSocket长连接来解决某些业务场景。
一图胜千言
注意:这里承担WebSocket服务器的是AWS API Gateway;后面的EC2业务服务,其实都是REST接口服务。
这里主要关注API Gateway和REST业务服务怎么实现API Gateway要求的WebSocket协议。VPCLink,ELB和Auto Scaling不是我们的重点。这里假设这些都已经完成了。
初始化网关
选择【构建】,如下图:
设置网关名称,类似如下图:
全部选择3种路由,如下图:
集成方式暂时选择mock方式:
阶段,随便填个名称:
预览一下,即将创建的空websocket网关:
网关事件
初始化完成部署后,进入网关会出现如下图:
路由选择表达式
$request.body.action:这个表示客户端与网关通过ws方式连接上后,可以向网关发送json数据来触发网关来进行路由调用。这里的action就是被选中路由资源的关键字段而已。类似发送这样:
{
"action": "getList"
}
接下来介绍默认自带的三种路由:
$connect
:表示客户端通过ws方式与网关进行了连接;
$disconnect
:表示ws连接中断;
$default
:如果客户端发送过来的消息不是json方式,默认就这这个路由。
一般来说,前两个都需要集成后台业务程序进行相关处理;最后一个默认根据业务情况来定,我这里不需要业务程序处理。
接下来介绍前2种路由的集成配置。
$connect路由事件
上面是该事件的主体配置。接下来我们先看集成请求是怎么配置的?
集成请求
这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:
{
"connectionId": "$context.connectionId",
"payload": $input.body
}
这样后台业务程序,就能够读取到connectionId和payload,这两个字段。业务程序类似如下:
@PostMapping("/connect")
public R connect(@RequestBody WebsocketConn websocketConn) {
// 记录connectionId
String connectionId = websocketConn.getConnectionId();
log.info(String.format("WebSocket ID: %s 上线", connectionId));
redisRepository.setExpire(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId),
connectionId, TokenConstant.SESSTION_API_TOKEN_EXPIRE);
return R.status(true);
}
这段Java代码主要就是实现connect接口,将aws api gateway的ws连接ID保存到redis里面,然后,返回200给api gateway。
集成响应
这里集成响应配置,主要就是添加响应$default
。
$disconnect路由事件
上图就是disconnect路由事件的主要配置,只要配置一个集成请求就完事了。
集成请求
集成请求事件如上图。
这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:
{
"connectionId": "$context.connectionId"
}
这样后台业务程序,就能够读取到connectionId,这两个字段。业务程序类似如下:
@PostMapping("/disconnect")
public R disconnect(@RequestBody WebsocketConn websocketConn) {
// 移除connectionId
String connectionId = websocketConn.getConnectionId();
log.info(String.format("WebSocket ID: %s 下线", connectionId));
redisRepository.del(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId));
return R.status(true);
}
这段Java代码主要就是实现disconnect接口,将aws api gateway的ws连接时保存的连接ID从redis里面移除,然后,返回200给api gateway。
$default路由事件
以上是default路由配置事件。
集成请求
这里选择mock方式,不需要与后台业务程序集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:
{
"statusCode": 200
}
部署网关
将API Gateway部署到阶段里面去。
查看阶段
从阶段可以看到两种URL,第一种就是通过ws协议进行连接到URL,第二种就是操作这个API Gateway的接口。
推送消息(Java SDK)
Maven
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.12.348</version>
</dependency>
<!-- AWS API Gateway -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-apigatewaymanagementapi</artifactId>
<version>1.12.348</version>
</dependency>
ApiGatewayConfig.java(Spring配置类)
package org.xxxx.config;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsync;
import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ApiGatewayConfig {
@Value("${api.gateway.endpoint}")
private String endpoint;
@Value("${region.name:cn-north-1}")
private String regionName;
@Bean
public AmazonApiGatewayManagementApiAsync amazonApiGatewayManagementApi() {
Region region = Regions.getCurrentRegion();
if (region != null){
regionName = region.getName();
}
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, regionName);
return AmazonApiGatewayManagementApiAsyncClientBuilder.standard()
.withEndpointConfiguration(endpointConfiguration)
.withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
.build();
}
}
注意:这里的EC2运行角色需要设置api gateway调用权限类似:AmazonAPIGatewayInvokeFullAccess,也可以根据情况进行自定义权限设置。
推送消息
@Override
public void pushAllFromMessage(String message) {
Set<String> keys = redisRepository.getListKey(TokenConstant.WEBSOCKET_API_ID_PREFIX);
if (!CollectionUtils.isEmpty(keys)){
for (String key : keys) {
String connectionId = (String) redisRepository.get(key);
PostToConnectionRequest postToConnectionRequest = new PostToConnectionRequest();
postToConnectionRequest.withConnectionId(connectionId);
ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
byteBuffer = byteBuffer.duplicate();
postToConnectionRequest.withData(byteBuffer);
amazonApiGatewayManagementApi.postToConnectionAsync(postToConnectionRequest);
}
}
}
这里主要使用postToConnectionAsync方法进行消息推送,注意这里的ByteBuffer的使用,是需要调用duplicate方法的。
测试
安装wscat客户端:
npm install -g wscat
使用wscat连接上去,即可接收消息推送了。
wscat -c wss://aabbccddee.execute-api.cn-north-1.amazonaws.com.cn/test
总结
到这里就完成了AWS API Gateway中使用websocket的功能。注意,我们的业务程序不要集成任何websocket库,只用正常的restful方式实现aws要求的那个几个路由事件接口即可。再集成api gateway相关的sdk就可以对websocket连接进行管理了。
参考:
- AWS API Gateway - WebSocket API + EC2 (HTTP & VPC Link & Auth & API Keys & Lambda Authorizer)
- awsapigw-ws-springboot
- AmazonApiGatewayManagementApi
- 使用 IAM 授权
- 使用 wscat 连接到 WebSocket API 并向其发送消息