写在前面
本文看下使用netty如何实现广播消息,主要依赖于类io.netty.channel.group.ChannelGroup
。
1:正戏
首先定义server:
package com.dahuyou.netty.broadcastmsg;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
new NettyServer().bing(7397);
}
private void bing(int port) {
//配置服务端NIO线程组
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}
}
}
MyChannelInitializer通道初始化类:
package com.dahuyou.netty.broadcastmsg;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
/*System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());*/
/* 解码器 */
// 基于换行符号,基于换行符来分割字符串???
// channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 基于指定字符串【换行符,这样功能等同于LineBasedFrameDecoder】
// e.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, false, Delimiters.lineDelimiter()));
// 基于最大长度
// e.pipeline().addLast(new FixedLengthFrameDecoder(4));
// 解码转String,注意调整自己的编码格式GBK、UTF-8 byte->string
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 增加内置编码器,这样向对端发送消息就不要转换为二进制数据了
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
//在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}
}
在通道初始化类中定义了接收消息的字符串解码器和发送消息的字符串编码器,以及自定义了接收消息的处理器MyServerHandler:
package com.dahuyou.netty.broadcastmsg;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 通道激活
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
// 添加通道到通道组中
ChannelHandler.channelGroup.add(ctx.channel());
System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());
// 通知客户端链接建立成功
String str = "notify channel build success: " + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
/*ByteBuf buf = Unpooled.buffer(str.getBytes().length);
buf.writeBytes(str.getBytes("GBK"));
ctx.writeAndFlush(buf);*/
ctx.writeAndFlush(str);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*//接收msg消息
ByteBuf buf = (ByteBuf) msg;
byte[] msgByte = new byte[buf.readableBytes()];
buf.readBytes(msgByte);
System.out.print(new Date() + "接收到消息:");
System.out.println(new String(msgByte, Charset.forName("GBK")));*/
// 因为在初始化是已经设置了字符串的解码器,所以就不需要上述的手动读取二进制字节码的操作了,这就是框架的好处咯!
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
// 通知客户端链消息发送成功
/*String str = "server received your msg: " + new Date() + " " + msg + "\r\n";
ByteBuf buf = Unpooled.buffer(str.getBytes().length);
buf.writeBytes(str.getBytes("GBK"));
ctx.writeAndFlush(buf);*/
String str = "server received your msg: " + new Date() + " " + msg + "\r\n";
// ctx.writeAndFlush(str);
ChannelHandler.channelGroup.writeAndFlush(str);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 从通道组中移除通道
ChannelHandler.channelGroup.remove(ctx.channel());
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}
}
主要代码是ChannelHandler.channelGroup,用来维护连接中的通道,并通过代码ChannelHandler.channelGroup.writeAndFlush(str);
来实现将消息发送到通过方法ChannelHandler.channelGroup.add(ctx.channel());
添加的所有通道。ChannelHandler代码如下:
package com.dahuyou.netty.broadcastmsg;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class ChannelHandler {
//用于存放用户Channel信息,也可以建立map结构模拟不同的消息群
// 这里的一个通道可以映射为群聊的一个群,如果是有多个群的话,创建多个通道组就可以了,之后使用比如map结构就可以维护
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
接着我们就可以启动2个client来测试,首先启动第一个:
启动第二个:
使用其中一个说一句话,server的回复另一个client也可以收到: