目录
一、简单实现Netty发送消息的案例
二、websocket连接注册用户
三、实现单聊
四、群聊功能
五、案例代码
一、简单实现Netty发送消息的案例
案例一的依赖如下:Spring Boot项目会自动管理对应的版本;非Spring Boot项目需要手动指定版本,可以使用Maven仓库中的最新版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
新建Netty服务的启动器:配置如下
package com.dragonwu.server;
import com.dragonwu.server.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Minimal Netty WebSocket server: HTTP codec + aggregator + WebSocket protocol
 * handler, with {@link WebSocketHandler} as the application handler.
 *
 * @author DragonWu
 * @since 2023-01-05 14:02
 **/
public class IMServer {
    /** TCP port the server binds to. */
    private static final int PORT = 8888;

    /**
     * Builds the server pipeline and starts binding to port 8888.
     * <p>
     * The bind is asynchronous and this method returns without waiting for it.
     * If the bind fails (for example because the port is already in use) the
     * failure is reported on stderr and both event-loop groups are shut down,
     * instead of being silently ignored with the threads left running.
     */
    public static void start() {
        // Parent group passed to the bootstrap (accepting side).
        EventLoopGroup boss = new NioEventLoopGroup();
        // Child group passed to the bootstrap (accepted connections).
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class) // NIO server channel type
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // HTTP request decoder / response encoder
                        pipeline.addLast(new HttpServerCodec())
                                // support for writing large data streams
                                .addLast(new ChunkedWriteHandler())
                                // aggregates HTTP parts into FullHttpRequest / FullHttpResponse (64 KiB limit)
                                .addLast(new HttpObjectAggregator(1024 * 64))
                                // WebSocket support on the root path
                                .addLast(new WebSocketServerProtocolHandler("/"))
                                .addLast(new WebSocketHandler());
                    }
                });

        // Bind the listening port and react to the outcome once it is known.
        ChannelFuture future = bootstrap.bind(PORT);
        future.addListener(bindResult -> {
            if (!bindResult.isSuccess()) {
                System.err.println("Failed to bind port " + PORT + ": " + bindResult.cause());
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        });
    }
}
创建软件启动类:最简单的一个Netty服务已书写完毕,下面来进行测试
package com.dragonwu;
import com.dragonwu.server.IMServer;
/**
* Application entry point of the demo: starts the Netty IM server.
*
* @author DragonWu
* @since 2023-01-05 14:01
**/
public class NettyIMApplication {
/**
* Delegates to {@code IMServer.start()}.
*
* @param args command-line arguments (not read)
*/
public static void main(String[] args) {
IMServer.start();
}
}
websocket的连接测试:连接服务器
websocket在线测试
发送消息后,可以看到服务器接收到消息了,这是最简单的实现:
二、websocket连接注册用户
在实现聊天前,先来实现前后端的websocket连接注册步骤:
IMServer修改为:
package com.dragonwu.server;
import com.dragonwu.server.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Netty WebSocket server of the chat demo, plus the shared in-memory state
 * used by the command handlers.
 *
 * @author DragonWu
 * @since 2023-01-05 14:02
 **/
