基于Go语言实现一个网络聊天室(连接Redis版)

news2025/4/6 9:19:50

基于Go语言和Redis的实时聊天室项目详解

项目概述

在这个项目中,我们实现了一个基于Go语言和Redis的实时聊天室系统。该系统允许用户通过客户端连接到服务器,进行实时聊天,并且支持以下功能:

  • 用户网名注册和验证
  • 消息广播和接收
  • 心跳检测和自动重连
  • 用户活跃度统计和排名
  • 消息存储到Redis

技术栈

  • Go语言:用于实现客户端和服务器逻辑
  • Redis:用于存储用户活跃度和聊天记录
  • TCP协议:用于客户端和服务器之间的通信

项目结构

项目由三个主要文件组成:

  1. client.go:客户端逻辑
  2. server.go:服务器逻辑
  3. utils.go:工具函数(消息发送和读取)

详细实现

客户端实现(client.go)

1. 连接到服务器

客户端通过TCP协议连接到服务器:

conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
	log.Fatal("连接服务器时出错:", err)
}
defer conn.Close()
2. 输入网名

用户输入网名并发送到服务器:

fmt.Print("请输入你的网名: ")
nameInput, err := reader.ReadString('\n')
name = strings.TrimSpace(nameInput)
err = utils.SendMessage(conn, []byte(name))
3. 消息发送和接收

用户输入消息后,通过utils.SendMessage发送到服务器:

message, err := reader.ReadString('\n')
err = utils.SendMessage(conn, []byte(message))

服务器发送的消息通过handleServerMessages函数接收并打印:

func handleServerMessages(conn *net.Conn) {
	reader := bufio.NewReader(*conn)
	for {
		message, err := utils.ReadMessage(reader)
		if err != nil {
			// 处理错误和重连逻辑
		}
		fmt.Println(string(message))
	}
}
4. 心跳检测和自动重连

客户端会接收服务器的心跳检测消息(PING),并发送PONG响应:

if string(message) == "PING" {
	log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")
	utils.SendMessage(*conn, []byte("PONG"))
}

如果连接断开,客户端会尝试重新连接:

func reconnect(oldConn net.Conn) (net.Conn, bool) {
	for i := 0; i < 3; i++ {
		newConn, err := net.Dial("tcp", "localhost:8080")
		if err == nil {
			oldConn.Close()
			return newConn, true
		}
		time.Sleep(time.Duration(2<<uint(i)) * time.Second)
	}
	return nil, false
}

服务器实现(server.go)

1. 初始化Redis

服务器使用Redis存储用户活跃度和聊天记录:

redisClient := redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	Password: "",
	DB:       0,
})
2. 客户端管理

服务器维护一个ChatRoom结构体,管理所有在线客户端:

type ChatRoom struct {
	Clients map[*Client]bool
	Join    chan *Client
	Leave   chan *Client
	Message chan []byte
	Redis   *redis.Client
	mu      sync.RWMutex
}
3. 消息广播

服务器接收客户端的消息,并广播到所有在线客户端:

func (cr *ChatRoom) Run() {
	for {
		select {
		case message := <-cr.Message:
			cr.mu.RLock()
			for client := range cr.Clients {
				client.Send <- message
			}
			cr.mu.RUnlock()
			batchStoreToRedis(ctx, cr.Redis, message)
		}
	}
}
4. 心跳检测

服务器定期向客户端发送PING消息,并等待PONG响应:

ticker := time.NewTicker(10 * time.Second)
heartbeatTimeout := time.NewTimer(15 * time.Second)
for {
	select {
	case <-ticker.C:
		utils.SendMessage(client.Conn, []byte("PING"))
		heartbeatTimeout.Reset(15 * time.Second)
	case <-heartbeatTimeout.C:
		cr.Leave <- client
	}
}
5. Redis存储

服务器将消息批量存储到Redis,并更新用户活跃度:

func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {
	queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()
	queue = append(queue, string(message))

	if len(queue) >= 10 {
		pipe := redisClient.Pipeline()
		for _, msg := range queue {
			pipe.RPush(ctx, "chat_messages", msg)
			pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))
		}
		pipe.Exec(ctx)
		redisClient.Del(ctx, "chat_messages_queue")
	} else {
		redisClient.RPush(ctx, "chat_messages_queue", string(message))
	}
}

工具函数(utils.go)

工具函数提供了消息的发送和读取功能:

func SendMessage(conn net.Conn, message []byte) error {
	length := uint32(len(message))
	err := binary.Write(conn, binary.BigEndian, length)
	if err != nil {
		return err
	}
	_, err = conn.Write(message)
	return err
}

