WebSocket 实现指南
目录
1. 依赖安装
1.1 安装必要的包
# Install gorilla/websocket
go get github.com/gorilla/websocket
# Install the gin framework
go get github.com/gin-gonic/gin
1.2 更新 go.mod
require (
github.com/gin-gonic/gin v1.9.1
github.com/gorilla/websocket v1.5.3
)
1.3 配置文件
1.3.1 config.go
package config
import (
"os"
"gopkg.in/yaml.v3"
)
// Config is the root of the application configuration. Each field is decoded
// from the top-level YAML key named in its tag.
type Config struct {
	Server    ServerConfig    `yaml:"server"`
	MySQL     MySQLConfig     `yaml:"mysql"`
	Redis     RedisConfig     `yaml:"redis"`
	JWT       JWTConfig       `yaml:"jwt"`
	WebSocket WebSocketConfig `yaml:"websocket"`
}
// ServerConfig mirrors the "server" section of the YAML configuration.
type ServerConfig struct {
	Port int    `yaml:"port"` // port number from the config file
	Mode string `yaml:"mode"` // the sample config documents "debug" or "release"
}
// MySQLConfig mirrors the "mysql" section of the YAML configuration.
type MySQLConfig struct {
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
	User     string `yaml:"user"`
	Password string `yaml:"password"`
	DBName   string `yaml:"dbname"`
}
// RedisConfig mirrors the "redis" section of the YAML configuration.
type RedisConfig struct {
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
	Password string `yaml:"password"`
	DB       int    `yaml:"db"`
}
// JWTConfig mirrors the "jwt" section of the YAML configuration.
type JWTConfig struct {
	Secret string `yaml:"secret"`
	Expire int    `yaml:"expire"` // token lifetime, in hours
}
// WebSocketConfig mirrors the "websocket" section of the YAML configuration.
// The sample config file gives the three timing values in seconds.
type WebSocketConfig struct {
	ReadBufferSize  int   `yaml:"read_buffer_size"`
	WriteBufferSize int   `yaml:"write_buffer_size"`
	MaxMessageSize  int64 `yaml:"max_message_size"`
	WriteWait       int   `yaml:"write_wait"`  // seconds in the sample config
	PongWait        int   `yaml:"pong_wait"`   // seconds in the sample config
	PingPeriod      int   `yaml:"ping_period"` // seconds in the sample config
	MaxConnections  int   `yaml:"max_connections"`
}
// GlobalConfig is the process-wide configuration populated by Init.
var GlobalConfig Config

// Init makes sure the "logs" directory exists, then reads config/config.yaml
// (relative to the working directory) and decodes it into GlobalConfig.
//
// It returns the first error encountered while creating the directory,
// reading the file or decoding the YAML.
func Init() error {
	// Report a failure to create the log directory instead of discarding it,
	// so a bad working directory or missing permission shows up at startup.
	if err := os.MkdirAll("logs", 0755); err != nil {
		return err
	}

	data, err := os.ReadFile("config/config.yaml")
	if err != nil {
		return err
	}
	return yaml.Unmarshal(data, &GlobalConfig)
}
1.3.2 config.yaml
# Application configuration. Every section is a mapping, so its keys must be
# indented beneath it; otherwise the repeated host/port/password keys collide
# at the top level and the file does not decode into the Config struct.
server:
  port: 8080
  mode: debug # debug or release
websocket:
  read_buffer_size: 1024
  write_buffer_size: 1024
  max_message_size: 512
  write_wait: 10 # seconds
  pong_wait: 60 # seconds
  ping_period: 54 # seconds
  max_connections: 5000
mysql:
  host: localhost
  port: 3306
  user: root
  password: "123456" # sample value only; replace before deploying
  dbname: my_kingdom
redis:
  host: localhost
  port: 6379
  password: "123456" # sample value only; replace before deploying
  db: 0
jwt:
  secret: "XueZhimin" # sample value only; use a long random secret in production
  expire: 24 # hours
