使用AWS的API Gateway实现websocket

news2024/11/25 2:32:50

问题

最近业务上面需要使用到WebSocket长连接来解决某些业务场景。

一图胜千言

AWS API Gateway WebSocket
注意:这里承担WebSocket服务器的是AWS API Gateway;后面的EC2业务服务,其实都是REST接口服务。
这里主要关注API Gateway和REST业务服务怎么实现API Gateway要求的WebSocket协议。VPCLink,ELB和Auto Scaling不是我们的重点。这里假设这些都已经完成了。

初始化网关

选择【构建】,如下图:
websocket构建
设置网关名称,类似如下图:
设置网关名称
全部选择3种路由,如下图:
网关路由选择
集成方式暂时选择mock方式:
网关mock集成
阶段,随便填个名称:
网关阶段
预览一下,即将创建的空websocket网关:
websocket网关预览

网关事件

初始化完成部署后,进入网关会出现如下图:
初始化网关3种路由

路由选择表达式

$request.body.action:这个表示客户端与网关通过ws方式连接上后,可以向网关发送json数据来触发网关来进行路由调用。这里的action就是被选中路由资源的关键字段而已。类似发送这样:

{
	"action": "getList"
}

接下来介绍默认自带的三种路由:
$connect:表示客户端通过ws方式与网关进行了连接;
$disconnect:表示ws连接中断;
$default:如果客户端发送过来的消息不是json方式,默认就这这个路由。
一般来说,前两个都需要集成后台业务程序进行相关处理;最后一个默认根据业务情况来定,我这里不需要业务程序处理。
接下来介绍前2种路由的集成配置。

$connect路由事件

connect路由配置
上面是该事件的主体配置。接下来我们先看集成请求是怎么配置的?

集成请求

connect集成请求配置
这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:

{
  "connectionId": "$context.connectionId",
  "payload": $input.body
}

这样后台业务程序,就能够读取到connectionId和payload,这两个字段。业务程序类似如下:

   @PostMapping("/connect")
    public R connect(@RequestBody WebsocketConn websocketConn) {
        // 记录connectionId
        String connectionId = websocketConn.getConnectionId();
        log.info(String.format("WebSocket ID: %s 上线", connectionId));
        redisRepository.setExpire(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId),
                connectionId, TokenConstant.SESSTION_API_TOKEN_EXPIRE);
        return R.status(true);
    }

这段Java代码主要就是实现connect接口,将aws api gateway的ws连接ID保存到redis里面,然后,返回200给api gateway。

集成响应

集成响应配置
这里集成响应配置,主要就是添加响应$default

$disconnect路由事件

disconnect事件主要配置
上图就是disconnect路由事件的主要配置,只要配置一个集成请求就完事了。

集成请求

集成请求
集成请求事件如上图。
这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:

{
  "connectionId": "$context.connectionId"
}

这样后台业务程序,就能够读取到connectionId,这两个字段。业务程序类似如下:

   @PostMapping("/disconnect")
    public R disconnect(@RequestBody WebsocketConn websocketConn) {
        // 移除connectionId
        String connectionId = websocketConn.getConnectionId();
        log.info(String.format("WebSocket ID: %s 下线", connectionId));
        redisRepository.del(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId));
        return R.status(true);
    }

这段Java代码主要就是实现disconnect接口,将aws api gateway的ws连接时保存的连接ID从redis里面移除,然后,返回200给api gateway。

$default路由事件

default路由配置
以上是default路由配置事件。

集成请求

集成请求配置
这里选择mock方式,不需要与后台业务程序集成。这里值得关注的是请求集成模板参数如下:
模板选择表达式:\$default
模板密钥名称:$default
生成模板内容:

{
  "statusCode": 200
}

部署网关

部署网关
将API Gateway部署到阶段里面去。

查看阶段

阶段
从阶段可以看到两种URL,第一种就是通过ws协议进行连接到URL,第二种就是操作这个API Gateway的接口。

推送消息(Java SDK)

Maven

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.12.348</version>
</dependency>

<!-- AWS API Gateway -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-apigatewaymanagementapi</artifactId>
    <version>1.12.348</version>
</dependency>

ApiGatewayConfig.java(Spring配置类)

package org.xxxx.config;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsync;
import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ApiGatewayConfig {
    @Value("${api.gateway.endpoint}")
    private String endpoint;

    @Value("${region.name:cn-north-1}")
    private String regionName;
    @Bean
    public AmazonApiGatewayManagementApiAsync amazonApiGatewayManagementApi() {
        Region region = Regions.getCurrentRegion();
        if (region != null){
            regionName = region.getName();
        }
        AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, regionName);
        return AmazonApiGatewayManagementApiAsyncClientBuilder.standard()
                .withEndpointConfiguration(endpointConfiguration)
                .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                .build();
    }
}