func ReadMessage(reader *bufio.Reader) ([]byte, error) {
	var length uint32
	err := binary.Read(reader, binary.BigEndian, &length)
	if err != nil {
		return nil, err
	}
	message := make([]byte, length)
	_, err = io.ReadFull(reader, message)
	return message, err
}

重点问题分析

问题 1:心跳检测机制与消息处理分支的逻辑冲突导致服务端无法接收客户端消息

1.1 原始代码结构的问题

在未修复的代码中,服务端的 HandleClient 函数使用了如下逻辑:

for {
    select {
    case <-ticker.C:  // 心跳检测分支(每10秒触发一次)
        sendPing()

    default:          // 默认分支(非阻塞)
        message := readMessage()
        processMessage(message)
    }
}
1.2 关键问题分析
  • default 分支的局限性
    default 分支仅在 所有其他 case 未就绪时触发
    ticker.C 每隔 10 秒触发一次心跳检测时,select 会优先执行 case <-ticker.C,而 default 分支仅在心跳未触发时才会执行。
    这会导致:
    1. 消息读取延迟:客户端发送的消息可能堆积在 bufio.Reader 的缓冲区中,但服务端因 default 分支未及时执行而无法读取。
    2. 竞争条件:如果客户端在心跳触发时发送消息,服务端会优先处理心跳,消息可能被跳过。
1.3 具体场景模拟

假设客户端连续发送两条消息 消息A消息B,时间线如下:

时间点 0ms: 服务端开始循环
时间点 5ms: 客户端发送消息A
时间点 10ms: ticker.C 触发心跳检测(发送PING)
时间点 15ms: 客户端发送消息B
  • 服务端行为
    1. 10ms 时,case <-ticker.C 触发,发送 PING。
    2. default 分支在 10ms 后才有机会执行,但此时 bufio.Reader 中可能已有 消息A消息B
    3. 由于 default 分支是非阻塞的,服务端可能只读取到部分消息,甚至因心跳频繁触发而完全跳过消息处理。
1.4 Go 的 select 调度机制
  • 随机选择原则
    当多个 case 同时就绪时,select 会随机选择一个执行。
    但若某个 case(如 ticker.C)周期性触发,它会频繁占用执行机会,导致其他分支(如消息读取)被“饿死”。

  • default 分支的陷阱
    default 分支的设计初衷是避免阻塞,但它不适合需要持续监听的操作。
    在您的场景中,消息读取需要主动检查缓冲区,而 default 分支无法保证这一点。

1.5 修复方案的核心逻辑

修改后的代码通过以下方式解决问题:

for {
    select {
    case <-ticker.C:           // 心跳检测
        sendPing()

    case <-msgTicker.C:        // 专用消息读取分支(每50ms触发一次)
        message := readMessage()
        processMessage(message)
    }
}
1.6 为何有效?
  1. 独立的定时器 (msgTicker)
    添加了一个专用的 msgTicker,每隔 50ms 触发一次消息读取。

    • 即使心跳检测占用 select 的执行机会,消息读取仍有独立的触发窗口。
    • 避免了心跳和消息处理的竞争。
  2. 消除 default 分支的不可靠性
    用显式的 case <-msgTicker.C 替代 default,确保消息读取按固定频率执行。

1.7 总结:冲突的本质
  • 心跳机制干扰:周期性的心跳检测(ticker.C)占用了 select 的执行机会,导致 default 分支无法及时处理消息。
  • 修复思路:为消息读取分配独立的触发通道(msgTicker),与心跳检测解耦,确保两者互不阻塞。
1.8 类比解释

想象一个餐厅服务员(服务端)需要同时做两件事:

  1. 定时检查厨房温度(心跳检测):每10分钟一次。
  2. 接待顾客点餐(消息处理):需要随时响应。
  • 原始方案:服务员大部分时间站在厨房门口检查温度,只有偶尔看一眼大堂(default 分支),导致顾客长时间无人接待。
  • 修复方案:服务员每隔5分钟主动巡视大堂一次(msgTicker),同时定期检查厨房,两者互不干扰。

问题 2:网名重复导致身份冲突

2.1 前因后果
  • 问题本质:多个用户使用相同网名加入聊天室,导致消息归属混乱、活跃度统计错误。
  • 根本原因:服务端未验证网名唯一性,直接接受客户端提交的名称,未检查是否已被占用。
  • 具体表现
    • 用户A和用户B使用相同网名“小明”加入后,服务端无法区分两者消息。
    • Redis中user_activity的活跃度分数会被错误累加到同一用户名下。
2.2 具体场景模拟

假设用户A和用户B同时尝试使用网名“小明”连接:

时间点 0ms: 用户A发送网名“小明” → 服务端接受并加入。
时间点 50ms: 用户B发送网名“小明” → 服务端未检查唯一性,直接加入。
时间点 100ms: 用户A发送消息“你好”,用户B发送消息“大家好”。
  • 服务端行为
    1. 用户A和用户B的消息均被标记为“小明: 消息内容”。
    2. Redis中用户“小明”的活跃度分数错误累加为2(实际应为两个独立用户)。
2.3 Go 语言机制分析
  • 并发写入冲突:多个协程同时操作Redis集合时,若未加锁可能导致数据竞争。
  • 集合操作的原子性:Redis的SADDSISMEMBER命令是原子操作,但Go代码中需确保逻辑顺序正确。
2.4 解决方案
  1. Redis 集合管理在线用户
    • 用户加入时,检查网名是否存在于集合online_users
      exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()
      if exists {
          utils.SendMessage(conn, []byte("ERROR: 网名已被占用"))
          return
      }
      
    • 网名合法后添加到集合:
      cr.Redis.SAdd(ctx, "online_users", clientName)
      
  2. 退出时清理网名
    • 使用defer确保客户端断开时从集合中移除:
      defer cr.Redis.SRem(ctx, "online_users", clientName)
      
2.5 总结类比
  • 问题类比:多人共用同一身份证号登记入住酒店,前台无法区分客人。
  • 修复思路:前台要求每位客人提供唯一身份证号,登记前查重,退房时注销。

问题 3:空消息和空网名导致无效数据

3.1 前因后果
  • 问题本质:用户输入空白内容作为网名或消息,导致系统处理无效数据。
  • 根本原因:客户端和服务端未对输入内容做非空验证。
  • 具体表现
    • 空网名:用户直接回车加入,服务端记录空名称,广播消息时无法标识来源。
    • 空消息:用户发送空格或回车,占用网络带宽和存储资源。
3.2 具体场景模拟

假设用户执行以下操作:

时间点 0ms: 用户输入空网名(直接回车) → 服务端允许加入。
时间点 100ms: 用户发送空消息(空格 + 回车) → 服务端广播“:  ”。
  • 服务端行为
    1. 网名为空的客户端加入,广播“系统广播: 加入了聊天室”。
    2. 空消息被广播到所有客户端,消息内容无意义。
3.3 Go 语言机制分析
  • 字符串处理strings.TrimSpace可过滤首尾空白字符,但需主动调用。
  • 输入流阻塞:客户端的ReadString可能读取到换行符,需显式检查内容是否为空。
3.4 解决方案
  1. 客户端验证
    • 循环读取网名直到非空:
      for {
          fmt.Print("请输入网名: ")
          name = strings.TrimSpace(reader.ReadString())
          if name != "" {
              break
          }
          fmt.Println("网名不能为空!")
      }
      
    • 发送消息前检查内容:
      message = strings.TrimSpace(input)
      if message == "" {
          fmt.Println("消息不能为空!")
          continue
      }
      
  2. 服务端二次过滤
    • 网名非空检查:
      if clientName == "" {
          utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))
          return
      }
      
    • 消息内容过滤:
      msgContent := strings.TrimSpace(string(message))
      if msgContent == "" {
          log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)
          continue
      }
      
3.5 总结类比
  • 问题类比:用户向邮箱发送空白信件,邮局仍派送,浪费资源。
  • 修复思路:邮局拒绝投递无内容信件,并要求寄件人填写有效地址。

问题 4:心跳超时导致僵尸连接

4.1 前因后果
  • 问题本质:客户端异常退出后,服务端未检测到离线状态,维持无效连接。
  • 根本原因:服务端仅依赖客户端显式退出信号,缺乏被动检测机制。
  • 具体表现
    • 客户端断网后,服务端持续等待消息,占用内存和连接资源。
    • Redis中online_users集合保留无效网名,影响新用户注册。
4.2 具体场景模拟

假设客户端因网络故障断开:

时间点 0ms: 服务端发送PING。
时间点 10ms: 客户端未响应(已断开)。
时间点 25ms: 服务端仍认为客户端在线,未清理资源。
  • 服务端行为
    1. 客户端连接残留,占用Clients集合和TCP端口。
    2. 新用户无法使用相同网名注册。
4.3 Go 语言机制分析
  • 计时器管理time.Timer需手动重置,避免误触发。
  • 协程泄漏风险:未关闭的协程可能持续占用内存。