2. 代码实现
2.1 WebSocket管理器 (manager.go)
package websocket
import (
"encoding/json"
"fmt"
"mykingdom/config"
"net/http"
"sync"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Manager keeps the registry of connected clients and the channel on which
// decoded inbound messages are handed to the application.
type Manager struct {
	clients   sync.Map                // connected clients: remote address string -> *Client
	broadcast chan []byte             // broadcast channel, allocated by NewManager
	config    *config.WebSocketConfig // settings handed to NewManager
	messages  chan Message            // decoded inbound messages, exposed through GetMessages
}
// Message is the JSON envelope exchanged with clients.
type Message struct {
	Type     string      `json:"type"` // message kind, used for dispatch
	Data     interface{} `json:"data"` // arbitrary payload
	ClientID string      `json:"-"`    // sender's connection ID; excluded from JSON
}
// upgrader holds the settings used when switching an HTTP request over to the
// WebSocket protocol.
//
// NOTE(review): the origin check accepts every request, so any web page can
// open a socket to this server from a visitor's browser. Restrict it to the
// expected origins before exposing the service publicly.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin:     allowAnyOrigin,
}

// allowAnyOrigin is the CheckOrigin hook; it approves every request
// regardless of its Origin header.
func allowAnyOrigin(_ *http.Request) bool {
	return true
}
// NewManager builds a Manager that uses the given settings. The broadcast
// channel is unbuffered; the inbound message channel buffers 256 entries.
func NewManager(config *config.WebSocketConfig) *Manager {
	mgr := new(Manager)
	mgr.config = config
	mgr.broadcast = make(chan []byte)
	mgr.messages = make(chan Message, 256)
	return mgr
}
// HandleWebSocket returns the gin handler that accepts WebSocket connections.
//
// The handler answers 503 once the number of registered clients has reached
// config.MaxConnections. Otherwise it upgrades the request, registers the new
// client under its remote address and starts its read and write goroutines.
func (m *Manager) HandleWebSocket() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		// sync.Map has no length, so count the live entries by walking it.
		var active int
		m.clients.Range(func(_, _ interface{}) bool {
			active++
			return true
		})
		if active >= m.config.MaxConnections {
			ctx.JSON(http.StatusServiceUnavailable, gin.H{"message": "达到最大连接数限制"})
			return
		}

		wsConn, failed := Upgrade(ctx)
		if failed {
			return
		}

		// Each client gets a 256-slot outbound buffer.
		peer := &Client{conn: wsConn, manager: m, send: make(chan []byte, 256)}
		m.clients.Store(wsConn.RemoteAddr().String(), peer)

		// One goroutine reads from the socket, the other writes to it.
		go peer.readPump()
		go peer.writePump()
	}
}
// Upgrade switches the request carried by c to the WebSocket protocol using
// the package-level upgrader.
//
// The boolean result is a "stop" flag: it is true when the upgrade failed and
// the connection is nil, false when the returned connection is usable.
// gorilla/websocket documents that a failed upgrade has already replied to
// the client with an HTTP error.
func Upgrade(c *gin.Context) (*websocket.Conn, bool) {
	if conn, err := upgrader.Upgrade(c.Writer, c.Request, nil); err == nil {
		return conn, false
	}
	return nil, true
}
// Broadcast queues message for every registered client without blocking.
//
// A client whose outbound buffer is full is treated as too slow: it is
// removed from the registry and its connection is closed, which makes its
// read and write goroutines fail and exit on their own.
//
// The client's send channel is deliberately NOT closed here. Broadcast runs
// concurrently (every client's read loop calls it, as can HTTP handlers), so
// closing the channel could panic in two ways: two callers closing the same
// channel, or one caller sending on a channel another has just closed.
// Closing the connection instead is safe from any goroutine.
func (m *Manager) Broadcast(message []byte) {
	m.clients.Range(func(key, value interface{}) bool {
		client, ok := value.(*Client)
		if !ok {
			return true
		}
		select {
		case client.send <- message:
		default:
			m.clients.Delete(key)
			// The pumps may already have closed the connection; a second
			// Close only returns an error, which carries no information here.
			_ = client.conn.Close()
		}
		return true
	})
}
// SendToClient queues message for the client registered under clientAddr.
//
// It reports true when the message was queued. It reports false when no such
// client is registered, or when that client's outbound buffer is full. The
// send never blocks, so a stalled client cannot hang the caller.
func (m *Manager) SendToClient(clientAddr string, message []byte) bool {
	value, ok := m.clients.Load(clientAddr)
	if !ok {
		return false
	}
	client, ok := value.(*Client)
	if !ok {
		return false
	}
	select {
	case client.send <- message:
		return true
	default:
		// Buffer full: report non-delivery rather than wait indefinitely.
		return false
	}
}
// SendMessage wraps data in a Message of the given type, encodes it as JSON
// and broadcasts it to every connected client.
//
// It returns an error only when data cannot be encoded. The encoding error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func (m *Manager) SendMessage(messageType string, data interface{}) error {
	jsonData, err := json.Marshal(Message{Type: messageType, Data: data})
	if err != nil {
		return fmt.Errorf("marshal message failed: %w", err)
	}
	m.Broadcast(jsonData)
	return nil
}
// SendMessageToClient wraps data in a Message of the given type, encodes it
// as JSON and queues it for the single client registered under clientAddr.
//
// It returns an error when data cannot be encoded (wrapped with %w so the
// cause stays inspectable), or a "client not found" error whenever
// SendToClient reports that the message was not queued.
func (m *Manager) SendMessageToClient(clientAddr string, messageType string, data interface{}) error {
	jsonData, err := json.Marshal(Message{Type: messageType, Data: data})
	if err != nil {
		return fmt.Errorf("marshal message failed: %w", err)
	}
	if !m.SendToClient(clientAddr, jsonData) {
		return fmt.Errorf("client not found: %s", clientAddr)
	}
	return nil
}
// GetMessages exposes the inbound message channel as receive-only, so callers
// can consume decoded client messages but cannot send on or close the channel.
func (m *Manager) GetMessages() <-chan Message {
	var inbound <-chan Message = m.messages
	return inbound
}
// ReadMessage reads the next message from conn and returns its payload; the
// message type is discarded.
//
// A read failure is wrapped with %w, so the underlying error (for example a
// *websocket.CloseError) can still be recovered with errors.As.
func ReadMessage(conn *websocket.Conn) ([]byte, error) {
	_, message, err := conn.ReadMessage()
	if err != nil {
		return nil, fmt.Errorf("read message failed: %w", err)
	}
	return message, nil
}
// WriteMessage sends message on conn as a single text frame.
//
// A write failure is wrapped with %w so callers can inspect the underlying
// error with errors.Is / errors.As.
func WriteMessage(conn *websocket.Conn, message []byte) error {
	if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
		return fmt.Errorf("write message failed: %w", err)
	}
	return nil
}
// WriteJSON encodes v as JSON and sends it on conn.
//
// An encoding or write failure is wrapped with %w so callers can inspect the
// underlying error with errors.Is / errors.As.
func WriteJSON(conn *websocket.Conn, v interface{}) error {
	if err := conn.WriteJSON(v); err != nil {
		return fmt.Errorf("write json failed: %w", err)
	}
	return nil
}
2.2 客户端实现 (client.go)
package websocket
import (
"encoding/json"
"log"
"time"
"github.com/gorilla/websocket"
)
// writeWait is the deadline applied to each write on the connection.
const writeWait = 10 * time.Second