public class IMServer {
    /** Channels of registered users, keyed by nickname. */
    public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);
    /** Channels that joined the default chat group. */
    public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** TCP port the server binds to. */
    private static final int PORT = 8888;

    /**
     * Builds the server pipeline and starts binding to port 8888.
     * <p>
     * The bind is asynchronous and this method returns without waiting for it.
     * If the bind fails (for example because the port is already in use) the
     * failure is reported on stderr and both event-loop groups are shut down,
     * instead of being silently ignored with the threads left running.
     */
    public static void start() {
        // Parent group passed to the bootstrap (accepting side).
        EventLoopGroup boss = new NioEventLoopGroup();
        // Child group passed to the bootstrap (accepted connections).
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class) // NIO server channel type
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // HTTP request decoder / response encoder
                        pipeline.addLast(new HttpServerCodec())
                                // support for writing large data streams
                                .addLast(new ChunkedWriteHandler())
                                // aggregates HTTP parts into FullHttpRequest / FullHttpResponse (64 KiB limit)
                                .addLast(new HttpObjectAggregator(1024 * 64))
                                // WebSocket support on the root path
                                .addLast(new WebSocketServerProtocolHandler("/"))
                                .addLast(new WebSocketHandler());
                    }
                });

        // Bind the listening port and react to the outcome once it is known.
        ChannelFuture future = bootstrap.bind(PORT);
        future.addListener(bindResult -> {
            if (!bindResult.isSuccess()) {
                System.err.println("Failed to bind port " + PORT + ": " + bindResult.cause());
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        });
    }
}
创建聊天命令枚举类:用于判断发送的数据类型
package com.dragonwu.server.domain.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Command codes carried by incoming chat frames.
 *
 * @author DragonWu
 * @since 2023-01-05 14:38
 **/
@Getter
@AllArgsConstructor
public enum CommandType {
    CONNECTION(1001),
    CHAT(1002),
    JOIN_GROUP(1003),
    ERROR(-1);

    /** Numeric code as it appears in the JSON payload. */
    private final Integer code;

