netty编程之使用ChannelOutboundHandler对write出去的消息做不同处理

news2025/1/15 22:45:03

写在前面

源码 。
在进行网络编程的时候,不可避免的需要对write出去的消息做一些处理,比如脱敏,增加统一数据等。而netty提供了ChannelOutboundHandler来允许我们拦截消息从而可以对消息进行处理。对应的接口是io.netty.channel.ChannelHandler,抽象了通道处理器相关逻辑。
本文就一起来看下这部分内容。

1:定义server

server main:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bing(7397);
    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }

    }

}

MyChannelInitializer:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.Charset;

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        // 基于换行符号
//        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyServerHandler());
    }

}

MyServerHandler:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:有一客户端链接到本服务端");
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
        String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
        //ctx.writeAndFlush(str);
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + msg);
        //通知客户端链消息发送成功
        String str = "随机数:" + Math.random() * 10 + "\r\n";
        //ctx.writeAndFlush(str);
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:\r\n" + cause.getMessage());
    }

}

2:定义client

client main:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    private void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new MyChannelInitializer());
            ChannelFuture f = b.connect(inetHost, inetPort).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

MyChannelInitializer:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 基于换行符号
        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyTuoMinOutMsgHandler());//消息出站处理器,脱敏处理
        channel.pipeline().addLast(new MyCommonDataOutMsgHandler());//消息出站处理器,增加统一消息
        channel.pipeline().addLast(new MyInMsgHandler()); //消息入站处理器
    }

}

这里我们定义了两个ChannelOutboundHandler(本文重点!!!),分别是负责脱敏的MyTuoMinOutMsgHandler:

package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class MyTuoMinOutMsgHandler extends ChannelOutboundHandlerAdapter {

    /*@Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("ChannelOutboundHandlerAdapter.read 发来一条消息\r\n");
        super.read(ctx);
    }*/

    @Override // 调用ctx的writeAndFlush方法时会被该方法拦截执行
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("MyOutMsgHandler.write 111111111--脱敏处理");
        msg += "--tuomin";
        super.write(ctx, msg, promise);
    }

}

以及负责增加统一数据的MyCommonDataOutMsgHandler,

package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class MyCommonDataOutMsgHandler extends ChannelOutboundHandlerAdapter {

    /*@Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("ChannelOutboundHandlerAdapter.read 发来一条消息\r\n");
        super.read(ctx);
    }*/

    @Override // 调用ctx的writeAndFlush方法时会被该方法拦截执行
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("MyOutMsgHandler.write 22222222 --- 增加统一数据");
        msg += "-common data";
        super.write(ctx, msg, promise);
    }

}

先启动server,后启动client,client和server输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2075210.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python:win10下一种不用编译,直接下载二进制依赖的方法

python依赖的安装&#xff0c;在win环境下&#xff0c; 有些包还是比较麻烦&#xff0c; 经常编译失败&#xff0c; 我曾发帖讨论过多次&#xff0c;有帖为证&#xff01;点此进入&#xff01; https://blog.csdn.net/weixin_62598385/article/details/135945383 win下的Pyth…

基于vue.js和node.js的酒坊销售网站的设计与实现---附源码98047

目 录 摘要 1 绪论 1.1研究背景与意义 1.3研究内容 1.4论文结构与章节安排 2 酒坊销售网站分析 2.1 可行性分析 2.2系统流程分析 2.2.1 数据增加流程 2.2.2 数据修改流程 2.2.3 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例…

实战分享:利用两大在线平台实现自动化数据采集的技巧

本文将深入探讨如何运用两大主流在线平台&#xff0c;通过实战案例分享&#xff0c;揭示自动化数据采集的高效技巧。无需编程基础&#xff0c;也能快速掌握跨平台数据抓取秘籍&#xff0c;助力企业和个人提升市场竞争力与决策效率。 正文 在大数据时代背景下&#xff0c;信息…

ESP8266通过WiFiManager实现Web配网

背景 一个项目中使用到了一款压力传感器,需要通过单片机实现数据的采集并发送到远程的服务器上,单片机采用的时ESP8266,通过WiFiManager实现局域网配置,以及远端服务器IP地址和服务端口的配置。发布此文章记录一下使用WiFiManager实现配网的方法。 程序流程图 示例代码 …

如何下载GB2312字体,免费

因为写文章需要用到&#xff0c;然后wps里面这个是收费的&#xff0c;所以我就去找了免费的&#xff0c;现在分享给大家。 因为我看网上很多都是给一个网址&#xff0c;有些网址已经坏了&#xff0c;所以我这里给一下我的链接 链接&#xff1a;https://pan.baidu.com/s/1wiyF…

如何用Java SpringBoot+Vue构建高效的产品订单管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

python 爬虫,东方网 上海新闻, 简单数据分析

起因: 本来想去市区玩玩&#xff0c;结果搜到一些相关的新闻&#xff0c;所以就想爬取新闻网站… 1. 爬虫部分 import os import csv import time import requests""" # home: https://sh.eastday.com/ # 1. 标题, url&#xff0c; 来源&#xff0c;时间 &qu…

SQL进阶技巧:近距离有效的缺失值填充问题【last_value实现版】