// pongWait is how long the read side waits before its deadline expires; the
// deadline is pushed forward each time a pong arrives.
const pongWait = 60 * time.Second

// pingPeriod is the interval between pings. It is nine tenths of pongWait so
// that a ping goes out before the peer's pong is overdue.
const pingPeriod = pongWait * 9 / 10
// Client represents one connected WebSocket peer.
type Client struct {
	conn    *websocket.Conn // underlying WebSocket connection
	manager *Manager        // owning manager; holds the client registry
	send    chan []byte     // outbound messages consumed by writePump
}
// readPump reads messages from the connection until a read fails, then
// unregisters the client and closes the connection.
//
// Each frame is decoded as a Message, tagged with the sender's remote
// address, pushed onto the manager's inbound channel and re-broadcast
// verbatim to every client. Frames that are not valid JSON are logged and
// skipped.
func (c *Client) readPump() {
	defer func() {
		c.manager.clients.Delete(c.conn.RemoteAddr().String())
		c.conn.Close()
	}()

	// Enforce the configured max_message_size so a peer cannot make the
	// server buffer arbitrarily large frames. A non-positive value leaves the
	// connection unlimited.
	if cfg := c.manager.config; cfg != nil && cfg.MaxMessageSize > 0 {
		c.conn.SetReadLimit(cfg.MaxMessageSize)
	}

	// The read deadline is extended every time a pong arrives; a peer that
	// stops answering pings makes the next read time out.
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}

		// Try to decode the frame into the Message envelope.
		var msg Message
		if err := json.Unmarshal(message, &msg); err != nil {
			log.Printf("unmarshal message failed: %v", err)
			continue
		}

		// Record who sent it.
		msg.ClientID = c.conn.RemoteAddr().String()

		// Hand the message to the application.
		// NOTE(review): this send blocks once the inbound buffer is full, so
		// something must keep draining the channel returned by GetMessages.
		c.manager.messages <- msg

		// Relay the raw frame to every connected client.
		c.manager.Broadcast(message)
	}
}
// writePump forwards messages from the client's send channel to the
// connection and sends a ping every pingPeriod.
//
// It returns, stopping the ticker and closing the connection, as soon as any
// write fails or the send channel is closed. In the latter case it first
// sends a close frame.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The channel was closed: tell the peer we are going away.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// Stop on a failed write instead of ignoring it and carrying on
			// with a half-written frame.
			if _, err := w.Write(message); err != nil {
				return
			}
			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