    /**
     * Looks up the constant whose code equals the given value.
     *
     * @param code numeric code to resolve; may be {@code null}
     * @return the matching constant, or {@link #ERROR} when none matches
     */
    public static CommandType match(Integer code) {
        CommandType resolved = ERROR;
        for (CommandType candidate : values()) {
            if (candidate.code.equals(code)) {
                resolved = candidate;
                break;
            }
        }
        return resolved;
    }
}
创建命令消息接收对象:接收到的json数据将会被转换为该类型,方便后续处理
package com.dragonwu.server.domain.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Base shape of an incoming JSON command; the handlers parse the frame text into this type.
*
* @author DragonWu
* @since 2023-01-05 14:42
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Command {
// Command code; resolved to a CommandType via CommandType.match
private Integer code;
// Nickname of the user sending the command
private String nickname;
}
为了方便前后端的数据交互,这里再创建一个示例用的Result VO对象:
package com.dragonwu.server.domain;
import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * Payload sent back to clients; serialized to JSON and wrapped in a text frame.
 *
 * @author DragonWu
 * @since 2023-01-05 15:01
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result {
    /** Display name of the message origin. */
    private String name;
    /** Time at which the result was created. */
    private LocalDateTime time;
    /** Message text. */
    private String message;

    /**
     * Builds a frame for a failure notice, sent under the system-message name.
     *
     * @param message text to deliver
     * @return text frame carrying the JSON-serialized result
     */
    public static TextWebSocketFrame fail(String message) {
        return success("系统消息", message);
    }

    /**
     * Builds a frame for a success notice, sent under the system-message name.
     *
     * @param message text to deliver
     * @return text frame carrying the JSON-serialized result
     */
    public static TextWebSocketFrame success(String message) {
        return success("系统消息", message);
    }

    /**
     * Builds a frame for a message sent under an explicit name.
     *
     * @param name    display name of the origin
     * @param message text to deliver
     * @return text frame carrying the JSON-serialized result, timestamped with the current time
     */
    public static TextWebSocketFrame success(String name, String message) {
        Result payload = new Result(name, LocalDateTime.now(), message);
        String json = JSON.toJSONString(payload);
        return new TextWebSocketFrame(json);
    }
}
创建连接处理器:这是WebsocketHandler主处理器的一个调用,处理连接时的操作
package com.dragonwu.server.handler;
import com.alibaba.fastjson2.JSON;
import com.dragonwu.server.IMServer;
import com.dragonwu.server.domain.Result;
import com.dragonwu.server.domain.pojo.Command;
import io.netty.channel.ChannelHandlerContext;
/**
* @author DragonWu
* @since 2023-01-05 14:58
* 连接请求的处理器
**/
public class ConnectHandler {
public static void execute(ChannelHandlerContext channelHandlerContext, Command command) {
//判断用户是否已上线
if (IMServer.USERS.containsKey(command.getNickname())) {
channelHandlerContext.channel().writeAndFlush(Result.fail("该用户已上线,请换个昵称再试~"));
//断开连接
channelHandlerContext.channel().disconnect();
return;
}
IMServer.USERS.put(command.getNickname(), channelHandlerContext.channel());
channelHandlerContext.channel().writeAndFlush(Result.success("与服务端建立连接成功"));
//返回群聊的人
channelHandlerContext.channel().writeAndFlush(Result.success(JSON.toJSONString(IMServer.USERS.keySet())));
}
}
接下来是WebSocketHandler的连接处理:调用其他处理器进行不同类型的操作
package com.dragonwu.server.handler;
import com.alibaba.fastjson2.JSON;
import com.dragonwu.server.domain.Result;
import com.dragonwu.server.domain.enums.CommandType;
import com.dragonwu.server.domain.pojo.Command;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @author DragonWu
* @since 2023-01-05 14:21
**/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//通过TextWebSocketFrame作为消息承载体
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
try {
//将json文本解析为指令对象
Command command = JSON.parseObject(textWebSocketFrame.text(), Command.class);
switch (CommandType.match(command.getCode())) {
case CONNECTION:
ConnectHandler.execute(channelHandlerContext, command);
break;
default:
channelHandlerContext.channel().writeAndFlush(Result.fail("不支持CODE"));
}
} catch (Exception e) {
channelHandlerContext.channel().writeAndFlush(Result.fail("错误消息:" + e.getMessage()));
}
}
}
后端到这里,重启后端服务器。
前端如下:
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>websocket</title>
</head>
<body>
</body>
<script>
// Nickname used for registration. It was referenced but never defined on this
// page (ReferenceError inside onopen); read it from the "user" query parameter
// and fall back to "guest" when the parameter is absent.
let nickname = new URLSearchParams(window.location.search).get("user") || "guest";
// Create the WebSocket connection to the Netty server.
let ws = new WebSocket("ws://localhost:8888")
// Fired once the connection is established: register the nickname (code 1001).
ws.onopen = function (event) {
    console.log("连接成功");
    // JSON.stringify escapes quotes/backslashes that string concatenation would not.
    ws.send(JSON.stringify({nickname: nickname, code: "1001"}));
}
// Fired when the connection fails.
ws.onerror = function (event) {
    console.log("连接失败");
}
// Fired for every message pushed by the server.
ws.onmessage = function (event) {
    console.log(event)
}
// Fired when the connection is closed.
ws.onclose = function (event) {
    console.log("连接关闭")
}
</script>
</html>
打开前端页面,打开控制台看见:
此时连接已成功!
三、实现单聊
在连接成功的基础上实现,
实现思路,本次实现通过ConcurrentHashMap来存储聊天用户,线上环境时可以使用redis等来进一步拓展。
创建消息枚举类:用于明确消息的类型
package com.dragonwu.server.domain.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Kinds of chat message a client can send.
 *
 * @author DragonWu
 * @since 2023-01-06 11:50
 **/
@Getter
@AllArgsConstructor
public enum MessageType {
    /** Private (one-to-one) chat. */
    PRIVATE(1),
    /** Group chat. */
    GROUP(2),
    /** Unsupported type. */
    ERROR(-1);

    /** Numeric type as it appears in the JSON payload. */
    private final Integer type;