4.4 解决方案
  1. 心跳检测与超时机制
    • 服务端每10秒发送PING:
      ticker := time.NewTicker(10 * time.Second)
      defer ticker.Stop()
      
    • 客户端响应PONG后重置超时计时器:
      case <-msgTicker.C:
          if message == "PONG" {
              heartbeatTimeout.Reset(15 * time.Second)
          }
      
  2. 超时强制断开
    • 若15秒未收到PONG,判定客户端离线:
      heartbeatTimeout := time.NewTimer(15 * time.Second)
      defer heartbeatTimeout.Stop()
      
      select {
      case <-heartbeatTimeout.C:
          cr.Leave <- client
          return
      }
      
4.5 总结类比
  • 问题类比:电话通话中对方突然静默,但未挂断,导致线路一直被占用。
  • 修复思路:运营商设定“无响应超时”,若一段时间无声音,自动挂断释放线路。

总结

通过上述措施,项目实现了:

  1. 身份唯一性:Redis集合保障网名全局唯一。
  2. 数据有效性:双重验证过滤空输入。
  3. 连接健康性:心跳超时机制自动清理僵尸连接。

完整代码

client.go(客户端)

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"time"

	"Learn/kaohe/redis2/utils"
)

// handleServerMessages 负责处理从服务器接收到的消息
func handleServerMessages(conn *net.Conn) {
	reader := bufio.NewReader(*conn) // 创建一个读取器,用于从连接中读取消息
	for {
		message, err := utils.ReadMessage(reader) // 从服务器读取消息
		if err != nil {
			log.Println("接收服务器消息时出错:", err)
			// 如果连接断开,尝试重新连接
			newConn, ok := reconnect(*conn)
			if !ok {
				return // 如果重新连接失败,退出处理
			}
			*conn = newConn                 // 更新连接
			reader = bufio.NewReader(*conn) // 重新创建读取器
			continue
		}
		log.Printf("接收到消息: %s", string(message))
		fmt.Println(string(message)) // 打印消息到控制台

		// 如果收到的是 PING 消息,发送 PONG 响应
		if string(message) == "PING" {
			log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")
			err := utils.SendMessage(*conn, []byte("PONG"))
			if err != nil {
				log.Println("发送心跳响应时出错:", err)
			}
		}
	}
}

// reconnect 尝试重新连接到服务器
func reconnect(oldConn net.Conn) (net.Conn, bool) {
	for i := 0; i < 3; i++ { // 最多尝试 3 次
		log.Printf("尝试第 %d 次重新连接服务器...", i+1)
		newConn, err := net.Dial("tcp", "localhost:8080") // 尝试连接服务器
		if err == nil {
			log.Println("重新连接服务器成功")
			oldConn.Close()      // 关闭旧连接
			return newConn, true // 返回新连接
		}
		time.Sleep(time.Duration(2<<uint(i)) * time.Second) // 指数退避等待
	}
	log.Println("多次尝试重新连接服务器失败,退出程序")
	return nil, false // 重新连接失败
}

func main() {
	conn, err := net.Dial("tcp", "localhost:8080") // 连接到服务器
	if err != nil {
		log.Fatal("连接服务器时出错:", err)
	}
	defer conn.Close() // 程序结束时关闭连接

	go handleServerMessages(&conn) // 启动一个 goroutine 处理服务器消息

	reader := bufio.NewReader(os.Stdin) // 创建一个读取器,用于读取用户输入

	// 循环读取网名直到输入有效
	var name string
	for {
		fmt.Print("请输入你的网名: ")
		nameInput, err := reader.ReadString('\n') // 读取用户输入的网名
		if err != nil {
			log.Fatal("读取网名时出错:", err)
		}
		name = strings.TrimSpace(nameInput) // 去除多余空格
		if name == "" {
			fmt.Println("网名不能为空,请重新输入!")
			continue
		}
		break
	}

	err = utils.SendMessage(conn, []byte(name)) // 向服务器发送网名
	if err != nil {
		log.Fatal("发送网名时出错:", err)
	}

	fmt.Println("你已成功加入聊天室,可以开始聊天了!")

	for {
		message, err := reader.ReadString('\n') // 读取用户输入的消息
		if err != nil {
			log.Println("读取用户输入时出错:", err)
			break
		}
		message = strings.TrimSpace(message) // 去除多余空格

		if message == "" {
			fmt.Println("消息不能为空,请重新输入!")
			continue
		}

		if message == "/quit" { // 如果输入是 "/quit",退出聊天室
			fmt.Println("你已退出聊天室")
			utils.SendMessage(conn, []byte("/quit"))
			break
		}

		err = utils.SendMessage(conn, []byte(message)) // 向服务器发送消息
		log.Printf("DEBUG: 客户端尝试发送消息: %s, 错误: %v", message, err)
		if err != nil {
			log.Println("发送消息到服务器时出错:", err)
			// 如果发送失败,尝试重新连接
			newConn, ok := reconnect(conn)
			if !ok {
				return
			}
			conn = newConn
			err = utils.SendMessage(conn, []byte(name)) // 重新发送网名
			if err != nil {
				log.Fatal("重新连接后发送网名时出错:", err)
			}
		}
	}
}

