Netty实战(十六)

news2024/11/23 8:04:11

UDP广播事件(二)编写广播者和监视器

  • 一、编写广播者
  • 二、编写监视器
  • 三、运行 LogEventBroadcaster 和 LogEventMonitor

一、编写广播者

Netty 提供了大量的类来支持 UDP 应用程序的编写。下面我们列出一些要用到的类型:

名 称描 述
interface AddressedEnvelope<M, A extends SocketAddress>extends ReferenceCounted定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中 M 是消息类型;A 是地址类型
class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A>提供了 interface AddressedEnvelope的默认实现
class DatagramPacketextends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
implements ByteBufHolder扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
interface DatagramChannelextends Channel扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理
class NioDatagramChannnelextends AbstractNioMessageChannelimplements DatagramChannel定义了一个能够发送和接收 AddressedEnvelope 消息的 Channel 类型

Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder,这个我们之前使用过。

下面我们看一下将要广播的信息:我们将展示正在广播的 3 个日志条目,每一个都将通过一个专门的 DatagramPacket进行广播。
在这里插入图片描述

这是该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展示了 LogEvent 消息是如何流经它的。
在这里插入图片描述

我们会将所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。

下面我们自定义一个MessageToMessageEncoder来执行上面所说的转换:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    //LogEventEncoder 创建了即将被发送到指定的InetSocketAddress 的 DatagramPacket 消息
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc()
                .buffer(file.length + msg.length + 1);
        //将文件名写入到 ByteBuf 中
        buf.writeBytes(file);
        //添加一个SEPARATOR
        buf.writeByte(LogEvent.SEPARATOR);
        //将日志消息写入ByteBuf 中
        buf.writeBytes(msg);
        //将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

在 LogEventEncoder 被实现之后,我们就该准备引导该服务器,其包括设置各种各样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler,这些都要通过主类 LogEventBroadcaster 完成。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 广播者组件
 */
public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;
    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引导该 NioDatagramChannel(无连接的)
        bootstrap.group(group).channel(NioDatagramChannel.class)
                //设置 SO_BROADCAST 套接字选项
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }
    public void run() throws Exception {
        //绑定 Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //启动主处理循环
        for (;;) {
            long len = file.length();
            if (len < pointer) {
               // 文件已重置
                //如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = len;
            } else if (len > pointer) {
                // 已添加内容
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                //设置当前的文件指针,以确保没有任何的旧日志被发送
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    //对于每个日志条目,写入一个LogEvent到Channel 中
                    ch.writeAndFlush(new LogEvent(null, -1,
                            file.getAbsolutePath(), line));
                }
                //存储其在文件中的当前位置
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                //休眠 1 秒,如果被中断,则退出循环;否则重新处理它
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }
        //创建并启动一个新的LogEventBroadcaster的实例
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",
                        Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        }
        finally {
            broadcaster.stop();
        }
    }
}

这样就完成了该应用程序的广播者组件。

二、编写监视器

我们定义一个 LogEventMonitor。

这个程序将:
(1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket;
(2)将它们解码为 LogEvent 消息;
(3)将 LogEvent 消息写出到 System.out。

和之前一样,该逻辑由一组自定义的 ChannelHandler 实现,我们扩展 MessageToMessageDecoder。

下面是一个LogEventMonitor 的 ChannelPipeline,展示了LogEvent 是如何流经它的
在这里插入图片描述

综上所述,我们需要2个解码器来处理LogEvent ,它们分别是LogEventDecoder 、LogEventHandler。

LogEventDecoder :

1、ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket解码为LogEvent 消息(一个用于转换入站数据的任何 Netty 应用程序的典型设置)

下面我们开始写LogEventDecoder的实现:


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:LogEventDecoder
 */
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        //获取对 DatagramPacket 中的数据(ByteBuf)的引用
        ByteBuf data = datagramPacket.content();
        //获取该 SEPARATOR 的索引
        int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        //提取文件名
        String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
        //提取日志消息
        String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
        //构建一个新的 LogEvent 对象,并且将它添加到(已经解码的消息的)列表中
        LogEvent event = new LogEvent(datagramPacket.sender(),
                System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}

LogEventHandler:

2、第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理。在这个场景下,它只是简单地将它们写出到 System.out。在真实世界的应用程序中,可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。

下面我们开始写LogEventHandler的实现:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: LogEventHandler
 */
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //扩展 SimpleChannelInboundHandler 以处理 LogEvent 消息

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当异常发生时,打印栈跟踪信息,并关闭对应的 Channe
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        //创建 StringBuilder,并且构建输出的字符串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        //打印 LogEvent 的数据
        System.out.println(builder.toString());
    }
}

LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:

  • 发送方的 InetSocketAddress,其由 IP 地址和端口组成;
  • 生成 LogEvent 消息的日志文件的绝对路径名;
  • 实际上的日志消息,其代表日志文件中的一行。

按照我们之前讲过的内容,下面我们要做的是将LogEventDecoder 和LogEventHandler 安装到ChannelPipeline。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 监视器
 */
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                //引导该 NioDatagramChannel
                .channel(NioDatagramChannel.class)
                //设置套接字选项 SO_BROADCAST
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //将 LogEventDecoder 和 LogEventHandler 添加到 ChannelPipeline 中
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                } )
                .localAddress(address);
    }
    
    //绑定 Channel。注意,DatagramChannel 是无连接的
    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        //构造一个新的LogEventMonitor
        LogEventMonitor monitor = new LogEventMonitor(
                new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}

三、运行 LogEventBroadcaster 和 LogEventMonitor

编写完广播者和监视器后,我们在idea中分别启动它们,当我们看到控制台打印:”LogEventMonitor running“时,说明已经启动成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/643228.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

价格·歧视

一级价格歧视与耳机价格歧视 价格歧视指的是厂商对于两个完全一样的产品收取不同的价格&#xff0c;无论是对同一消费者还是对不同消费者。 一级价格歧视&#xff08;完全价格歧视&#xff09; 对于每一个消费者都收取不同的价格&#xff0c;而且价格定在消费者最多愿意支付的…

基于Java新生报到系统设计与实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精…

ZooKeeper【客户端命令行】

客户端连接ZooKeeper服务器 启动ZooKeeper集群 ./zkServer.sh start 启动客户端 ./zkCli.sh 我们发现启动客户端时它会默认连接本地的服务器&#xff0c;这是因为zookeeper客户端启动时默认连接的是本地模式。 指定连接集群中的服务器 ./zkServer.sh start -server hadoo…

C语言之结构体讲解

目录 结构体类型的声明 结构体初始化 结构体成员访问 结构体传参 对于上期指针初阶&#xff08;2&#xff09;我们后期还会讲数组指针是什么&#xff1f;大家可以先思考一下&#xff0c;后期我们会讲 1.结构体的声明 结构是一些值的集合&#xff0c;这些值被称为成员变量&am…

swagger解析

1.引用swagger包&#xff1a; !-- swagger --> <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version> </dependency> <dependency><groupId>io…

【Android -- JNI 和 NDK】Java 和 C/C++ 之间传递参数和返回值

本文主要介绍 JNI 的数据传递上&#xff0c;即 Java 如何传递对象给 C; 而 C 又如何将数据封装成 Java 所需的对象。 1. 基本数据类型 传递 java 的基本类型是非常简单而直接的&#xff0c;一个 jxxx 之类的类型已经定义在本地系统中了&#xff0c;比如&#xff1a;jint, jby…

RHCE shell 作业一

1. 设置邮箱 [rootserver ~]# yum install s-nail -y [rootserver ~]# vim /etc/s-nail.rc 编写脚本 [rootserver ~]# vim homework1.sh 设置定时任务 [rootserver ~]# vim /etc/crontab 2. [rootserver ~]# vim homework2.sh 测试&#xff1a; 3. [rootserve…

Maven配置多个镜像源 SpringBoot配置多个镜像源

Maven配置多个镜像源 SpringBoot配置多个镜像源 当在 Maven 中配置多个镜像源&#xff0c;并在 Spring Boot 项目中配置多个镜像源时&#xff0c;可以按照以下步骤进行操作&#xff1a; 1. Maven 中配置多个镜像源 在 Maven 的 settings.xml 文件中&#xff0c;按照以下示例配…

背单词学英语20年经验总结

