WebSocket是什么
WebSocket 是基于 TCP 的一种新的应用层网络协议。它实现了浏览器与服务器全双工通信,即允许服务器主动发送信息给客户端。因此,在 WebSocket 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输,客户端和服务器之间的数据交换变得更加简单。
WebSocket的真正美妙之处在于它们总共使用了 1 个 TCP 连接,并且所有通信都是通过这个单一的长寿命 TCP 连接完成的。这大大减少了使用 WebSockets 构建实时应用程序所需的网络开销,因为不需要对 HTTP 端点进行持续轮询。
关键词
:应用层协议、基于 TCP、全双工通信、一次握手、持久连接、双向数据传输
WebSocket与http之间的区别
相同点:
都是一样基于TCP的,都是可靠性传输协议。都是应用层协议。
联系:
WebSocket在建立握手时,数据是通过HTTP传输的。但是建立之后,在真正传输时候是不需要HTTP协议的。
下面一张图说明了 HTTP 与 WebSocket的主要区别:
- WebSocket 是双向通信协议,模拟Socket协议,可以双向发送或接受信息,而HTTP是单向的
- WebSocket 是需要浏览器和服务器握手进行建立连接的,而http是浏览器发起向服务器的连接。
注意:虽然HTTP/2也具备服务器推送功能,但HTTP/2 只能推送静态资源,无法推送指定信息。
WebSocket使用场景
如何建立连接
在 WebSocket 开始通信之前,通信双方需要先进行握手,WebSocket 复用了 HTTP 的握手通道
,即客户端通过 HTTP 请求与 WebSocket 服务端协商升级协议。协议升级完成后,后续的数据交换则遵照 WebSocket 的协议。
利用HTTP完成握手有什么好处
- 让 WebSocket 和 HTTP 基础设备兼容(运行在 80 端口 或 443 端口)
- 可以复用 HTTP 的 Upgrade 机制,完成升级协议的协商过程。
WebSocket连接的过程
- 客户端发起http请求,经过3次握手后,建立起TCP连接;http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等
- 服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据
- 客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。
如何维持连接
如果我们使用 WebSocket 进行通信,建立连接之后怎么判断连接正常没有断开或者服务是否可用
可以通过建立心跳机制
,所谓心跳机制,就是定时发送一个数据包,让对方知道自己在线且正常工作,确保通信有效。如果对方无法响应,便可以弃用旧连接,发起新的连接了。
需要重连的场景可能包括:网络问题或者机器故障导致连接断开、连接没断但不可用了或者连接对端的服务不可用了等等。
实时通信的基本步骤
-
引入
gorilla/websocket
库:首先需要安装这个库,可以通过go get
命令安装:go get github.com/gorilla/websocket
-
设置WebSocket升级处理:使用
gorilla/websocket
的Upgrader
结构来升级HTTP连接为WebSocket连接。可以设置读取和写入缓冲区大小,以及检查请求来源的函数。 -
创建WebSocket处理函数:编写一个处理函数,该函数使用
Upgrader
来升级连接,并处理WebSocket消息的接收和发送。 -
注册WebSocket路由:在Gin的路由中注册一个路径,当客户端请求这个路径时,调用上面创建的WebSocket处理函数。
-
编写业务逻辑:在WebSocket处理函数中,编写业务逻辑,例如接收消息、发送消息、连接管理等。
-
测试WebSocket服务:启动服务后,可以使用WebSocket客户端工具(如浏览器的开发者工具或专用的WebSocket测试工具)来测试WebSocket服务是否能够正常通信。
package main
import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"net/http"
)
// 定义Upgrader用于升级HTTP连接为WebSocket连接
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有CORS请求
return true
},
}
func main() {
r := gin.Default()
// 定义WebSocket路由
r.GET("/ws", func(c *gin.Context) {
serveWebSocket(c.Writer, c.Request)
})
// 启动Gin服务器
r.Run(":8080")
}
// serveWebSocket处理WebSocket连接
func serveWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return // 错误处理
}
defer conn.Close()
// 处理WebSocket消息
for {
_, message, err := conn.ReadMessage()
if err != nil {
return // 错误处理
}
// 将接收到的消息发送回客户端
if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
return // 错误处理
}
}
}
在这个示例中,我们创建了一个WebSocket服务端,它监听/ws
路径的WebSocket请求。当客户端连接时,服务器将使用serveWebSocket
函数来处理连接。服务器接收消息并直接将相同消息发送回客户端,这是一个基本的WebSocket回声服务器的实现。
hello world 示例
package main
import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"time"
)
var upgrader = websocket.Upgrader{
// 这个是校验请求来源
// 在这里我们不做校验,直接return true
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
engine := gin.Default()
engine.GET("/helloWebSocket", func(context *gin.Context) {
// 将普通的http GET请求升级为websocket请求
client, _ := upgrader.Upgrade(context.Writer, context.Request, nil)
for {
// 每隔两秒给前端推送一句消息“hello, WebSocket”
err := client.WriteMessage(websocket.TextMessage, []byte("hello, WebSocket"))
if err != nil {
log.Println(err)
}
time.Sleep(time.Second * 2)
}
})
err := engine.Run(":8090")
if err != nil {
log.Fatalln(err)
}
}
可以用websocket在线测试工具代码:http://coolaf.com/tool/chattest。
注意:请求url前面的http://记得换成ws://
实现实时消息推送 示例
package module
import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"sync"
"time"
)
var (
// 消息通道
news = make(map[string]chan interface{})
// websocket客户端链接池
client = make(map[string]*websocket.Conn)
// 互斥锁,防止程序对统一资源同时进行读写
mux sync.Mutex
)
// api:/getPushNews接口处理函数
func GetPushNews(context *gin.Context) {
id := context.Query("userId")
log.Println(id + "websocket链接")
// 升级为websocket长链接
WsHandler(context.Writer, context.Request, id)
}
// api:/deleteClient接口处理函数
func DeleteClient(context *gin.Context) {
id := context.Param("id")
// 关闭websocket链接
conn, exist := getClient(id)
if exist {
conn.Close()
deleteClient(id)
} else {
context.JSON(http.StatusOK, gin.H{
"mesg": "未找到该客户端",
})
}
// 关闭其消息通道
_, exist =getNewsChannel(id)
if exist {
deleteNewsChannel(id)
}
}
// websocket Upgrader
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
HandshakeTimeout: 5 * time.Second,
// 取消ws跨域校验
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// WsHandler 处理ws请求
func WsHandler(w http.ResponseWriter, r *http.Request, id string) {
var conn *websocket.Conn
var err error
var exist bool
// 创建一个定时器用于服务端心跳
pingTicker := time.NewTicker(time.Second * 10)
conn, err = wsupgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 把与客户端的链接添加到客户端链接池中
addClient(id, conn)
// 获取该客户端的消息通道
m, exist := getNewsChannel(id)
if !exist {
m = make(chan interface{})
addNewsChannel(id, m)
}
// 设置客户端关闭ws链接回调函数
conn.SetCloseHandler(func(code int, text string) error {
deleteClient(id)
log.Println(code)
return nil
})
for {
select {
case content, _ := <- m:
// 从消息通道接收消息,然后推送给前端
err = conn.WriteJSON(content)
if err != nil {
log.Println(err)
conn.Close()
deleteClient(id)
return
}
case <- pingTicker.C:
// 服务端心跳:每20秒ping一次客户端,查看其是否在线
conn.SetWriteDeadline(time.Now().Add(time.Second * 20))
err = conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Println("send ping err:", err)
conn.Close()
deleteClient(id)
return
}
}
}
}
// 将客户端添加到客户端链接池
func addClient(id string, conn *websocket.Conn) {
mux.Lock()
client[id] = conn
mux.Unlock()
}
// 获取指定客户端链接
func getClient(id string) (conn *websocket.Conn, exist bool) {
mux.Lock()
conn, exist = client[id]
mux.Unlock()
return
}
// 删除客户端链接
func deleteClient(id string) {
mux.Lock()
delete(client, id)
log.Println(id + "websocket退出")
mux.Unlock()
}
// 添加用户消息通道
func addNewsChannel(id string, m chan interface{}) {
mux.Lock()
news[id] = m
mux.Unlock()
}
// 获取指定用户消息通道
func getNewsChannel(id string) (m chan interface{}, exist bool) {
mux.Lock()
m, exist = news[id]
mux.Unlock()
return
}
// 删除指定消息通道
func deleteNewsChannel(id string) {
mux.Lock()
if m, ok := news[id]; ok {
close(m)
delete(news, id)
}
mux.Unlock()
}
补充说明
当你要给某个用户推送消息时,你只需要使用getNewsChannel()方法获取该用户的消息通道,然后把消息送入通道就可以了。
若用户离线,你可以把消息直接存到用户所有消息中,或者设置一个消息队列,把消息放到用户未读消息队列中,下次用户上线时再一次性推送给用户。
服务端心跳:服务端每隔20秒回ping一下用户,查看其是否还在线,若ping不到,则服务端自动关闭websocket链接。