Java基础之《netty(13)—任务队列taskQueue》

news2025/1/17 1:07:38

一、任务队列

1、用户程序自定义的普通任务

2、用户自定义定时任务

3、非当前Reactor线程调用Channel的各种方法
例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。

二、使用场景

1、比如在服务器端channelRead中有一个非常耗费时间的业务,我们要异步执行,把它提交到channel对应的NioEventLoopGroup的taskQueue中。

2、每个NioEventLoop是一个单线程线程池,提交任务相当于还是它自己来做,只不过是它会根据你设定的ioradio参数来分配io事件和普通任务的时间。

三、方案1:用户程序自定义的普通任务

1、改写服务器端Handler,为NettyChannelHandler2.java

package netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 说明
 * 1. 我们自定义一个Handler,需要继承netty规定好的某个HandlerAdapter(规范)
 * 2. 这时我们自定义一个Handler,才能称之为Handler
 * 
 */
public class NettyChannelHandler2 extends ChannelInboundHandlerAdapter {

	//读取数据的事件(这里我们可以读取客户端发送的消息)
	/*
	 * 1. ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址
	 * 2. Object msg:就是客户端发送的数据,默认是Object
	 * 3. 通道读写数据,管道处理数据
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		
		//方案1:用户程序自定义的普通任务
		//会提交到当前channel关联的NioEventLoop里面的taskQueue执行
		//任务一
		ctx.channel().eventLoop().execute(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(10 * 1000);
					ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
		});
		
		//任务二
		ctx.channel().eventLoop().execute(new Runnable() {

			@Override
			public void run() {
				System.out.println("任务二...");
				ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端3~", CharsetUtil.UTF_8));
				
			}
			
		});
		
		ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端2~", CharsetUtil.UTF_8));
		
		//客户端
		//会收到:hello,客户端2~
		//再收到:hello,客户端~
		//再收到:hello,客户端3~
		
	}
	
	//数据读取完毕
	//这个方法会在channelRead读完后触发
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//把数据写到缓冲区,并且刷新缓冲区,是write + flush
		//一般来讲,我们对这个发送的数据进行编码
		//ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
		
	}
	
	//处理异常,一般是需要关闭通道
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.channel().close();
	}
}

2、客户端执行结果

服务器回复的消息:hello,客户端2~
服务器的地址:/127.0.0.1:6668
服务器回复的消息:hello,客户端~
服务器的地址:/127.0.0.1:6668
服务器回复的消息:hello,客户端3~
服务器的地址:/127.0.0.1:6668

服务器端nioeventloop还是一个线程执行,taskQueue里是按照添加的顺序依次执行。

四、方案2:用户自定义定时任务

1、该任务是提交到scheduleTaskQueue中。

2、改写服务器端Handler,为NettyChannelHandler3.java

package netty.simple;

import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 说明
 * 1. 我们自定义一个Handler,需要继承netty规定好的某个HandlerAdapter(规范)
 * 2. 这时我们自定义一个Handler,才能称之为Handler
 * 
 */
public class NettyChannelHandler3 extends ChannelInboundHandlerAdapter {