    /**
     * Looks up the constant whose type equals the given value.
     *
     * @param type numeric type to resolve; may be {@code null}
     * @return the matching constant, or {@link #ERROR} when none matches
     */
    public static MessageType match(Integer type) {
        MessageType resolved = ERROR;
        for (MessageType candidate : values()) {
            if (candidate.type.equals(type)) {
                resolved = candidate;
                break;
            }
        }
        return resolved;
    }
}
创建消息实体类:接收到的消息为json字符串,转换为该实体类方便后续操作
package com.dragonwu.server.domain.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author DragonWu
* @since 2023-01-06 11:48
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage extends Command {
//消息类型
private Integer type;
//目标接收对象
private String target;
//消息内容
private String content;
}
为之前的WebSocketHandler主处理器再添加一个分支,用于处理私聊(其中调用的ChatHandler的完整代码见第四节的“聊天处理器”):
package com.dragonwu.server.handler;
import com.alibaba.fastjson2.JSON;
import com.dragonwu.server.domain.Result;
import com.dragonwu.server.domain.enums.CommandType;
import com.dragonwu.server.domain.pojo.Command;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @author DragonWu
* @since 2023-01-05 14:21
**/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//通过TextWebSocketFrame作为消息承载体
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
try {
//将json文本解析为指令对象
Command command = JSON.parseObject(textWebSocketFrame.text(), Command.class);
switch (CommandType.match(command.getCode())) {
case CONNECTION:
ConnectHandler.execute(channelHandlerContext, command);
break;
case CHAT:
ChatHandler.execute(channelHandlerContext, textWebSocketFrame);
break;
default:
channelHandlerContext.channel().writeAndFlush(Result.fail("不支持CODE"));
}
} catch (Exception e) {
channelHandlerContext.channel().writeAndFlush(Result.fail("错误消息:" + e.getMessage()));
}
}
}
前端修改为如下:前端通过ws.send()发送消息给Netty服务器,再由服务器转发给对应客户端。
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>websocket</title>
</head>
<body>
<div>
<div>
<input id="send-msg-box" type="text">
<input id="who" type="text" placeholder="who">
<button id="send-button" onclick="sendMsg()">发送</button>
</div>
</div>
</body>
<script>
/**
 * Returns the decoded value of a query-string parameter.
 *
 * Uses URLSearchParams instead of manual splitting, which fixes several cases:
 * a parameter without "=" yields "" rather than the string "undefined",
 * values containing "=" are kept whole, "+" decodes to a space, and a missing
 * key never resolves to an inherited Object property.
 *
 * @param {string} key - parameter name to look up
 * @param {string} Url - query string, e.g. window.location.search ("?user=John")
 * @returns {string|undefined} the last value given for the key, or undefined when absent
 */
function getSearchString(key, Url) {
    var params = new URLSearchParams(Url);
    var values = params.getAll(key);
    // Keep the previous "last occurrence wins" behaviour for repeated parameters.
    return values.length > 0 ? values[values.length - 1] : undefined;
}
// Create the WebSocket connection to the Netty server.
let ws = new WebSocket("ws://localhost:8888")
// The nickname comes from the "user" query parameter of the page URL.
let search = window.location.search
let nickname = getSearchString("user", search)
// Fired once the connection is established: register the nickname (code 1001).
ws.onopen = function (event) {
    console.log("连接成功");
    // JSON.stringify escapes quotes/backslashes in the URL-supplied nickname,
    // which string concatenation would turn into malformed JSON.
    ws.send(JSON.stringify({nickname: String(nickname), code: "1001"}));
}
// Fired when the connection fails.
ws.onerror = function (event) {
    console.log("连接失败");
}
// Fired for every message pushed by the server.
ws.onmessage = function (event) {
    console.log(event)
}
// Fired when the connection is closed.
ws.onclose = function (event) {
    console.log("连接关闭")
}
/**
 * Sends the text in the message box as a private chat message (code 1002,
 * type 1) to the nickname entered in the "who" box, then clears both inputs.
 *
 * The payload is built with JSON.stringify so that quotes or backslashes in
 * the message or target cannot break the JSON. Nothing is sent or cleared
 * while the socket is not open.
 */