3. 配置说明
3.1 WebSocket配置 (config.yaml)
websocket:
  read_buffer_size: 1024   # read buffer size
  write_buffer_size: 1024  # write buffer size
  max_message_size: 512    # maximum message size
  write_wait: 10           # seconds
  pong_wait: 60            # seconds
  ping_period: 54          # seconds
  max_connections: 5000    # maximum number of connections
3.2 配置结构体 (config.go)
// WebSocketConfig mirrors the "websocket" section of config.yaml. It lists
// the same fields as the definition in config.go, so every key in the sample
// file has a matching field.
type WebSocketConfig struct {
	ReadBufferSize  int   `yaml:"read_buffer_size"`
	WriteBufferSize int   `yaml:"write_buffer_size"`
	MaxMessageSize  int64 `yaml:"max_message_size"`
	WriteWait       int   `yaml:"write_wait"`  // seconds in the sample config
	PongWait        int   `yaml:"pong_wait"`   // seconds in the sample config
	PingPeriod      int   `yaml:"ping_period"` // seconds in the sample config
	MaxConnections  int   `yaml:"max_connections"`
}
4. 使用示例
4.1 服务端示例
func main() {
r := gin.Default()
// 创建WebSocket管理器
wsManager := websocket.NewManager(&config.GlobalConfig.WebSocket)
// WebSocket连接
r.GET("/ws", wsManager.HandleWebSocket())
// 广播消息
r.POST("/broadcast", func(c *gin.Context) {
message := c.PostForm("message")
wsManager.Broadcast([]byte(message))
c.JSON(200, gin.H{"message": "broadcast sent"})
})
r.Run(":8080")
}
4.2 前端示例
// Open the WebSocket connection
const ws = new WebSocket('ws://localhost:8080/ws')

// Connection established
ws.onopen = () => {
  console.log('连接成功')

  // Send only once the socket is open: calling send() while the connection is
  // still being established throws InvalidStateError. The server decodes each
  // frame as a JSON { type, data } envelope and skips anything else, so the
  // text is wrapped accordingly.
  ws.send(JSON.stringify({
    type: 'chat',
    data: { content: 'Hello, World!' }
  }))
}

// Incoming message
ws.onmessage = (event) => {
  console.log('收到消息:', event.data)
}
4.3 Vue组件示例
<template>
  <div>
    <div>连接状态: {{ isConnected ? '已连接' : '未连接' }}</div>
    <input v-model="message" @keyup.enter="sendMessage">
    <button @click="sendMessage">发送</button>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue'

// Socket handle, connection flag and the text bound to the input box.
const ws = ref(null)
const isConnected = ref(false)
const message = ref('')

// Open the socket and keep isConnected in sync with its state.
const connect = () => {
  ws.value = new WebSocket('ws://localhost:8080/ws')
  ws.value.onopen = () => isConnected.value = true
  ws.value.onclose = () => isConnected.value = false
  ws.value.onmessage = (event) => {
    console.log('收到消息:', event.data)
  }
}

// Send the current input as a chat message, then clear the input.
const sendMessage = () => {
  if (isConnected.value && message.value) {
    // The server decodes every frame as a JSON { type, data } envelope and
    // skips frames it cannot decode, so raw text would be dropped.
    ws.value.send(JSON.stringify({
      type: 'chat',
      data: { content: message.value }
    }))
    message.value = ''
  }
}

onMounted(() => connect())
onUnmounted(() => ws.value?.close())
</script>
4.4 消息收发示例
4.4.1 消息结构
// Message is the JSON envelope used for every message sent or received.
type Message struct {
	Type     string      `json:"type"` // message type
	Data     interface{} `json:"data"` // message payload
	ClientID string      `json:"-"`    // sender's connection ID; never serialized
}
4.4.2 发送消息
// 1. 广播消息给所有客户端
err := wsManager.SendMessage("chat", map[string]interface{}{
"user": "系统",
"content": "欢迎新用户加入",
})
// 2. 发送消息给指定客户端
err := wsManager.SendMessageToClient(clientAddr, "private", map[string]interface{}{
"content": "这是一条私信",
"time": time.Now(),
})
// 3. 发送游戏动作
err := wsManager.SendMessage("game_action", map[string]interface{}{
"action": "move",
"position": map[string]int{
"x": 100,
"y": 200,
},
})
4.4.3 接收和处理消息
// Start the goroutine that consumes inbound messages. It runs for as long as
// the manager's message channel stays open.
go func() {
	// Receive-only view of the inbound message channel.
	msgChan := wsManager.GetMessages()

	// Dispatch each message on its type.
	for msg := range msgChan {
		switch msg.Type {
		case "chat":
			handleChatMessage(msg)
		case "game_action":
			handleGameAction(msg)
		case "private":
			handlePrivateMessage(msg)
		default:
			log.Printf("未知消息类型: %s", msg.Type)
		}
	}
}()

// handleChatMessage logs the content of a chat message. Messages whose data
// is not a JSON object are ignored.
func handleChatMessage(msg websocket.Message) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		return
	}
	log.Printf("来自 %s 的聊天消息: %v", msg.ClientID, data["content"])
}

// handleGameAction logs the action of a game message. Messages whose data is
// not a JSON object are ignored.
func handleGameAction(msg websocket.Message) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		return
	}
	log.Printf("来自 %s 的游戏动作: %v", msg.ClientID, data["action"])
}

// handlePrivateMessage logs the content of a private message. The dispatch
// loop above calls it, so it has to exist for the example to compile.
// Messages whose data is not a JSON object are ignored.
func handlePrivateMessage(msg websocket.Message) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		return
	}
	log.Printf("来自 %s 的私信: %v", msg.ClientID, data["content"])
}
4.4.4 前端发送消息示例
// 1. Send a chat message
const chatMessage = {
  type: 'chat',
  data: { content: '大家好!' }
}
ws.send(JSON.stringify(chatMessage))

// 2. Send a game action
const moveAction = {
  type: 'game_action',
  data: {
    action: 'move',
    position: { x: 100, y: 200 }
  }
}
ws.send(JSON.stringify(moveAction))
4.4.5 前端接收消息示例
// Decode each incoming frame and route it by message type. Anything that
// throws while decoding or handling is reported as a parse failure.
ws.onmessage = ({ data: raw }) => {
  try {
    const { type, data } = JSON.parse(raw)

    if (type === 'chat') {
      handleChat(data)
    } else if (type === 'game_action') {
      handleGameAction(data)
    } else if (type === 'private') {
      handlePrivateMessage(data)
    } else {
      console.log('未知消息类型:', type)
    }
  } catch (error) {
    console.error('解析消息失败:', error)
  }
}

// Chat message: print "<user>: <content>"
function handleChat(data) {
  const { user, content } = data
  console.log(`${user}: ${content}`)
}

// Game action: log the action, then apply the new position
function handleGameAction(data) {
  console.log('游戏动作:', data.action)
  updateGameState(data.position)
}

// Private message: log its content
function handlePrivateMessage(data) {
  console.log('收到私信:', data.content)
}
4.5 错误处理示例
// Error handling when broadcasting.
if sendErr := wsManager.SendMessage("chat", data); sendErr != nil {
	log.Printf("发送消息失败: %v", sendErr)
}

// Error handling when sending a private message. The "client not found" case
// is recognised by its error text and reported as the user being offline.
if dmErr := wsManager.SendMessageToClient(clientAddr, "private", data); dmErr != nil {
	switch {
	case strings.Contains(dmErr.Error(), "client not found"):
		log.Printf("用户已离线: %s", clientAddr)
	default:
		log.Printf("发送私信失败: %v", dmErr)
	}
}
4.6 消息类型建议
// System messages.
const (
	MessageTypeSystem = "system" // system notification
	MessageTypeError  = "error"  // error message
)

// Chat messages.
const (
	MessageTypeChat    = "chat"    // public chat
	MessageTypePrivate = "private" // private chat
)

// Game messages.
const (
	MessageTypeGameAction = "game_action" // game action
	MessageTypeGameState  = "game_state"  // game state
	MessageTypeMatch      = "match"       // matchmaking
)
5. 框架整合
5.1 与Gin框架整合
// AuthWebSocket returns a middleware that guards the WebSocket endpoint. It
// reads the "token" query parameter and, unless validateToken accepts it,
// aborts the request with 401 Unauthorized. Accepted requests continue down
// the handler chain.
func AuthWebSocket() gin.HandlerFunc {
	return func(c *gin.Context) {
		if validateToken(c.Query("token")) {
			c.Next()
			return
		}
		c.AbortWithStatus(http.StatusUnauthorized)
	}
}
// Register the route with the middleware in front of the WebSocket handler
r.GET("/ws", AuthWebSocket(), wsManager.HandleWebSocket())
5.2 与Nginx整合
# websocket.conf — meant to be dropped into /etc/nginx/conf.d/.
# Files in conf.d are normally included from inside the main http { } block,
# so this file holds only http-level directives. Wrapping them in another
# http { } there is rejected by nginx.
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server 127.0.0.1:8080;
}

server {
    listen 80;
    server_name example.com;

    # WebSocket proxy: forward the Upgrade handshake to the backend
    location /ws {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host $host;
    }

    # All other HTTP requests
    location / {
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
6. 部署说明
6.1 服务器要求
- 支持WebSocket的现代浏览器
- Go 1.16+
- Nginx 1.16+(如果使用)
6.2 部署步骤
- 编译Go程序
go build -o server cmd/main.go
- 配置Nginx(如果使用)
# Copy the nginx configuration
sudo cp nginx.conf /etc/nginx/conf.d/websocket.conf
# Restart Nginx
sudo systemctl restart nginx
- 运行服务
./server
6.3 注意事项
- 连接管理
  - 定期清理断开的连接
  - 实现重连机制
  - 控制连接数量
- 安全性
  - 添加连接认证
  - 限制消息大小
  - 添加消息验证
- 性能优化
  - 使用消息队列
  - 控制广播频率
  - 添加负载均衡
- 监控
  - 记录连接数
  - 监控消息流量
  - 错误日志记录