注意:这里的EC2运行角色需要设置api gateway调用权限类似:AmazonAPIGatewayInvokeFullAccess,也可以根据情况进行自定义权限设置。

推送消息

    @Override
    public void pushAllFromMessage(String message) {
        Set<String> keys = redisRepository.getListKey(TokenConstant.WEBSOCKET_API_ID_PREFIX);
        if (!CollectionUtils.isEmpty(keys)){
            for (String key : keys) {
                String connectionId = (String) redisRepository.get(key);
                PostToConnectionRequest postToConnectionRequest = new PostToConnectionRequest();
                postToConnectionRequest.withConnectionId(connectionId);
                ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
                byteBuffer = byteBuffer.duplicate();
                postToConnectionRequest.withData(byteBuffer);
                amazonApiGatewayManagementApi.postToConnectionAsync(postToConnectionRequest);
            }
        }
    }

这里主要使用postToConnectionAsync方法进行消息推送,注意这里的ByteBuffer的使用,是需要调用duplicate方法的。

测试

安装wscat客户端:

npm install -g wscat

使用wscat连接上去,即可接收消息推送了。

wscat -c wss://aabbccddee.execute-api.cn-north-1.amazonaws.com.cn/test

总结

到这里就完成了AWS API Gateway中使用websocket的功能。注意,我们的业务程序不要集成任何websocket库,只用正常的restful方式实现aws要求的那个几个路由事件接口即可。再集成api gateway相关的sdk就可以对websocket连接进行管理了。

参考:

  • AWS API Gateway - WebSocket API + EC2 (HTTP & VPC Link & Auth & API Keys & Lambda Authorizer)
  • awsapigw-ws-springboot
  • AmazonApiGatewayManagementApi
  • 使用 IAM 授权
  • 使用 wscat 连接到 WebSocket API 并向其发送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/43085.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql事务机制

目录 一&#xff1a;定义 二&#xff1a;事务的特质 三&#xff1a;检测ACID特性 1. 准备工作. 2. 测试原子性和持久性 case1&#xff1a; 模拟原子性的全部失败 case2&#xff1a;模拟原子性的全部成功 case3&#xff1a;检查 持久性。 3. 测试一致性 case1&#xff…

[附源码]计算机毕业设计springboot餐馆点餐管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

马尔可夫链

目录 1.相关概念 2.马尔可夫链的状态概率分布推演及稳态分布 3.马尔可夫链的应用 4.稳态分布性质 1.相关概念 小明同学每日选择早餐的概率转化如下图所示&#xff1a; 并且当日的选择只受前一日的结果以及对应的转移概率影响&#xff0c;与之前的选择无关。———> …

Spring Cloud Alibaba 容器化部署最佳实践 | 本地部署版本 | Seata服务端组件安装

Seata 是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式&#xff0c;为用户打造一站式的分布式解决方案。 下载地址 https://github.com/seata/seata/releases Seata安装-修改配…

PyQt5 信号(Signal)与槽(Slot)

PyQt5 信号与槽信号与槽介绍内置信号与槽的使用自定义信号与槽的使用自定义信号和内置槽函数自定义信号和自定义槽函数自定义有参信号使用自定义信号参数装饰器信号与槽信号与槽的断开和连接多线程中信号与槽的使用信号与槽介绍 信号(Signal)与槽(Slot)是Qt中的核心机制&#…

基于Spring Boot的个人博客系统(源码+数据库)

目录 一、系统功能框架图 二、开发技术 三、开发环境 四、页面展示 1.登录页面 2.首页 3.文章详情页面 4.文章评论页面 ​5.后台页面 6.后台文件编辑页面 ​7.后台文章管理列表页面 五、文件组织结构 六、数据库设计 1. 文章详情表t_article 2.文章评论表t_comm…

Elasticsearch:构建地图以按国家或地区比较指标

标如果你不熟悉 Elastic 地图&#xff0c;本教程是一个不错的起点。 它会指导你完成处理位置数据的常见步骤。在完成本教程后&#xff0c;你将学会: 创建具有多个图层和数据源的地图使用符号、颜色和标签来设置数据值的样式在仪表板中嵌入地图在仪表板中跨仪表盘搜索 完成本教…

【Scala专栏】数据类型、变量常量、类和对象

本文内容主要分为3节&#xff0c;依次讲解&#xff1a;Scala的数据类型有哪些&#xff1f; 变量常量如何使用&#xff1f; 类和对象如何理解&#xff1f; 受限于博主的大脑容量&#xff0c;大概是无法做到事无巨细的&#xff0c;不过其实也没必要那么"细"&#xff0c…

Java核心技术卷Ⅰ-第三章Java的基本程序设计结构