server.go(服务端)

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"strings"
	"sync"
	"time"

	"Learn/kaohe/redis2/utils"
	"github.com/redis/go-redis/v9"
)

// Client 表示一个客户端连接
type Client struct {
	Name string      // 客户端的网名
	Conn net.Conn    // 客户端的连接
	Send chan []byte // 用于向客户端发送消息的通道
}

// ChatRoom 表示聊天室,管理所有客户端连接
type ChatRoom struct {
	Clients map[*Client]bool // 当前在线的客户端
	Join    chan *Client     // 新客户端加入的通道
	Leave   chan *Client     // 客户端离开的通道
	Message chan []byte      // 广播消息的通道
	Redis   *redis.Client    // Redis 客户端
	mu      sync.RWMutex     // 用于并发控制的互斥锁
}

// NewChatRoom 创建一个新的聊天室
func NewChatRoom(redisClient *redis.Client) *ChatRoom {
	return &ChatRoom{
		Clients: make(map[*Client]bool), // 初始化客户端列表
		Join:    make(chan *Client),     // 初始化加入通道
		Leave:   make(chan *Client),     // 初始化离开通道
		Message: make(chan []byte, 100), // 初始化消息通道,缓冲大小为 100
		Redis:   redisClient,            // 初始化 Redis 客户端
	}
}

// Run 聊天室的主循环,处理客户端的加入、离开和消息广播
func (cr *ChatRoom) Run() {
	for {
		select {
		case client := <-cr.Join: // 处理新客户端加入
			cr.mu.Lock()
			cr.Clients[client] = true // 将客户端添加到列表
			cr.mu.Unlock()
			message := fmt.Sprintf("系统广播: %s 加入了聊天室", client.Name)
			cr.Message <- []byte(message) // 广播加入消息

		case client := <-cr.Leave: // 处理客户端离开
			cr.mu.Lock()
			if _, ok := cr.Clients[client]; ok {
				delete(cr.Clients, client) // 从列表中删除客户端
				close(client.Send)         // 关闭客户端的发送通道
			}
			cr.mu.Unlock()
			if client.Name != "" {
				message := fmt.Sprintf("系统广播: %s 离开了聊天室", client.Name)
				cr.Message <- []byte(message) // 广播离开消息
			}

		case message := <-cr.Message: // 处理广播消息
			cr.mu.RLock()
			log.Printf("准备广播消息: %s", string(message))
			for client := range cr.Clients { // 向所有客户端发送消息
				select {
				case client.Send <- message: // 发送消息
					log.Printf("已发送消息到客户端 %s", client.Name)
				default:
					log.Printf("客户端 %s 的发送通道已满", client.Name)
				}
			}
			cr.mu.RUnlock()
			// 将消息存储到 Redis
			ctx := context.Background()
			batchStoreToRedis(ctx, cr.Redis, message)
		}
	}
}

// batchStoreToRedis 将消息批量存储到 Redis
func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {
	const batchSize = 10 // 批量存储的大小
	// 从 Redis 获取当前队列中的消息
	queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()
	queue = append(queue, string(message)) // 将新消息添加到队列

	if len(queue) >= batchSize { // 如果队列达到批量大小
		pipe := redisClient.Pipeline() // 创建 Redis 管道
		for _, msg := range queue {
			// 将消息存储到 chat_messages 列表
			pipe.RPush(ctx, "chat_messages", msg)
			// 更新用户活跃度
			pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))
		}
		_, _ = pipe.Exec(ctx)                       // 执行管道
		redisClient.Del(ctx, "chat_messages_queue") // 删除队列
	} else {
		// 如果队列未达到批量大小,将消息存储到队列
		redisClient.RPush(ctx, "chat_messages_queue", string(message))
	}
}

// extractUsername 从消息中提取用户名
func extractUsername(message string) string {
	parts := strings.SplitN(message, ": ", 2)
	if len(parts) > 0 {
		return parts[0]
	}
	return ""
}

