是简单demo测试
前端
<html>
<head>
<title>Simple client</title>
<script type="text/javascript">
var ws;
function init() {
// Connect to Web Socket
ws = new WebSocket("ws://localhost:8866/ws");
// Set event handlers.
ws.onopen = function() {
output("onopen");
};
ws.onmessage = function(e) {
// e.data contains received string.
output("onmessage: " + e.data);
};
ws.onclose = function() {
output("onclose");
};
ws.onerror = function(e) {
output("onerror");
console.log(e)
};
}
function onSubmit() {
var input = document.getElementById("input");
// You can send message to the Web Socket using ws.send.
ws.send(input.value);
output("send: " + input.value);
input.value = "";
input.focus();
}
function onCloseClick() {
ws.close();
}
function output(str) {
var log = document.getElementById("log");
var escaped = str.replace(/&/, "&").replace(/</, "<").
replace(/>/, ">").replace(/"/, """); // "
log.innerHTML = escaped + "<br>" + log.innerHTML;
}
</script>
</head>
<body onload="init();">
<form onsubmit="onSubmit(); return false;">
<input type="text" id="input">
<input type="submit" value="Send">
<button onclick="onCloseClick(); return false;">close</button>
</form>
<div id="log"></div>
</body>
</html>
后端
主服务
package main
import (
"bytes"
"github.com/gorilla/websocket"
"net/http"
"time"
"websocket/impl"
)
func main() {
http.HandleFunc("/ws", wsHandle)
http.ListenAndServe("0.0.0.0:8866", nil)
}
//定义转换器
var (
upgrader = websocket.Upgrader{
//允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func wsHandle(w http.ResponseWriter, r *http.Request) {
var (
wsConn *websocket.Conn
err error
data []byte
conn *impl.Connection
)
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = impl.InitConnetion(wsConn); err != nil {
goto ERR
}
go func() {
var (
err error
)
for {
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(5 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
//todo关闭连接操作
conn.Close()
}
func BytesCombine1(pBytes ...[]byte) []byte {
length := len(pBytes)
s := make([][]byte, length)
for index := 0; index < length; index++ {
s[index] = pBytes[index]
}
sep := []byte("")
return bytes.Join(s, sep)
}
接口封装
package impl
import (
"errors"
"github.com/gorilla/websocket"
"sync"
)
type Connection struct {
wsConn *websocket.Conn
inChannel chan []byte
outChannel chan []byte
closeChannel chan byte
isClose bool
mutex sync.Mutex
}
func InitConnetion(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
wsConn: wsConn,
inChannel: make(chan []byte, 1000),
outChannel: make(chan []byte, 1000),
closeChannel: make(chan byte, 1),
}
//读协程
go conn.readLoop()
go conn.writeLoop()
return
}
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.inChannel:
case <-conn.closeChannel:
err = errors.New("connection 已关闭")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChannel <- data:
case <-conn.closeChannel:
err = errors.New("connection 已关闭")
}
return
}
func (conn *Connection) Close() {
conn.wsConn.Close()
if !conn.isClose {
close(conn.closeChannel)
conn.isClose = true
}
conn.mutex.Unlock()
}
//内部实现
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
select {
case conn.inChannel <- data:
case <-conn.closeChannel:
goto ERR
}
if _, data, err = conn.wsConn.ReadMessage(); err != nil {
goto ERR
}
conn.inChannel <- data
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
data = <-conn.outChannel
if conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}
效果
优化
内核瓶颈
cup 最理想的处理大概是每秒100万次,已经到了极限
- 减少网络小包的发送,小包大概几百字节,把同一秒中的推送的条数合并成一条,合并后
每秒推送的次数
等于连接数
,
锁瓶颈
- 打包json