重点 1.数据类型 2.运算符 3.字符串 4.控制流程 5.数组 1.数据类型 整型&#xff1a;Java程序必须保证在所有机器上都能得到相同的运行结果&#xff0c;所以各种数据类型的取值范围是固定的&#xff1b;在C/C中&#xff0c;int和long类型的大小与目标平台相关 类型存储需求取值…

基于粒子群算法的线性规划问题求解matlab程序

基于粒子群算法的线性规划问题求解matlab程序 1 基本粒子群算法流程 粒子群算法基于“种群”和“进化”的概念&#xff0c;通过个体间的协作与竞争&#xff0c;实现复杂空间最优解的搜索&#xff0c;其流程如下&#xff1a; &#xff08;1&#xff09;初始化粒子群&#xff…

Apache Maven

Apache Maven简介安装Eclipse中安装内置的Maven插件Maven官网下载&#xff0c;直接安装在电脑上Maven安装目录结构bin目录boot目录conf目录lib目录Maven生命周期与命令Maven生命周期clean&#xff1a;清理cleanup&#xff1a;清理所有default&#xff1a;默认site&#xff1a;站…

做数据集增强时,训练一半出现IndexError: tuple index out of range这种错误,不知道怎么改,有神仙赐教一下嘛?

在用YOLOv5做图像训练时&#xff0c;首先做了数据集的增强&#xff0c;但是增强中出现了如下的错误 首先出现这样的警告 (A:/stdy py37-g/agu_img.py:153: DeprecationWarning: An exception was ignored while fetching the attribute __array_interface__ from an object of …

maven部署方案之分离业务包

一、思想&#xff1a; 通过将业务包和公共包分离&#xff0c;集中管理所有包&#xff0c;打包时只构建业务包减少项目包的大小和传输时间。 为了观测稳定性&#xff0c;暂通过环境区分&#xff0c;较为频繁的联调环境采用该方式&#xff0c;测试、预发、正式暂保持一体化打包…

golang实现andflow流程引擎

1、andflow引擎 andflow_js可以实现在Html端设计流程&#xff0c;并将设计结果保存为json模型&#xff0c;andflow可以用于设计业务流程、数据处理流程、工作流、控制流等一切可流程化的过程。 由于golang具备高效、跨平台、并且还能够直接编译成可执行文件&#xff0c;这些优…

selenium 找不到元素:Unable to find element on closed window

浏览器&#xff1a;IE 报错信息&#xff1a; Unable to find element on closed window Unable to get browser 过程&#xff1a;登录》跳转页面&#xff08;同窗口&#xff09;》点击备份按钮 已知代码没有改过&#xff0c;而且部署到多个机子上&#xff0c;很多机子没有问…

C++11(一)

&#x1f9f8;&#x1f9f8;&#x1f9f8;各位大佬大家好&#xff0c;我是猪皮兄弟&#x1f9f8;&#x1f9f8;&#x1f9f8; 文章目录一、列表初始化initializer_list二、声明1.auto2.decltype3.nullptr三、C11 STL中的变化1.array2.forward_list3.STL其他变化四、C关键字新功…

NOIP 装箱问题

题目&#xff1a;[NOIP2001]装箱问题 &#xff0c;哈哈&#xff0c;我们今天来看一道很古老的题嘛&#xff0c;这是选自NOIP上的一道题&#xff0c;好了&#xff0c;我们一起来看看题意吧&#xff1a; 考虑到直接复制题目&#xff0c;或者截屏的方式不是很方便阅读&#xff0…

常见的网络协议

目录 一、TCP/IP协议簇 二、网络设备与五层模型对应关系&#xff1a; 三、常用网络协议总结&#xff08;TCP/IP协议簇&#xff09; 四、应用层服务协议 五、传输层协议组 TCP_UDP 六、网络层协议 IP_ICMP_ARP 七、物理层协议 MAC子层协议 一、TCP/IP协议簇 OSI七层模型…

视频怎么转换为音频文件?快来掌握这几种方式

大家平时在下载网课资源进行学习的时候&#xff0c;看久了眼睛也会开始疲劳&#xff0c;而且有些视频的画面看起来很枯燥。其实我们可以使用一些软件把视频中的音频分离出来&#xff0c;直接收听音频也可以学到知识&#xff0c;而且我们还可以处理其他的事情&#xff0c;是不是…

《计算机体系结构量化研究方法第六版》1.5 集成电路中的功耗和能耗趋势

1.5.1 功耗和能耗&#xff1a;系统视角 Q1&#xff1a;处理器需要的最大功耗是多少&#xff1f; 如果处理器的预期功耗大于电源系统能够提供的功耗&#xff08;试图汲取的电流大于电源系统可以提供的电流&#xff09;&#xff0c;通常会导致电压下降而让器件无法工作。在峰值…