// HandleClient 处理单个客户端的连接
func (cr *ChatRoom) HandleClient(conn net.Conn) {
	var clientName string
	defer func() {
		conn.Close()                                                 // 关闭连接
		cr.Leave <- &Client{Name: clientName, Conn: conn, Send: nil} // 通知聊天室客户端离开
		log.Printf("客户端连接已关闭: %s", conn.RemoteAddr())
	}()

	reader := bufio.NewReader(conn)             // 创建读取器
	nameBytes, err := utils.ReadMessage(reader) // 读取客户端的网名
	if err != nil {
		log.Println("读取网名时出错:", err)
		return
	}
	clientName = strings.TrimSpace(string(nameBytes))

	// 检查网名是否为空
	if clientName == "" {
		log.Println("客户端尝试使用空网名连接")
		utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))
		return
	}

	// 检查网名是否已存在
	ctx := context.Background()
	exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()
	if err != nil {
		log.Printf("检查网名 %s 时 Redis 出错: %v", clientName, err)
		utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))
		return
	}
	if exists {
		log.Printf("网名 %s 已存在,拒绝连接", clientName)
		utils.SendMessage(conn, []byte("ERROR: 网名已被占用,请更换其他名称"))
		return
	}

	// 添加网名到在线用户集合
	if _, err := cr.Redis.SAdd(ctx, "online_users", clientName).Result(); err != nil {
		log.Printf("存储网名 %s 到 Redis 失败: %v", clientName, err)
		utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))
		return
	}
	defer cr.Redis.SRem(ctx, "online_users", clientName)

	client := &Client{
		Name: clientName,
		Conn: conn,
		Send: make(chan []byte, 10), // 初始化发送通道
	}
	cr.Join <- client // 通知聊天室客户端加入

	// 启动一个 goroutine 处理发送消息
	go func() {
		for message := range client.Send {
			err := utils.SendMessage(client.Conn, message)
			if err != nil {
				log.Println("发送消息给客户端时出错:", err)
				cr.Leave <- client // 通知聊天室客户端离开
				break
			}
		}
	}()

	// 定时发送心跳检测
	ticker := time.NewTicker(10 * time.Second)
	// 定时读取消息
	msgTicker := time.NewTicker(50 * time.Millisecond)
	// 心跳超时检测
	heartbeatTimeout := time.NewTimer(15 * time.Second)
	defer func() {
		ticker.Stop()
		msgTicker.Stop()
		heartbeatTimeout.Stop()
	}()

	for {
		select {
		case <-ticker.C: // 心跳检测
			log.Printf("【心跳检测】向客户端 %s 发送 PING", client.Name)
			if err := utils.SendMessage(client.Conn, []byte("PING")); err != nil {
				log.Printf("【心跳异常】客户端 %s 心跳发送失败: %v", client.Name, err)
				cr.Leave <- client
				return
			}
			heartbeatTimeout.Reset(15 * time.Second)

		case <-heartbeatTimeout.C:
			log.Printf("【心跳超时】客户端 %s 未响应 PONG,强制断开", client.Name)
			cr.Leave <- client
			return

		case <-msgTicker.C: // 读取消息
			message, err := utils.ReadMessage(reader)
			if err != nil {
				if err == io.EOF {
					log.Printf("客户端 %s 主动断开连接", client.Name)
				} else {
					log.Printf("读取消息时出错: %v", err)
				}
				cr.Leave <- client
				return
			}

			if string(message) == "PONG" { // 处理心跳响应
				log.Printf("【心跳响应】客户端 %s 返回 PONG", client.Name)
				heartbeatTimeout.Reset(15 * time.Second)
				continue
			}

			msgContent := strings.TrimSpace(string(message))
			if msgContent == "" {
				log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)
				continue
			}

			log.Printf("接收到来自 %s 的消息: %s", client.Name, msgContent)
			// 构造完整消息并广播
			fullMessage := fmt.Sprintf("%s: %s", client.Name, msgContent)
			cr.Message <- []byte(fullMessage)
		}
	}
}

// printActivityRankings 打印用户活跃度排名
func printActivityRankings(redisClient *redis.Client) {
	ctx := context.Background()
	topUsers, err := redisClient.ZRevRangeWithScores(ctx, "user_activity", 0, 9).Result()
	if err != nil {
		log.Println("获取活跃度排名时出错:", err)
		return
	}
	fmt.Println("活跃度排名:")
	for i, user := range topUsers {
		fmt.Printf("%d. %s: %.0f\n", i+1, user.Member.(string), user.Score)
	}
}

// PrintConnectedClients 打印当前在线客户端
func (cr *ChatRoom) PrintConnectedClients() {
	cr.mu.RLock()
	defer cr.mu.RUnlock()
	log.Println("【活跃客户端】当前在线客户端列表:")
	for client := range cr.Clients {
		log.Printf(" - %s (IP: %s)", client.Name, client.Conn.RemoteAddr())
	}
}

func main() {
	// 初始化 Redis 客户端
	redisClient := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	ctx := context.Background()
	_, err := redisClient.Ping(ctx).Result()
	if err != nil {
		log.Fatal("连接 Redis 时出错:", err)
	}

	// 监听端口
	listener, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal("监听端口时出错:", err)
	}
	defer listener.Close()

	fmt.Println("聊天室服务器已启动,等待客户端连接...")

	// 创建聊天室
	chatRoom := NewChatRoom(redisClient)
	go chatRoom.Run() // 启动聊天室

	// 定期打印活跃度排名和在线客户端
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			printActivityRankings(redisClient)
			chatRoom.PrintConnectedClients()
		}
	}()

	// 接受客户端连接
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Println("接受客户端连接时出错:", err)
			continue
		}
		go chatRoom.HandleClient(conn) // 处理客户端连接
	}
}

utils.go(工具函数,发送和接收消息)

package utils

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
)

const maxMessageLength = 1 << 20 // 1MB,最大消息长度限制

// SendMessage 向连接发送消息
func SendMessage(conn net.Conn, message []byte) error {
	length := uint32(len(message)) // 消息长度
	if length > maxMessageLength {
		log.Println("消息长度超出限制:", length)
		return fmt.Errorf("message too long")
	}

	// 写入消息长度
	err := binary.Write(conn, binary.BigEndian, length)
	if err != nil {
		log.Printf("写入消息长度时出错: %v", err)
		return err
	}

	// 写入消息内容
	_, err = conn.Write(message)
	if err != nil {
		log.Printf("写入消息内容时出错: %v", err)
	}
	return err
}

// ReadMessage 从连接读取消息
func ReadMessage(reader *bufio.Reader) ([]byte, error) {
	var length uint32 // 消息长度
	err := binary.Read(reader, binary.BigEndian, &length)
	if err != nil {
		log.Printf("读取消息长度时出错: %v", err)
		return nil, err
	}

	if length > maxMessageLength {
		log.Println("消息长度超出限制:", length)
		return nil, fmt.Errorf("message too long")
	}

	// 读取消息内容
	message := make([]byte, length)
	_, err = io.ReadFull(reader, message)
	if err != nil {
		log.Printf("读取消息内容时出错: %v", err)
		return nil, err
	}
	return message, nil
}

如果对项目有任何问题或建议,欢迎在评论区留言!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2329122.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot2.7.x整合nacos+seata

1、nacos及seata下载地址 Nacos Server 下载 | Nacos 官网 Seata Java Download | Apache Seata 这里的seata版本用2.1.0版本。 2、启动nacos D:\本地-seata-nacos\nacos-server\bin>startup.cmd -m standalone 3、修改seata的conf下的application的内容 这里的数据库…

为 IDEA 设置管理员权限

IDEA 安装目录 兼容性选择管理员身份运行程序 之后 IDEA 中的操作&#xff08;包括终端中的操作&#xff09;都是管理员权限的了

单片机学习笔记8.定时器

IAP15W4K58S4定时/计数器结构工作原理 定时器工作方式控制寄存器TMOD 不能进行位寻址&#xff0c;只能对整个寄存器进行赋值 寄存器地址D7D6D5D4D3D2D1D0复位值TMOD89HGATEC/(T低电平有效)M1M0GATEC/(T低电平有效)M1M000000000B D0-D3为T0控制&#xff0c;D4-D7为T1控制 GAT…

vue3实现markdown预览和编辑

Markdown作为一种轻量级标记语言&#xff0c;已经成为开发者编写文档的首选工具之一。在Vue3项目中集成Markdown编辑和预览功能可以极大地提升内容管理体验。本文将介绍如何使用Vditor这一强大的开源Markdown编辑器在Vue3项目中实现这一功能。 一、Vditor简介 Vditor是一款浏…

高并发秒杀系统接入层如何设计

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

C++异常处理 throw try catch

C 异常处理概述 C 异常处理机制提供了一种在程序运行时捕获错误或异常情况的方式。异常处理的目的是使得程序在遇到错误时能够优雅地终止或恢复&#xff0c;并防止程序出现崩溃。C 使用 try, throw, 和 catch 关键字来实现异常处理。 异常处理的基本结构&#xff1a; throw: …

纯css实现环形进度条

需要在中实现一个定制化的环形进度条&#xff0c;最终效果如图&#xff1a; 使用代码 <divclass"circular-progress":style"{--progress: nextProgress,--color: endSliderColor,--size: isFull ? 60rpx : 90rpx,}"><div class"inner-conte…

0基础 | 硬件 | 电源系统 一

降压电路LDO 几乎所有LDO都是基于此拓扑结构 图 拓扑结构 LDO属于线性电源&#xff0c;通过控制开关管的导通程度实现稳压&#xff0c;输出纹波小&#xff0c;无开关噪声 线性电源&#xff0c;IoutIin&#xff0c;发热功率P电压差△U*电流I&#xff0c;转换效率Vo/Vi LDO不适…