function sendMsg() {
    const msgBox = document.getElementById("send-msg-box");
    const whoBox = document.getElementById("who");
    const msg = msgBox.value;
    const target = whoBox.value;
    if (ws.readyState !== WebSocket.OPEN) {
        // Keep the typed text so it is not lost when the connection is down.
        console.log("连接未建立,消息未发送");
        return;
    }
    msgBox.value = "";
    whoBox.value = "";
    ws.send(JSON.stringify({
        nickname: String(nickname),
        code: "1002",
        type: "1",
        target: target,
        content: msg
    }))
}
</script>
</html>
打开一个前端页面:记得写上user是谁,实际业务中user应该为登录的用户,
本次模拟两个用户间的聊天:
先上线John:
再上线张三:
可以看到服务器转发的在线用户列表,该消息是这个代码段转发的:
下面是聊天测试:John发送消息
张三控制台接收到消息:
如果指定的接收人不存在(或不在线):John发送
服务器会转发出错误提示:
这部分的实现代码在这里:
私聊功能的基本结构就是这样实现的,可结合业务进行拓展与修改。
四、群聊功能
群聊功能在私聊的基础上进行实现:
添加一个JoinGroup功能,让对应的用户模拟加入群聊:
package com.dragonwu.server.handler;
import com.dragonwu.server.IMServer;
import com.dragonwu.server.domain.Result;
import io.netty.channel.ChannelHandlerContext;
/**
 * Handles the join-group command.
 *
 * @author DragonWu
 * @since 2023-01-06 13:24
 **/
public class JoinGroupHandler {
    /**
     * Adds the calling channel to {@code IMServer.GROUP} and confirms to the caller.
     *
     * @param channelHandlerContext context of the channel that asked to join
     */
    public static void execute(ChannelHandlerContext channelHandlerContext) {
        // Register the channel in the shared ChannelGroup.
        final boolean ignoredAdded = IMServer.GROUP.add(channelHandlerContext.channel());
        // Acknowledge on the same channel.
        channelHandlerContext.channel().writeAndFlush(Result.success("加入系统默认群聊成功~"));
    }
}
主处理器添加分支:
package com.dragonwu.server.handler;
import com.alibaba.fastjson2.JSON;
import com.dragonwu.server.domain.Result;
import com.dragonwu.server.domain.enums.CommandType;
import com.dragonwu.server.domain.pojo.Command;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @author DragonWu
* @since 2023-01-05 14:21
**/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//通过TextWebSocketFrame作为消息承载体
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
try {
//将json文本解析为指令对象
Command command = JSON.parseObject(textWebSocketFrame.text(), Command.class);
switch (CommandType.match(command.getCode())) {
case CONNECTION:
ConnectHandler.execute(channelHandlerContext, command);
break;
case CHAT:
ChatHandler.execute(channelHandlerContext, textWebSocketFrame);
break;
case JOIN_GROUP:
JoinGroupHandler.execute(channelHandlerContext);
break;
default:
channelHandlerContext.channel().writeAndFlush(Result.fail("不支持CODE"));
}
} catch (Exception e) {
channelHandlerContext.channel().writeAndFlush(Result.fail("错误消息:" + e.getMessage()));
}
}
}
聊天处理器添加分支:
package com.dragonwu.server.handler;
import com.alibaba.fastjson2.JSON;
import com.dragonwu.server.IMServer;
import com.dragonwu.server.domain.Result;
import com.dragonwu.server.domain.enums.MessageType;
import com.dragonwu.server.domain.pojo.ChatMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.internal.StringUtil;
/**
* @author DragonWu
* @since 2023-01-06 11:52
**/
public class ChatHandler {
public static void execute(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) {
try {
ChatMessage chat = JSON.parseObject(textWebSocketFrame.text(), ChatMessage.class);
switch (MessageType.match(chat.getType())) {
case PRIVATE:
if (StringUtil.isNullOrEmpty(chat.getTarget())) {
channelHandlerContext.channel().writeAndFlush(Result.fail("消息发送失败,消息发送前请指定接收对象"));
return;
}
Channel channel = IMServer.USERS.get(chat.getTarget());
if (null == channel || !channel.isActive()) {
channelHandlerContext.channel().writeAndFlush(Result.fail("消息发送失败,对方" + chat.getTarget() + "不在线"));
} else {
channel.writeAndFlush(Result.success("私聊消息(" + chat.getNickname() + ")", chat.getContent()));
}
break;
case GROUP:
IMServer.GROUP.writeAndFlush(Result.success("群聊消息(" + chat.getNickname() + ")", chat.getContent()));
break;
default:
channelHandlerContext.channel().writeAndFlush(Result.fail("不支持消息类型"));
}
} catch (Exception e) {
}
}
}
前端修改如下:
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>websocket</title>
</head>
<body>
<div>
<div>
<input id="send-msg-box" type="text">
<input id="who" type="text" placeholder="who">
<button id="send-button" onclick="sendMsg()">发送</button>
</div>
</div>
</body>
<script>
/**
 * Returns the decoded value of a query-string parameter.
 *
 * Uses URLSearchParams instead of manual splitting, which fixes several cases:
 * a parameter without "=" yields "" rather than the string "undefined",
 * values containing "=" are kept whole, "+" decodes to a space, and a missing
 * key never resolves to an inherited Object property.
 *
 * @param {string} key - parameter name to look up
 * @param {string} Url - query string, e.g. window.location.search ("?user=John")
 * @returns {string|undefined} the last value given for the key, or undefined when absent
 */