	//读取数据的事件(这里我们可以读取客户端发送的消息)
	/*
	 * 1. ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址
	 * 2. Object msg:就是客户端发送的数据,默认是Object
	 * 3. 通道读写数据,管道处理数据
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		
		//方案2:用户自定义定时任务
		ctx.channel().eventLoop().schedule(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(10 * 1000);
					ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
		}, 5, TimeUnit.SECONDS); //延迟5秒,然后执行
		
		ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端2~", CharsetUtil.UTF_8));
		
	}
	
	//数据读取完毕
	//这个方法会在channelRead读完后触发
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//把数据写到缓冲区,并且刷新缓冲区,是write + flush
		//一般来讲,我们对这个发送的数据进行编码
		//ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
		
	}
	
	//处理异常,一般是需要关闭通道
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.channel().close();
	}
}

3、客户端执行结果

服务器回复的消息:hello,客户端2~
服务器的地址:/127.0.0.1:6668
服务器回复的消息:hello,客户端~
服务器的地址:/127.0.0.1:6668

五、方案3:服务器端要推送多个管道

1、服务器端要推送到管道A、管道B、管道C,所以要找到它对应的channel。

2、如何获取其他的channel
(1)在服务器端初始化new ChannelInitializer<SocketChannel>()对象时,把SocketChannel放入一个HashMap中,然后从HashMap里取。
(2)在推送消息时,可以将业务加入到各个channel对应的NioEventLoop的taskQueue或者scheduleTaskQueue中。

3、改写服务端,为NettyServer2.java

package netty.simple;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 可以传递一个集合保存SocketChannel的引用
 * @author user
 *
 */
public class NettyServer2 {
	public static void main(String[] args) throws Exception {
		
		//创建BossGroup和WorkerGroup
		//说明
		//1. 创建两个线程组bossGroup和workerGroup
		//2. bossGroup它只是处理连接请求,真正的与客户端业务处理会交给workerGroup去完成
		//3. 两个都是无限循环
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup(8);
		
		try {
			//创建服务器端的启动对象,配置启动参数
			ServerBootstrap bootstrap = new ServerBootstrap();
			
			//集合保存所有SocketChannel引用
			Map<Integer, SocketChannel> map = new ConcurrentHashMap<>();
			
			//使用链式编程来进行设置
			bootstrap.group(bossGroup, workerGroup) //设置两个线程组
				.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
				.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
					//给pipeline设置处理器
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						
						//将channel引用放入map
						map.put(ch.hashCode(), ch);
						
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new NettyChannelHandler4(map)); //向管道的最后增加一个处理器
						
					};
				}); //给我们的workerGroup的EventLoop对应的管道设置处理器
			
			//bossGroup参数
			bootstrap.option(ChannelOption.SO_BACKLOG, 1024); //设置线程队列等待连接的个数
			
			//workerGroup参数
			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //设置保持活动连接状态
			
			System.out.println("...服务器 is ready...");
			
			//绑定一个端口并且同步,生成了一个ChannelFuture对象
			//启动服务器并绑定端口
			ChannelFuture cf = bootstrap.bind(6668).sync();
			
			//对关闭通道进行监听
			cf.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			System.out.println("Shutdown Netty Server...");
			//优雅的关闭
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
			System.out.println("Shutdown Netty Server Success!");
		}
		
	}
}

4、改写服务端Handler,为NettyChannelHandler4.java

package netty.simple;

import java.util.Map;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;

/**
 * 说明
 * 1. 我们自定义一个Handler,需要继承netty规定好的某个HandlerAdapter(规范)
 * 2. 这时我们自定义一个Handler,才能称之为Handler
 * 
 */
public class NettyChannelHandler4 extends ChannelInboundHandlerAdapter {
	
	private Map<Integer, SocketChannel> map;
	
	public NettyChannelHandler4(Map<Integer, SocketChannel> map) {
		this.map = map;
	}