目录 0 场景描述 1 数据准备 2 问题分析 3 小结 0 场景描述 场景:现在有一张商品入库表,包括商品id、商品成本和入库日期3个字段,由于某些原因,导致部分商品的成本缺失(为0或者没有值都是缺失),这样不利于我们计算成本。所以现在要把缺失的商品进价补充完整,补充的…

Redis远程字典服务器(12)—— 使用C++操作Redis

目录 一&#xff0c;环境配置 1.1 介绍 1.2 安装hiredis 1.3 安装redis-plus-plus 1.4 连接服务器 二&#xff0c;使用通用命令 2.0 StringView&#xff0c;和OptionalString类型 2.1 set&#xff0c;get&#xff0c; 2.2 exists&#xff0c;del 2.3 keys 2.4 expi…

【秋招笔试】8.25拼多多秋招-三语言题解

🍭 大家好这里是 春秋招笔试突围,一起备战大厂笔试 💻 ACM金牌团队🏅️ | 多次AK大厂笔试 | 编程一对一辅导 ✨ 本系列打算持续跟新 春秋招笔试题 👏 感谢大家的订阅➕ 和 喜欢💗 和 手里的小花花🌸 ✨ 笔试合集传送们 -> 🧷春秋招笔试合集 🍒 本专栏已收…

【测试】JMeter从入门到进阶

本文参考 Jmeter自动化测试工具从入门到进阶6小时搞定&#xff0c;适合手工测试同学学习_哔哩哔哩_bilibili JMeter介绍 JMeter 是 Apache 组织使用 Java 开发的一款测试工具&#xff1a; 1、可以用于对服务器、网络或对象模拟巨大的负载 2、通过创建带有断言的脚本来验证程序…

C3-80螺栓介绍及其特性

C3-80 螺栓作为马氏体不锈钢高强度紧固件的一员&#xff0c;在工程应用中扮演着重要角色。它不仅具有较高的强度&#xff0c;还拥有良好的耐腐蚀性能&#xff0c;适用于多种恶劣环境下的工业应用。 C3-80螺栓概述 C3-80螺栓是一种马氏体不锈钢材质的高强度紧固件&#xff0c;其…

电商数据怎么分析?电商数据接口助力电商运营中每日必看5个底层数据

数据分析充电站——深入探索中小企业数字化转型&#xff0c;专注提供各行业数据分析干货、分析技巧、工具推荐以及各类超实用分析模板&#xff0c;为钻研于数据分析的朋友们加油充电。 电商运营店铺涉及大量数据&#xff0c;包括用户行为、交易记录、库存信息等&#xff0c;如何…

【C++八股题整理】虚函数

C八股题整理 - 虚函数 虚函数虚函数的定义&#xff1f;C11引入的override和final关键字的作用&#xff1f;虚函数的实现原理&#xff1f;虚函数表&#xff08;vbtl&#xff09;和虚函数表指针&#xff08;vptr&#xff09;虚函数表、虚函数表指针的生成时期及存储位置&#xff…

JS常用事件示例

<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>JS函数中的事件</title> <…

钡铼技术BL196MQTT远程IO模块工业物联网应用

随着工业物联网&#xff08;IIoT&#xff09;的迅猛发展&#xff0c;工业设备之间的互联互通已成为推动产业升级的关键因素之一。在这个背景下&#xff0c;钡铼技术推出了一款名为BL196MQTT的远程IO模块&#xff0c;该模块专为工业自动化环境中的数据采集与控制而设计&#xff…

搭建深度神经网络(DNN)

利用 numpy 工具&#xff0c;手动搭建一个 DNN 深度神经网络。 定义网络结构 初始化模型参数 循环计算&#xff1a;前向传播/计算当前损失/反向传播/权值更新 1、初始化模型参数 对于一个包含L层的隐藏层深度神经网络&#xff0c;我们在初始化其模型参数的时候需要更灵活一点…

触想强固型工业显示器加速海上油气勘探开发

石油作为现代工业发展的主要能源&#xff0c;已成为国际间政治、经济博弈的重要工具。 一、行业发展背景 过去百年间&#xff0c;人类对陆地油气资源的勘探开发逐渐趋于饱和&#xff0c;而面对持续增长的全球能源需求&#xff0c;海洋勘探已成为当今油气能源角逐的主要“战场”…

Linux文件IO缓存

一、缓冲区大小对 I/O 系统调用性能的影响 总之&#xff0c;如果与文件发生大量的数据传输&#xff0c;通过采用大块空间缓冲数据&#xff0c;以及执行更少的 系统调用&#xff0c;可以极大地提高 I / O 性能 二、stdio 库的缓冲 当操作磁盘文件时&#xff0c;缓冲大块数据以…

合宙LuatOS产品规格书——Air700EAQ

Luat Air700EAQ是合宙的LTE Cat.1bis通信模块&#xff0c;采用移芯EC716E平台&#xff0c;支持LTE 3GPP Rel.13技术。 该模块专为满足小型化、低成本需求而设计&#xff0c;具备超小封装和极致成本优势。 Air700EAQ支持移动双模&#xff0c;内置丰富的网络协议&#xff0c;集…