1. 登入
- 在连接建立好之后,客户端发送登入,将登入消息封装为LoginRequestMessage这个类的对象,
ctx.writeAndFlush(loginRequestMessage);)
使用ctx发送,注意入站处理器调用写相关方法,会触发出站处理器(从最后向前找)。
/**
* 在连接建立好之后 触发active事件
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//负责接收用户在控制台的输入,负责向服务器发送各种消息
//改线程可以和netty的线程不相关联(不使用event loop group中的线程),独立接收输入
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码");
String password = scanner.nextLine();
//应该校验用户名密码是否为空,这里省略
LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
System.out.println(loginRequestMessage);
//发送消息
ctx.writeAndFlush(loginRequestMessage);//入站处理器调用写相关方法,会触发出站处理器(从最后向前找)
System.out.println("等待后续操作...");
},"system in");
}
- 用户发送完毕自己的登入用户名和密码后,客户端应该进行校验。但是校验代码比较长,直接使用
ch.pipeline().addLast(new xxxHandler());
的方式代码看起来比较冗余。因此将验证用户名和密码的代码直接封装到一个Handler类中:LoginRequestMessageHandler
,在外部new出该实例,加入到pipeline中即可。该handler读取client传过来的消息(入栈处理器,解码),再写出登入成功、失败的消息(出栈处理器,编码)。
注意:
SimpleChannelInboundHandler<LoginRequestMessage>
该handler只处理LoginRequestMessage,意思就是client发送再多种消息,当前这个handler只处理LoginRequestMessage这一种请求登入的消息,进行用户名密码验证。这样好处就是,不需要将所有类型的消息接收到,在判断是不是请求登入的消息,再进行处理。
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);//后期可以优化到数据库中查询,这里简单在内存中查
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
}
之后客户端new出这个类的实例,加入到pipeline中 socketChannel.pipeline().addLast(LOGIN_REQMSG_HANDLER);
- server发送登入验证之后,服务器要接收这个消息,加一个入站处理器,实现接收server的响应消息。
- 注意,这里不能再在channelActive下面处理接收消息了,因为channelActive是连接建立完成之后才会触发的。应该加一个channelRead事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//对比学习,LoginResponseMessage发送的类对应的是SimpleChannelInboundHandler<LoginRequestMessage>
// 其只对应LoginReqMeg一种消息进行处理,现在这个handler是对所有的msg都进行处理,所以要判断类型。
if (msg instanceof LoginResponseMessage){
LoginResponseMessage loginResponseMessage = (LoginResponseMessage) msg;
log.info("{}",loginResponseMessage);
if (loginResponseMessage.isSuccess()){
//多线程的原子性操作!
SUCCESS_LOGIN.set(true);
}
// 唤醒 system in 线程
WAIT_FOR_LOGIN.countDown();
}
}
- 重点 ,实现线程之间的通信, 主要使用:
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1)
配合WAIT_FOR_LOGIN.countDown();
就可以唤醒线程;WAIT_FOR_LOGIN.await();
进入等待(await就是等待计数器变为0)。
- CountDownLatch是一个同步工具类,它通过一个计数器来实现的,初始值为线程的数量。每当一个线程完成了自己的任务,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已执行完毕,然后在等待的线程就可以恢复执行任务。
应用场景:
- 某个线程需要在其他n个线程执行完毕后再向下执行(本项目2个线程例子)
- 多个线程并行执行同一个任务,提高响应速度(其他多线程在任务执行完毕后,await唤醒主线程,可以接下去做别的任务)
本项目中:
在client输入登入的用户名,密码后,线程要停下来await(释放了cpu),等待server验证登入结果。
如果server那边验证通过了,client线程放行,选择聊天室场景。
如果server验证失败,client这边线程也要继续,但是可以关闭管道。
上面就涉及两个线程之间的通信,一个netty的nio线程,一个自定义线程,需要一个线程等待。
2. client端进行 聊天场景选择
之前完成了登入,现在完成登入之后,要进行聊天场景的配置,分别有:单聊、小组聊天、创建群聊、查看小组成员、退出群聊、加入群聊等。
// 如果登录失败
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = null;
try {
command = scanner.nextLine();
} catch (Exception e) {
break;
}
if(EXIT.get()){
return;
}
- 即:要求用户按照上面的格式进行输入,因为scanner.nextLine()拿到的是一个字符串,就需要把这个字符串进行解析,以空格进行分割,第一个关键字代表聊天的性质(通过这个选择发送消息的封装格式),第二个关键字是名称,第三个关键字依次类推。
下面站在client角度进行场景分析:
2.1 场景一:ChatRequestMessage 请求聊天信息
最关键的消息是从 当前用户 发送着,接收消息者,消息内容
从字符串中解析出的字段一,进行配置,write写出消息,触发出站操作,对该消息进行自定义协议封装。
2.2 场景二 :GroupChatRequestMessage 请求给某个群聊发送消息
主要看构造方法
2.3 场景三: GroupCreateRequestMessage 请求创建群聊
- 对应输入System.out.println(“gcreate [group name] [m1,m2,m3…]”);
- 解析输入,第三个字段是用户,创建群里用户应该去重,可以放在set集合中
2.4 场景四: GroupMembersRequestMessage获取群里的成员
2.5 场景五、六:GroupJoinRequestMessage、GroupQuitRequestMessage 加入、退出群聊
- 该模块client整体代码
// 在连接建立后触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("请输入密码:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = null;
try {
command = scanner.nextLine();
} catch (Exception e) {
break;
}
if(EXIT.get()){
return;
}
String[] s = command.split(" ");
switch (s[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
3. 服务器端对client各个聊天场景请求进行回应
3.1 封装思想
- 为了简洁,将每个入站处理器封装到类中,都继承:
extends SimpleChannelInboundHandler<XXXXRequestMessage>
,这个每个client请求消息都能直接对应到目标的入站处理器,无需在channelRead方法之内做判断后分配。
- 技巧:
可以在写好匿名内部类之后使用idea进行重构,先构建成为独立的内部类
在将内部类提出到独立的文件中
3.2 通信逻辑
- 现在例如用户A要给用户B发消息:clientA->server->clientB
- 在每个入站处理器内,需要先找到client发送目的地对应的channel,如果在,说明通信的对方clientB在线,那server就能够成功发送消息给B,之间的桥梁就是:server和clientB之间的channel,只要clientB和server连接着,那么这个channel就在,就可以成功把消息发过去
- 那如何判断channel到底还在不在?------>之前在每个用户登入的时候,就已经将用户名对应的channel绑定,并把channel放到session中,后期需要这个channel就可以用session直接根据用户名获取即可。如果发现session中没有目标的channel,那说明通信对方clientB还没和服务器连起来,发送失败!
- 注意,上面说的session是自己定义的,主要由模板方法设计思想,Session是自己写的接口,不是HttpServlet在Web开发中的那个session。自定义的这个session底层使用ConcurrentHashMap来实现。
上图是绑定用户名和channel到session中,下图是获取对应的channel进行消息发送
3.3 具体场景之:向群聊发送消息 GroupChatRequestMessage
- client向某个群聊发送消息后,server这边收到需要向这个群聊的每一个成员都发送消息,即需要拿到每个成员与server之间对应的channel,使用channel 发送消息。
- 如何通过群聊的名称获取群聊每个组员的channel?—相关方法已经封装到了模板方法中
- 先通过群聊名称拿到群聊中的组员名字
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
- 再通过每个组员名字的set集合拿到channel集合,使用了stream流进行操作
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull) //处理还在线的
.collect(Collectors.toList());
}