function getSearchString(key, Url) {
    var params = new URLSearchParams(Url);
    var values = params.getAll(key);
    // Keep the previous "last occurrence wins" behaviour for repeated parameters.
    return values.length > 0 ? values[values.length - 1] : undefined;
}
// Create the WebSocket connection to the Netty server.
let ws = new WebSocket("ws://localhost:8888")
// The nickname comes from the "user" query parameter of the page URL.
let search = window.location.search
let nickname = getSearchString("user", search)
// Fired once the connection is established: register the nickname (code 1001),
// then join the default group.
ws.onopen = function (event) {
    console.log("连接成功");
    // JSON.stringify escapes quotes/backslashes in the URL-supplied nickname,
    // which string concatenation would turn into malformed JSON.
    ws.send(JSON.stringify({nickname: String(nickname), code: "1001"}));
    joinGroup();
}
// Fired when the connection fails.
ws.onerror = function (event) {
    console.log("连接失败");
}
// Fired for every message pushed by the server.
ws.onmessage = function (event) {
    console.log(event)
}
// Fired when the connection is closed.
ws.onclose = function (event) {
    console.log("连接关闭")
}
/**
 * Sends the text in the message box as a chat message (code 1002), then
 * clears both inputs. An empty "who" box sends a group message (type 2);
 * otherwise a private message (type 1) goes to the entered nickname.
 *
 * The payload is built with JSON.stringify so that quotes or backslashes in
 * the message or target cannot break the JSON. Nothing is sent or cleared
 * while the socket is not open.
 */
function sendMsg() {
    const msgBox = document.getElementById("send-msg-box");
    const whoBox = document.getElementById("who");
    const msg = msgBox.value;
    const target = whoBox.value;
    if (ws.readyState !== WebSocket.OPEN) {
        // Keep the typed text so it is not lost when the connection is down.
        console.log("连接未建立,消息未发送");
        return;
    }
    msgBox.value = "";
    whoBox.value = "";
    const isGroup = target === "";
    const payload = {nickname: String(nickname), code: "1002", type: isGroup ? "2" : "1"};
    if (!isGroup) {
        payload.target = target;
    }
    payload.content = msg;
    ws.send(JSON.stringify(payload))
}
/**
 * Asks the server to add this connection to the default group (code 1003).
 * JSON.stringify keeps the payload valid even if the nickname contains quotes.
 */
function joinGroup() {
    ws.send(JSON.stringify({nickname: String(nickname), code: "1003"}))
}
</script>
</html>
当who为空时,直接群发:
John发送群消息后可以看到:
群里的三个人都收到了消息!
五、案例代码
Websocket案例代码: Websocket案例代码 - Gitee.com