文章目录
- 〇、代码逻辑
- 一、搭建Server
- 1.引入依赖
- 2.搭建一个简单的Server
- 二、搭建WebSocket建立连接
- 1.修改Server,增加一些支持
- 2.自定义一个WebSocketHandler
- 三、功能实现——用户注册上线
- 1.先定义一个工具类Result,用于封装服务端返回消息
- 2.封装客户端指令
- 3.完善WebSocketHandler
- 4.给Server添加一个存放用户映射关系的Map
- 5.定义ConnectionHandler,用于处理用户上线功能的逻辑
- 6.最后,运行项目来进行测试
- 四、功能实现——私聊
- 1.扩展Command类,增加封装后的消息协议
- 2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
- 3.定义ChatHandler用于处理聊天任务
- 4.运行项目进行测试
- 五、功能实现——群聊
- 1.给Server类添加一个channel组,实现系统默认群聊组
- 2.给CommandType增加一个加入群聊指令的枚举类型
- 3.在WebSocketHandler中增加加入群组功能的处理
- 4.定义JoinHandler,用于处理加入群聊的业务逻辑
- 5.在ChatHandler中增加发送群聊消息的代码
- 6.运行项目进行测试
项目源码: github
websocket在线测试工具(在没有前端的情况下也可以与Server连接并进行通信): http://websocket-test.com/
在前两篇博客中我介绍了Java IO通信模型和Netty核心概念,在这篇博客中我会展示如何使用Netty开发一个仿WeChat通讯工具——SmartChat。
我们都知道,Netty是一个异步基于事件驱动的高性能网络通信框架,下面我们使用它来一步步搭建SmartChat。
〇、代码逻辑
一、搭建Server
首先,使用熟悉的IDE工具创建一个Java项目,我命名为了SmartChat。
1.引入依赖
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
<!-- lombok工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.18</version>
</dependency>
2.搭建一个简单的Server
package tracy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void start(){
//主线程池
EventLoopGroup bossPool=new NioEventLoopGroup();
//副线程池
EventLoopGroup workPool=new NioEventLoopGroup();
//用于监听端口
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossPool,workPool)//放入两个线程池
.channel(NioServerSocketChannel.class)//指定channel
.childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
}
});
//监听端口
ChannelFuture future=bootstrap.bind(8080);
}
}
- 在项目启动类中调用Server类的start():
package tracy;
/**
* 启动类
*/
public class Main {
public static void main(String[] args) {
System.out.println("SmartChat!!!");
System.out.println("Hello and welcome!");
Server.start();
}
}
- 运行启动类:
二、搭建WebSocket建立连接
websocket用于在服务端和客户端之间建立连接。
1.修改Server,增加一些支持
package tracy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class Server {
public static void start () throws InterruptedException{
//主线程池
EventLoopGroup bossPool=new NioEventLoopGroup();
//副线程池
EventLoopGroup workPool=new NioEventLoopGroup();
//用于监听端口
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossPool,workPool)//放入两个线程池
.channel(NioServerSocketChannel.class)//指定channel
.childHandler(new ChannelInitializer<SocketChannel>() {//初始化channel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取pipeline,pipeline的工作是基于责任链模式
ChannelPipeline pipeline=socketChannel.pipeline();
//添加一些handler
//http编码解码器
pipeline.addLast(new HttpServerCodec())
//对大数据量的支持
.addLast(new ChunkedWriteHandler())
//对http消息进行聚合
.addLast(new HttpObjectAggregator(1024*24))
//对websocket进行支持
.addLast(new WebSocketServerProtocolHandler("/"))
//websocket具体怎么处理,需要自定义
.addLast(new WebSocketHandler());
}
});
//监听端口
bootstrap.bind(8080);
}
}
2.自定义一个WebSocketHandler
package tracy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* TextWebSocketFrame表示消息体为文本类型
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//先做一个最简单的处理,把消息内容直接打印出来
System.out.println("客户端消息:"+msg.text());
}
}
- 启动项目后去websocket在线测试网站http://websocket-test.com/进行测试:
- 成功:
三、功能实现——用户注册上线
功能概述: 用户A要想与用户B进行私聊,首先需要保存用户A和B和Server的连接标识,即需要保存映射关系。
- 优化目录结构:
首先,我们先来优化一下目录结构,使其更清晰。
command用来存放客户端的指令,handler用来存放我们的一些处理逻辑,util存放工具类。
1.先定义一个工具类Result,用于封装服务端返回消息
package tracy.util;
import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
public class Result {
private String name;//消息类型
private LocalDateTime time;
private String message;//消息内容
public static TextWebSocketFrame fail(String message){
return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));
}
public static TextWebSocketFrame success(String message){
return new TextWebSocketFrame(JSON.toJSONString(new Result("系统消息",LocalDateTime.now(),message)));
}
public static TextWebSocketFrame success(String name,String message){
return new TextWebSocketFrame(JSON.toJSONString(new Result(name,LocalDateTime.now(),message)));
}
}
2.封装客户端指令
- 定义一个Command类,用于描述客户端的指令:
package tracy.command;
import lombok.Data;
@Data
public class Command {
//用户昵称
private String nickname;
//指令
private Integer code;
}
- 定义一个CommandType枚举类型,用于罗列客户端的指令类型:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum CommandType{
//指令:建立连接
CONNECTION(1),
//指令:错误指令
ERROR(0),
;
private final Integer code;
public static CommandType match(Integer code){
//遍历枚举类型的所有值,看输入的code是否能与某一个匹配上
for (CommandType value:CommandType.values()){
if(value.getCode().equals(code))return value;
}
//匹配不上,说明输入的指令不是合法的枚举类
return ERROR;
}
}
3.完善WebSocketHandler
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;
/**
* TextWebSocketFrame表示消息体为文本类型
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//将消息解析成一个Command能兼容的对象
try{
//将json形式的文本解析成Command类
Command command= JSON.parseObject(msg.text(),Command.class);
//每一种指令都定义一个对应的Handler来进行处理
switch(CommandType.match(command.getCode())){
case CONNECTION: ConnectionHandler.execute(command,ctx);break;
default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
}
}catch (Exception e){
e.printStackTrace();
ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
}
}
}
4.给Server添加一个存放用户映射关系的Map
//用于保存映射关系
public static final Map<String,Channel> USERS=new ConcurrentHashMap<>();
5.定义ConnectionHandler,用于处理用户上线功能的逻辑
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.util.Result;
public class ConnectionHandler {
public static void execute(Command command, ChannelHandlerContext ctx){
//容错处理:避免相同昵称的用户重复上线
if(Server.USERS.containsKey(command.getNickname())){
ctx.channel().writeAndFlush(Result.fail("该用户已上线,请更换昵称后重试!"));
ctx.channel().disconnect();
return;
}
//将用户加入到服务端的映射队列中
Server.USERS.put(command.getNickname(),ctx.channel());
//返回一条表示用户上线成功的消息
ctx.channel().writeAndFlush(Result.success("与服务端连接建立成功!"));
//再以json字符串的形式返回当前在线的用户列表
ctx.channel().writeAndFlush(Result.success(JSON.toJSONString(Server.USERS.keySet())));
}
}
6.最后,运行项目来进行测试
http://websocket-test.com/
wang001上线成功
{
"nickname": "tracy001",
"code": 1
}
新开一个测试窗口,tracy002上线成功
{
"nickname": "tracy002",
"code": 1
}
新开一个测试窗口,tracy001上线失败,原因是昵称重复
{
"nickname": "tracy001",
"code": 1
}
新开一个测试窗口,tracy003上线失败,原因是指令=2不合法
{
"nickname": "tracy003",
"code": 2
}
四、功能实现——私聊
1.扩展Command类,增加封装后的消息协议
- Command:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Command {
//用户昵称
private String nickname;
//指令
private Integer code;
//消息
private Message message;
}
- CommandType也需要增加相应的枚举值:
//指令:聊天
CHAT(2),
- Message:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message{
//消息类型
private Integer type;
//接收对象
private String target;
//内容
private String content;
public Message(Integer type,String target){
this.type=type;
this.target=target;
}
public Message(Integer type){
this.type=type;
}
}
- MessageType:
package tracy.command;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum MessageType {
//私聊
PRIVATE(1),
//群聊
GROUP(2),
//错误
ERROR(0);
private Integer type;
public static MessageType match(Integer type){
//遍历枚举类型的所有值,看输入的type否能与某一个匹配上
for (MessageType value:MessageType.values()){
if(value.getType().equals(type))return value;
}
//匹配不上,说明输入的type不是合法的枚举类
return ERROR;
}
}
2.修改WebSocketHandler,增加对聊天功能的处理ChatHandler
package tracy.handler;
import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import tracy.command.*;
import tracy.util.Result;
/**
* TextWebSocketFrame表示消息体为文本类型
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
try{
//将json形式的文本解析成Command类
Command command= JSON.parseObject(msg.text(),Command.class);
//每一种指令都定义一个对应的Handler来进行处理
switch(CommandType.match(command.getCode())){
case CONNECTION: ConnectionHandler.execute(command,ctx);break;
case CHAT: ChatHandler.execute(command,ctx);break;
default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
}
}catch (Exception e){
e.printStackTrace();
ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));
}
}
}
3.定义ChatHandler用于处理聊天任务
package tracy.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.command.Command;
import tracy.command.Message;
import tracy.command.MessageType;
import tracy.util.Result;
public class ChatHandler {
//用户聊天逻辑
public static void execute(Command command, ChannelHandlerContext ctx){
try{
Message message=command.getMessage();
//按不同聊天类型进行处理
switch(MessageType.match(message.getType())){
//私聊
case PRIVATE: {
//信息接收对象为空
String target=message.getTarget();
Channel channel=Server.USERS.get(target);
if(target==null||target.isEmpty()){
ctx.channel().writeAndFlush(Result.fail("接收者信息为空,请确认后再试"));
return;
}
//信息接收对象不存在
else if(channel==null){
ctx.channel().writeAndFlush(Result.fail("接收者"+target+"不存在,请确认后再试"));
return;
}
//信息接收对象下线了
else if(!channel.isActive()){
ctx.channel().writeAndFlush(Result.fail("接收者"+target+"已下线,请确认后再试"));
return;
}
else{
channel.writeAndFlush(Result.success("私聊消息("+command.getNickname()+")",message.getContent()));
}
};break;
//群聊,先空着,下一章实现
case GROUP: ;break;
default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));
}
}catch (Exception e){
e.printStackTrace();
ctx.channel().writeAndFlush(Result.fail("发送消息格式错误,请确认后再试"));
}
}
}
4.运行项目进行测试
http://websocket-test.com/
wang001、wang002、wang003上线成功
wang001向wang002发送消息,wang002接收到了
{
"nickname": "wang001",
"code": 2,
"message": {
"type": 1,
"target": "wang002",
"content": "你好,我是1号"
}
}
wang001向并不存在的wang004发送消息,失败
wang003下线,然后wang001向下线的wang003发送消息,失败
五、功能实现——群聊
功能概述:系统提供一个群里组,但是用户需要有加入群聊这个操作,才能进行后续的收发群聊消息。
1.给Server类添加一个channel组,实现系统默认群聊组
//添加一个channel组,用于实现群聊一对多通信
public static final ChannelGroup CHANNEL_GROUP=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
2.给CommandType增加一个加入群聊指令的枚举类型
//指令:加入群聊
JOIN(3),
3.在WebSocketHandler中增加加入群组功能的处理
switch(CommandType.match(command.getCode())){
case CONNECTION: ConnectionHandler.execute(command,ctx);break;
case CHAT: ChatHandler.execute(command,ctx);break;
case JOIN: JoinHandler.execute(ctx);break;
default: ctx.channel().writeAndFlush(Result.fail("不支持的CODE"));
}
4.定义JoinHandler,用于处理加入群聊的业务逻辑
package tracy.handler;
import io.netty.channel.ChannelHandlerContext;
import tracy.Server;
import tracy.util.Result;
public class JoinHandler {
public static void execute(ChannelHandlerContext ctx){
Server.CHANNEL_GROUP.add(ctx.channel());
ctx.channel().writeAndFlush(Result.success("加入系统默认群聊成功"));
}
}
5.在ChatHandler中增加发送群聊消息的代码
switch(MessageType.match(message.getType())){
//私聊
case PRIVATE: {
……
};break;
//群聊
case GROUP: {
Server.CHANNEL_GROUP.writeAndFlush(Result.success("群聊消息("+command.getNickname()+")",message.getContent()));
};break;
default: ctx.channel().writeAndFlush(Result.fail("不支持的TYPE"));
}
6.运行项目进行测试
http://websocket-test.com/
自行完成这一工作。
- 加入群聊json:
{
"nickname": "wang001",
"code": 3
}
- 发送群聊消息json:
{
"nickname": "wang001",
"code": 2,
"message": {
"type": 2,
"content": "你好,我是1号"
}
}
成功!
到这里,我们就完成了mini版微信聊天工具SmartChat的开发工作了,在此基础上,可以增加前端的开发,以及更多功能的实现,实际上开发步骤都是类似的,只是针对不同的功能具体的业务逻辑不同罢了,感兴趣的同学可以尝试着扩展一下SmartChat的功能。
源码请看文章的最顶部。感谢阅读。