获取KUKA机器人诊断文件KRCdiag的方法

有时候在进行售后问题时需要获取KUKA机器人的诊断文件KRCdiag&#xff0c;通过以下方法可以获取KUKA机器人的诊断文件KRCdiag&#xff1a; 1、将U盘插到控制柜内的任意一个USB接口&#xff1b; 2、依次点【主菜单】—【文件】—【存档】—【USB&#xff08;控制柜&#xff09…

一周学会Pandas2 Python数据处理与分析-NumPy数据类型

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili NumPy 提供了丰富的数据类型&#xff08;dtypes&#xff09;&#xff0c;主要用于高效数值计算。以下是 NumPy 的主要…

Redis核心机制-缓存、分布式锁

目录 缓存 缓存更新策略 定期生成 实时生成 缓存问题 缓存预热&#xff08;Cache preheating&#xff09; 缓存穿透&#xff08;Cache penetration&#xff09; 缓存雪崩&#xff08;Cache avalanche&#xff09; 缓存击穿&#xff08;Cache breakdown&#xff09; 分…

如何在Ubuntu上安装Dify

如何在Ubuntu上安装Dify 如何在Ubuntu上安装docker 使用apt安装 # Add Dockers official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl sudo install -m 0755 -d /etc/apt/keyrings sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg…

Python FastApi(13):APIRouter

如果你正在开发一个应用程序或 Web API&#xff0c;很少会将所有的内容都放在一个文件中。FastAPI 提供了一个方便的工具&#xff0c;可以在保持所有灵活性的同时构建你的应用程序。假设你的文件结构如下&#xff1a; . ├── app # 「app」是一个 Python 包…

【算法竞赛】状态压缩型背包问题经典应用(蓝桥杯2019A4分糖果)

在蓝桥杯中遇到的这道题&#xff0c;看上去比较普通&#xff0c;但其实蕴含了很巧妙的“状态压缩 背包”的思想&#xff0c;本文将从零到一&#xff0c;详细解析这个问题。 目录 一、题目 二、思路分析&#xff1a;状态压缩 最小覆盖 1. 本质&#xff1a;最小集合覆盖问题…

常微分方程 1

slow down and take your time 定积分应用回顾常微分方程的概述一阶微分方程可分离变量齐次方程三阶线性微分方程 一阶线性微分方程不定积分的被积分函数出现了绝对值梳理微分方程的基本概念题型 1 分离变量题型 2 齐次方程5.4 题型 3 一阶线性微分方程知识点5.55.6 尾声 定积分…

Web前端页面搭建

1.在D盘中创建www文件 cmd进入窗口命令windowsR 切换盘符d: 进入创建的文件夹 在文件夹里安装tp框架 在PS中打开tp文件 创建网站&#xff0c;根目录到public 在浏览器中打开网页 修改文件目录名称 在public目录中的。htaccess中填写下面代码 <IfModule mod_rewrite.c >…

开源 LLM 应用开发平台 Dify 全栈部署指南(Docker Compose 方案)

开源 LLM 应用开发平台 Dify 全栈部署指南&#xff08;Docker Compose 方案&#xff09; 一、部署环境要求与前置检查 1.1 硬件最低配置 组件要求CPU双核及以上内存4GB 及以上磁盘空间20GB 可用空间 1.2 系统兼容性验证 ✅ 官方支持系统&#xff1a; Ubuntu 20.04/22.04 L…

BN 层的作用, 为什么有这个作用?

BN 层&#xff08;Batch Normalization&#xff09;——这是深度神经网络中非常重要的一环&#xff0c;它大大改善了网络的训练速度、稳定性和收敛效果。 &#x1f9e0; 一句话理解 BN 层的作用&#xff1a; Batch Normalization&#xff08;批归一化&#xff09;通过标准化每一…

金仓数据库KCM认证考试介绍【2025年4月更新】

KCM&#xff08;金仓认证大师&#xff09;认证是金仓KES数据库的顶级认证&#xff0c;学员需通过前置KCA、KCP认证才能考KCM认证。 KCM培训考试一般1-2个月一次&#xff0c;KCM报名费原价为1.8万&#xff0c;当前优惠价格是1万&#xff08;趋势是&#xff1a;费用越来越高&…

如何通过句块训练法(Chunks)提升英语口语

真正说一口流利英语的人&#xff0c;并不是会造句的人&#xff0c;而是擅长“调取句块”的人。下面我们从原理、方法、场景、资源几个维度展开&#xff0c;告诉你怎么用“句块训练法&#xff08;Chunks&#xff09;”快速提升英语口语&#xff1a; 一、什么是“句块”&#xff…