推荐一款最近开发的单词小程序&#xff1a; 悟道单词&#xff0c;融入了卡片记忆&#xff0c;图像记忆等&#xff0c;融入了我20年背单词经验。 从这个小程序里我悟出了两大记忆原则 两个记忆原则&#xff1a; 1. 图像是文字之本法则 想一想我们原来是怎么记住中文字词的&#…

【文生图系列】基础篇-马尔可夫链

文章目录 马尔可夫链转移矩阵例子 应用实例参考 在阅读论文Denoising Diffusion Probabilistic Models时&#xff0c;发现论文里面介绍扩散概率模型&#xff08;diffusion probabilistic model&#xff09;是一个参数化的马尔科夫链&#xff08;parameterized Markov chain&…

List数组高效率去重

List数组高效率去重 一、环境准备–生成包含重复元素的list数组 /*** 生成包含重复元素的list数组* return*/ private static List<String> getList(){List<String> list new ArrayList<>();for (int i 1; i < 10000; i) {list.add(String.valueOf(i)…

Kendo UI for jQuery---03.组件___网格---02.开始

网格入门 本指南演示了如何启动和运行 Kendo UI for jQuery Grid。 完成本指南后&#xff0c;您将能够实现以下最终结果&#xff1a; 1. 创建一个空的 div 元素 首先&#xff0c;在页面上创建一个空元素&#xff0c;该元素将用作 Grid 组件的主容器。 <div id"my-…

SpringMVC组件原理剖析

文章目录 SpringMVC组件原理剖析一、 前端控制器初始化1.1 初始化SpringMVC容器1.2 注册了 SpringMVC的 九大组件1.3 处理器映射器初始化细节 二、前端控制器执行主流程2.1 定位doDispatcher方法2.2 验证HandlerExecutionChain2.3 HandlerAdapter执行目标方法 SpringMVC组件原理…

64位和32位相比优势是什么(一)

前置知识&#xff1a;程序是如何执行的&#xff1f; 一道常规的面试题&#xff1a;相比 32 位&#xff0c;64 位的优势是什么&#xff1f; 面试官考察这种类型的问题&#xff0c;主要是想看求职者是否有扎实的计算机基础&#xff0c;同时想知道求职者在工作中是否充满好奇&am…

路径规划算法:基于未来搜索优化的路径规划算法- 附代码

路径规划算法&#xff1a;基于未来搜索优化的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于未来搜索优化的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化…

【硬件专题】案例:怎么通过元件丝印信息反查芯片

今天同事和昨天因为工作上的原因,问我一个问题,就是怎么通过丝印(Marking code)知道用的是什么芯片。以下列举几个方法: 凭经验 比如昨天给出的一张图片,看图片是比较模糊的。但是根据之前的使用,看芯片LOGO很明显是ST的,然后看上面的型号是STM**F1*VCT*,那么…

Go语言环境安装和程序结构

Go语言环境安装和程序结构 1、Go环境安装 Go安装包下载地址为&#xff1a; https://golang.org/dl/ https://golang.google.cn/dl/ 1.1 Windows下的安装 Windows 下可以使用.msi 后缀的安装包来安装&#xff0c;我这里下载的安装包是 go1.18.4.windows-amd64.msi&#xf…

C\C++ Thread-

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan 简介 说明 时间 c语言的时间处理&#xff1a;time.h 获取从1970年1月1日到当前经过的秒数: long t0 time(NULL); 让程序暂停3秒&#xff1a; sleep(3); 当前时间的3秒后&#x…

Pycharm 通过 SVN 直接管理控制代码,原来这么方便又高级!

做自动化测试的小伙伴都知道&#xff0c;代码不会只放到本地管理&#xff0c;需要托管到远端进行管理&#xff01; 一方面&#xff0c;发布在不同的电脑上进行同步开发&#xff0c;不需要用U盘拷来拷去&#xff1b;另外一方面&#xff0c;可以轻松找回代码&#xff0c;避免本地…

【UE】玻璃材质

效果 步骤 1. 新建一个材质&#xff0c;这里命名为“M_GLASS” 双击打开“M_GLASS”&#xff0c;左下角混合模式设置为半透明 光照模式设置为表面前向着色 将基础颜色提升为参数 同样还需提升为参数的有“高光度”、“粗糙度”、“不透明度”、“折射” 设置高光度的默认值和最…