	//读取数据的事件(这里我们可以读取客户端发送的消息)
	/*
	 * 1. ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址
	 * 2. Object msg:就是客户端发送的数据,默认是Object
	 * 3. 通道读写数据,管道处理数据
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		
		System.err.println("channel的数量:" + map.size());
		
		//方案3:服务器端要推送到管道A、管道B、管道C。。。
		map.forEach((key, value) -> {  
            value.writeAndFlush(Unpooled.copiedBuffer("server向"+key+"发送消息", CharsetUtil.UTF_8));
        });
	}
	
	//数据读取完毕
	//这个方法会在channelRead读完后触发
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//把数据写到缓冲区,并且刷新缓冲区,是write + flush
		//一般来讲,我们对这个发送的数据进行编码
		//ctx.channel().writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
		
	}
	
	//处理异常,一般是需要关闭通道
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//客户端主动断开连接时会进到这里
		
		//移除集合
		map.remove(ctx.channel().hashCode());
		ctx.channel().close();
	}
}

六、netty模型方案再说明

1、netty抽象出两组线程池,BossGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作。

2、NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道。

3、NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责。

4、NioEventLoopGroup下包含多个NioEventLoop
(1)每个NioEventLoop中包含有一个Selector,一个taskQueue。
(2)每个NioEventLoop的Selector上可以注册监听多个NioChannel。
(3)每个NioChannel只会绑定在唯一的NioEventLoop上。
(4)每个NioChannel都绑定有一个自己的ChannelPipeline。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/93718.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于java+springmvc+mybatis+vue+mysql的养老院管理系统

项目介绍 管理员后台页面&#xff1a; 功能&#xff1a;主页、个人中心、护工管理、家属管理、楼房资料管理、房间资料管理、床位管理、老人入住管理、老人档案管理、身体状态管理、用药情况管理、转房登记管理、外出登记管理、药品信息管理、药品入库管理、药品出库管理、物品…

【C语言】整型的存储方式(大小端,原码,反码,补码)

目录 一、基本类型 二、原码&#xff0c;反码&#xff0c;补码 2.1 原&#xff0c;反&#xff0c;补的计算方式 2.1.1 正数的原&#xff0c;反&#xff0c;补 2.1.2 负数的原&#xff0c;反&#xff0c;补 2.2 为什么要用补码存放 2.3 大小端是什么&#xff1f; 2.3.1 …

明道云联合契约锁共建人事场景电子签约解决方案

背景介绍 在每个组织的人事管理工作中&#xff0c;从招聘、入职、在职、调岗到离职&#xff0c;整个过程中存在大量的合同、证明、函件、通知等文件需要签字盖章。HR每天都要在“核对文件、敲章、通知员工签合同、催进度、给外地员工寄合同、关注合同到期时间等”繁琐的签署工…

使用vite和Element Plus,实现部署后不修改代码/打包,新增主题/皮肤包

Web前端界面切换主题/皮肤&#xff0c;是一个常见的需求。如果希望在打包部署后实现皮肤的修改甚至增加皮肤&#xff0c;不需要修改源码或者重新打包&#xff0c;类似于我们常见的皮肤包扩展&#xff0c;又该如何实现呢&#xff1f; 我使用类似上一期多语言包功能中介绍的方法来…

基于Xlinx的时序分析与约束(3)----基础概念(下)

1、4种基本的时序路径 下图是一张典型的FPGA与上游器件、下游器件通信的示意图&#xff1a; 其可以划分为4条基本的数据路径&#xff0c;这4条路径也是需要进行时序约束的最基本路径。 &#xff08;1&#xff09;寄存器到寄存器 路径2&#xff0c;FPGA内部的寄存器到另一个寄存…

[附源码]Node.js计算机毕业设计高校医疗健康服务系统的设计与实现Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

【C++初阶】类和对象(下)再谈构造函数、static成员、C++11的成员初始化新玩法、友元类、内部类

文章目录再谈构造函数static成员C11的成员初始化新玩法友元类内部类再谈构造函数 1.构造函数体赋值 在创建对象时&#xff0c;编译器通过调用构造函数&#xff0c;给对象中各个成员变量一个合适的初始值。 虽然上述构造函数调用之后&#xff0c;对象中已经有了一个初始值&am…

客户管理系统如何提升体验

数字化时代&#xff0c;客户与企业交互的触点爆炸式增长&#xff0c;客户体验正从单一触点走向端到端旅程。众多的产品、海量的数据&#xff0c;导致客户对体验的要求越来越多......CRM客户管理系统是企业提升客户体验的有效工具&#xff0c;它不仅可以帮助您进一步了解客户&am…

App自动化之dom结构和元素定位方式(包含滑动列表定位)

先来看几个名词和解释&#xff1a; dom: Document Object Model 文档对象模型dom应用: 最早应用于html和js的交互。界面的结构化描述&#xff0c; 常见的格式为html、xml。核心元素为节点和属性xpath: xml路径语言&#xff0c;用于xml 中的节点定位&#xff0c;XPath 可在 xml…

javaSE - 三个常用的接口(Comparable,Comparator,Cloneable)

1、Comparable 英 [ˈkɒmpərəbl] 美 [ˈkɑːmpərəbl] 可比较的;可比的;可比性;可比;可比较   2、Comparator 美 [kəmˈpɜrətər] n. 比较器&#xff0c;比色器&#xff0c;比较电路&#xff0c;比长仪&#xff0c;场强计 3、 Cloneable 可复制的 一、Comparable …

MySQL性能优化浅析

1. 硬件 1.1 CPU IO密集型&#xff0c;提升CPU核心数 计算密集型&#xff0c;提升CPU频率 1.2 磁盘 机械硬盘在随机访问时&#xff0c;由于受磁针移动速度的限制&#xff0c;性能会大幅降低。使用固态硬盘可以大幅提升随机访问的能力。按需选择。 1.3 其他 带宽、内存频…

Superset 安装配置

文章目录Superset 安装配置一、Superset 概述1. Superset简介2. 功能概述3. 支持的数据库二、Superset 环境部署步骤三、创建虚拟机&#xff0c;安装CentOS1.下载CentOS2.创建虚拟机3.编辑虚拟机设置4.安装centos7.9mini版本5.启动centos&#xff0c;并进行登录四、CentOS配置1…

小米(Android)刷NetHunter安装指南

一、安装NetHunter 前提&#xff1a;确保手机已经root&#xff0c;已装上magisk。如果没有root&#xff0c;可用尝试magisk root 后执行此文 1、下载Nethunter&#xff1a;Get Kali | Kali Linux 然后push 到sdcard 里&#xff0c; 2、打开magisk&#xff0c;选择刚刚下好的…

Windows下安装libtorch与Clion配置

Windows 安装和使用libtorch 1.下载libtorch libtorch的下载链接&#xff0c;如下图所示&#xff0c;libtorch有release和debug版本可以选择。为了方便调试&#xff0c;下debug版。电脑上没CUDA&#xff0c;下次有需要再更新吧。 2.libtorch使用 在Visual Studio的使用可以参…

云原生周刊 | 让 ChatGPT 以电子邮件的方式来解释 KubeSphere

过去的一周是 ChatGPT 的狂欢&#xff0c;我猜每一位云原生玩家都很好奇他是如何看待 Kubernetes 的。咱们不防换个方式来提问&#xff0c;让它使用电子邮件的方式来向别人推荐 KubeSphere 和 OpenFunction。 开源项目推荐 Tailscale Ingress Controller 这是针对 Tailscale …

【DevOps实战系列】第七章:详解Docker私服Harbor篇

个人亲自录制全套DevOps系列实战教程 &#xff1a;手把手教你玩转DevOps全栈技术 Harbor私服搭建 讲完Nexus3再来看下harbor&#xff0c;其实大同小异&#xff0c;只不过harbor的管理要比Nexus3更专业、功能更完善&#xff0c;大家按需选择即可&#xff0c;Nexus的优势是他能和…

web网站工程项目前期需求分析与规划怎么写?

在当下&#xff0c;判断一份网站工程项目文档是否优秀&#xff0c;项目目录是最直接的体现&#xff0c;同时&#xff0c;工程说明、需求分析和项目规划各版块的内容都缺一不可。工欲善其事必先利其器&#xff0c;前期准备得越充分&#xff0c;后期就会越顺利。 本期&#xff0c…

Centos7安装图形化界面并使用Windows远程桌面连接(包含离线部署)

一、在centos7 中部署远程桌面所使用的程序 1、关闭防火墙和selinux(xrdp是通过3389端口远程桌面连接 ) [rootlocalhost ~]# systemctl stop firewalld #临时关闭防火墙 [rootlocalhost ~]# systemctl disable firewalld.service #永久关闭防火墙 [rootlocalhost ~]# setenf…

SpringMVC:SpringMVC之JSON数据传输参数(5)

JSON数据传输参数1 JSON数据传输参数2 JSON普通数组3 JSON对象数据4 JSON对象数组5 小结1 JSON数据传输参数 现在比较流行的开发方式为异步调用。前后台以异步方式进行交换&#xff0c;传输的数据使用的是JSON,所以前端如果发送的是JSON数据&#xff0c;后端该如何接收? 对于…

面试官:单体架构怎么向分布式微服务架构演变的?(8000字干货)

随着网站规模越来越大&#xff0c;单体应用往往很难再满足要求&#xff0c;就需要向分布式&#xff0c;微服务架构演变。 那么这个演变过程是怎么样的呢&#xff1f;都涉及到哪些组件&#xff0c;会遇到哪些问题&#xff0c;以及相应的解决方案都是什么&#xff0c;本篇文章就…