实战简介:
基于tcp协议实现功能
服务器端:
接受用户消息和循环转发
对功能命令进行处理(meun查询功能词,changeName改名,online查询在线人数,quit退出)
客户端:
接受服务器发送的信息并处理
接受用户的输入并发往服务器
结构和示例
用户登录示例:
功能行命令测试
发送消息广播测试
用户退出示例
服务器端
基本流程
-
连接服务器:
- 使用
net.Dial("tcp", "127.0.0.1:8080")
建立到服务器的TCP连接。 - 如果连接失败,打印错误信息并退出程序。
- 使用
-
获取用户昵称:
- 调用
getUserInput
函数提示用户输入昵称。 - 昵称被编码后通过
conn.Write
发送至服务器。
- 调用
-
启动退出监控:
- 创建一个退出信号通道
exit
。 - 使用
defer
关键字确保在函数返回时关闭连接和退出通道。
- 创建一个退出信号通道
-
并发读取消息:
- 启动一个 goroutine 调用
readMsg
函数,用于并发读取服务器发送的消息。
- 启动一个 goroutine 调用
-
并发处理用户输入:
- 启动另一个 goroutine 来持续获取用户输入:
- 如果输入以 “quit” 开头,编码退出消息并发送至服务器,然后通过退出通道发送退出信号。
- 对于其他输入,编码后发送至服务器。
- 启动另一个 goroutine 来持续获取用户输入:
-
监听退出信号:
- 主函数中的
select
语句监听退出通道exit
:- 一旦接收到退出信号,打印退出成功信息并退出主函数。
- 主函数中的
-
读取消息循环:
readMsg
函数中,使用bufio.NewReader
和module.Decode
读取并解码服务器消息:- 如果是 “changeName” 类型的消息,更新本地
name
变量并打印新昵称。 - 对于其他消息,打印带有时间戳的消息内容。
- 如果是 “changeName” 类型的消息,更新本地
-
错误处理:
- 在连接、编码、写入、读取消息过程中,出现错误时打印错误信息:
- 对于
conn.Write
和module.Encode
失败,打印错误并退出对应的 goroutine。 - 对于
module.Decode
失败或读取到 EOF,退出读取消息循环。
- 对于
- 在连接、编码、写入、读取消息过程中,出现错误时打印错误信息:
-
资源清理:
- 使用
defer
关键字确保在函数退出时正确关闭网络连接和通道,释放资源。
- 使用
-
用户交互:
getUserInput
函数根据不同的提示信息,从标准输入读取用户输入,并处理输入结束的换行符。
代码
package main
import (
"bufio"
"chatRoom/module" // 引入模块,包含Encode和Decode函数
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
)
// client 定义了客户端的结构,包含用户通道、网络连接、用户名和地址
type client struct {
userChannel chan string
conn net.Conn
name string
addr string
}
// count 记录当前连接的用户数量
var count int
// mu 互斥锁,用于同步对在线用户列表的访问
var mu sync.Mutex
// onlineList 存储当前在线的所有客户端信息,使用客户端地址作为键
var onlineList map[string]*client
// message 通道,用于广播消息给所有在线的用户
var message chan string
// init 函数初始化在线用户列表和消息通道
func init() {
onlineList = make(map[string]*client)
message = make(chan string, 1024) // 创建一个容量为1024的缓冲通道
}
func main() {
fmt.Println("端口监听中...")
listener, err := net.Listen("tcp", "127.0.0.1:8080") // 监听本地8080端口
if err != nil {
log.Fatal(err) // 如果监听失败,记录错误并退出
}
defer listener.Close()
time.Sleep(time.Second) // 等待一秒,确保端口监听就绪
fmt.Println("端口监听成功")
go manger() // 启动消息管理的goroutine
fmt.Println("manger函数启动成功")
for {
conn, err := listener.Accept() // 接受新的客户端连接
if err != nil {
continue // 如果有错误发生,忽略并继续监听
}
go handleConnection(conn) // 为每个新连接创建一个处理goroutine
}
}
// handleConnection 处理每个客户端连接的函数
func handleConnection(conn net.Conn) {
defer conn.Close() // 确保连接在函数结束时关闭
count++ // 增加连接用户计数
fmt.Println("有新用户连接服务,当前连接数:", count)
addUser(conn) // 添加用户到在线列表
go writeMsgToClient(conn) // 启动写消息goroutine
quit := make(chan bool) // 创建退出信号通道
go readClient(conn, quit) // 启动读消息goroutine
// 监听退出信号
select {
case <-quit:
connName := onlineList[conn.RemoteAddr().String()].name
mu.Lock()
onlineList[conn.RemoteAddr().String()].userChannel <- "quit"
close(onlineList[conn.RemoteAddr().String()].userChannel)
delete(onlineList, conn.RemoteAddr().String()) // 从在线列表中删除用户
mu.Unlock()
message <- "[" + connName + "]" + "下线了" // 广播用户下线消息
count-- // 减少连接用户计数
fmt.Println("有用户下线了,当前连接数:", count)
if count == 0 {
fmt.Println("等待用户连接中...")
}
return
}
}
// (c *client) changeName 允许客户端更改其用户名
func (c *client) changeName(newUserName string) bool {
mu.Lock()
defer mu.Unlock()
c.name = newUserName // 更新用户名
message <- fmt.Sprintf("[%s] 更新昵称为: %s", c.name, newUserName)
return true
}
// manger 负责将消息广播给所有在线的用户
func manger() {
for msg := range message {
mu.Lock()
for _, v := range onlineList {
v.userChannel <- msg // 将消息发送到每个用户的通道
}
mu.Unlock()
}
}
// writeMsgToClient 负责将消息写入客户端连接
func writeMsgToClient(conn net.Conn) {
fmt.Println("函数writeMsgToClient函数开始")
for msg := range onlineList[conn.RemoteAddr().String()].userChannel {
encodedMsg, err := module.Encode(msg + "\n") // 编码消息
if err != nil {
fmt.Println("发送消息失败")
continue
}
_, err = conn.Write(encodedMsg) // 发送编码后的消息
if err != nil {
fmt.Println("发送消息失败")
continue
}
}
fmt.Println("函数writeMsgToClient函数结束")
}
// addUser 将新连接的客户端添加到在线列表中
func addUser(conn net.Conn) client {
newUser := client{
make(chan string),
conn,
"", // 用户名初始为空
conn.RemoteAddr().String(),
}
onlineList[conn.RemoteAddr().String()] = &newUser // 添加到在线列表
mu.Lock()
message <- "[ " + newUser.addr + " ]" + "上线了" // 广播上线消息
mu.Unlock()
return newUser
}
// readClient 从客户端读取消息并处理
func readClient(conn net.Conn, quit chan bool) {
fmt.Println("开始使用readClient函数")
defer fmt.Println("readClient函数结束")
reader := bufio.NewReader(conn)
for {
msg, err := module.Decode(reader) // 解码消息
if err == io.EOF {
return // 如果客户端关闭连接,退出函数
}
if err != nil {
fmt.Println("decode msg failed, err:", err)
return
}
if len(msg) == 0 {
continue
}
fmt.Println("收到client发来的数据:", msg)
// 根据消息类型进行不同的处理
switch {
case strings.HasPrefix(msg, "changeName"):
// 处理用户更改昵称的请求
king := true
oldName := onlineList[conn.RemoteAddr().String()].name
newName := strings.TrimPrefix(msg, "changeName")
if newName == "" {
newName = conn.RemoteAddr().String()
}
for _, v := range onlineList {
if v.name == newName {
mu.Lock()
message <- "[ " + oldName + " ]" + "名字:" + newName + "已存在,请更换一个名字尝试"
mu.Unlock()
king = false
break
}
}
if king {
mu.Lock()
message <- fmt.Sprintf("[ %s ] 更新昵称为: %s", oldName, newName)
mu.Unlock()
if onlineList[conn.RemoteAddr().String()].changeName(newName) {
mu.Lock()
onlineList[conn.RemoteAddr().String()].userChannel <- "changeName" + newName
mu.Unlock()
} else {
mu.Lock()
onlineList[conn.RemoteAddr().String()].userChannel <- "改名失败"
mu.Unlock()
}
}
case strings.HasPrefix(msg, "quit"):
// 处理用户退出的请求
fmt.Println("[ " + onlineList[conn.RemoteAddr().String()].name + " ]" + "下线了")
mu.Lock()
quit <- true
mu.Unlock()
return
case strings.HasPrefix(msg, "menu"):
// 发送菜单信息给用户
mu.Lock()
onlineList[conn.RemoteAddr().String()].userChannel <- "改名:changeName ;查询在线人数:online ;退出:quit"
mu.Unlock()
case strings.HasPrefix(msg, "online"):
// 响应查询在线人数的请求
fmt.Println("在线人数:", count)
mu.Lock()
message <- "当前在线人数:" + strconv.Itoa(count)
mu.Unlock()
default:
// 广播普通消息给所有用户
mu.Lock()
message <- fmt.Sprintf("[ %s ]: %s", onlineList[conn.RemoteAddr().String()].name, msg)
mu.Unlock()
}
}
}
客户端
基本流程
-
建立连接:
- 客户端尝试连接到本地的TCP服务器(
127.0.0.1:8080
)。 - 如果连接失败,打印错误信息并退出程序。
- 客户端尝试连接到本地的TCP服务器(
-
用户昵称输入:
- 通过
getUserInput
函数提示用户输入昵称。 - 用户输入的昵称通过
module.Encode
编码后发送给服务器。
- 通过
-
并发操作:
- 创建一个退出信号通道
exit
。 - 启动一个 goroutine 用于并发读取服务器消息 (
readMsg
)。 - 启动另一个 goroutine 用于并发处理用户输入并发送消息。
- 创建一个退出信号通道
-
用户输入处理:
- 在用户输入的 goroutine 中,持续获取用户输入:
- 如果输入以 “quit” 开头,编码退出消息并发送,然后通过退出通道发送退出信号。
- 对于其他输入,编码后发送给服务器。
- 在用户输入的 goroutine 中,持续获取用户输入:
-
消息读取:
- 在消息读取的 goroutine 中,使用
bufio.NewReader
读取服务器消息,并使用module.Decode
解码:- 如果消息是更改昵称的指令,更新本地昵称并打印。
- 其他消息直接打印显示。
- 在消息读取的 goroutine 中,使用
-
监听退出信号:
- 主函数中的
select
语句监听退出通道exit
。 - 接收到退出信号时,打印退出成功信息并退出主函数。
- 主函数中的
-
错误处理:
- 在编码消息、发送消息、读取消息过程中,如果发生错误,打印错误信息并退出相关操作。
-
资源清理:
- 使用
defer
关键字确保在退出时关闭网络连接和退出通道,释放资源。
- 使用
-
用户交互:
getUserInput
函数根据不同的提示信息,从标准输入读取用户输入,并处理输入结束的换行符。
-
时间戳显示:
- 在打印接收到的消息时,添加当前时间戳,以便用户知道消息的接收时间。
代码
package main
import (
"bufio"
"chatRoom/module" // 这个模块包含消息编码和解码的函数
"fmt"
"io"
"net"
"os"
"strings"
"time"
)
// 全局变量,存储用户的昵称
var name string
func main() {
// 尝试连接到服务器
conn, err := net.Dial("tcp", "127.0.0.1:8080")
if err != nil {
fmt.Println("服务器连接失败 err =", err)
return
}
defer conn.Close() // 确保连接最终会被关闭
fmt.Println("服务器连接成功")
// 获取用户输入的昵称,并将其编码后发送给服务器
name = getUserInput("请输入你的昵称:")
data, err := module.Encode("changeName" + name)
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, err = conn.Write(data)
if err != nil {
fmt.Println("发送数据失败1 err =", err)
}
// 创建退出信号通道
var exit = make(chan bool)
defer close(exit)
// 并发读取服务器消息
go readMsg(conn, exit)
// 并发处理用户输入消息
go func() {
for {
msg := getUserInput("") // 获取用户输入的消息
if strings.HasPrefix(msg, "quit") {
// 如果用户输入"quit",编码并发送退出消息
data, err := module.Encode(msg)
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, err = conn.Write(data)
if err != nil {
fmt.Println("发送数据失败1 err =", err)
}
fmt.Println("正在退出...")
exit <- true // 发送退出信号
return
}
// 编码用户消息并发送
data, err := module.Encode(msg)
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, err = conn.Write(data)
if err != nil {
fmt.Println("发送数据失败2 err =", err)
}
}
}()
// 监听退出信号
for {
select {
case <-exit:
fmt.Println("退出成功")
return
}
}
}
// getUserInput 函数用于获取用户输入
func getUserInput(prompt string) string {
time.Sleep(time.Millisecond * 100) // 简单延迟,防止输出和输入混在一起
if prompt == "请输入你的昵称:" {
fmt.Print(prompt) // 打印提示信息
} else {
fmt.Println("[", name, "](菜单menu) :")
}
reader := bufio.NewReader(os.Stdin) // 创建标准输入的读取器
input, err := reader.ReadString('\n') // 读取用户输入
if err != nil {
fmt.Println("用户输入获取失败:err =", err)
return "客户端信息读取错误"
}
fmt.Println("信息发送成功") // 打印信息发送成功的提示
return strings.TrimSpace(input) // 清除字符串两端空白并返回
}
// readMsg 函数用于读取服务器发送的消息
func readMsg(conn net.Conn, exit chan bool) {
defer conn.Close() // 确保连接最终会被关闭
reader := bufio.NewReader(conn) // 创建连接的读取器
for {
msg, err := module.Decode(reader) // 解码服务器消息
if err == io.EOF {
fmt.Println("服务器连接已断开 ")
return
}
if err != nil {
fmt.Println("服务器断开连接 2 err =", err)
return
}
if msg == "" {
continue // 忽略空消息
}
// 如果消息以"changeName"开头,更新昵称
if strings.HasPrefix(msg, "changeName") {
msg1 := strings.TrimPrefix(msg, "changeName")
name = strings.TrimRight(msg1, "\n")
fmt.Println("当前的名字为:", name)
continue
}
// 打印接收到的消息
fmt.Print("【", time.Now().Format("15:04:05"), "】", msg)
}
}
消息封包和解包的函数
作用:防止tcp粘包的情况影响消息的读取
为什么会出现粘包
主要原因就是tcp数据传递模式是流模式,在保持长连接的时候可以进行多次的收和发。
“粘包"可发生在发送端也可发生在接收端:
- 由Nagle算法造成的发送端的粘包:Nagle算法是一种改善网络传输效率的算法。简单来说就是当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。
- 接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据。当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。
解决办法
出现"粘包"的关键在于接收方不确定将要传输的数据包的大小,因此我们可以对数据包进行封包和拆包的操作。
封包:封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部分内容了(过滤非法包时封包会加入"包尾"内容)。包头部分的长度是固定的,并且它存储了包体的长度,根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。
我们可以自己定义一个协议,比如数据包的前4个字节为包头,里面存储的是发送的数据的长度。
代码
package module
import (
"bufio"
"bytes"
"encoding/binary"
)
func Encode(message string) ([]byte, error) {
// 读取消息的长度,转换成int32类型(占4个字节)
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头
err := binary.Write(pkg, binary.LittleEndian, length)
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.LittleEndian, []byte(message))
if err != nil {
return nil, err
}
return pkg.Bytes(), nil
}
// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {
// 读取消息的长度
lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
lengthBuff := bytes.NewBuffer(lengthByte)
var length int32
err := binary.Read(lengthBuff, binary.LittleEndian, &length)
if err != nil {
return "", err
}
// Buffered返回缓冲中现有的可读取的字节数。
if int32(reader.Buffered()) < length+4 {
return "", err
}
// 读取真正的消息数据
pack := make([]byte, int(4+length))
_, err = reader.Read(pack)
if err != nil {
return "", err
}
return string(pack[4:]), nil
}