目录
- 线程池
- 线程池介绍
- 线程池的参数
- Java线程池ExecutorTester
- 服务器socket编程
- 普通socket编程
- 线程池并行处理客户请求
- Java NIO异步处理客户请求
线程池
线程池介绍
在创建一个线程时存在一定的开销,创建线程的开销相比于一般的算法来说较大。首先需要建立一个调用栈,接着操作系统还需要建立很多数据结构来维护线程的状态等等。为了解决每次都需要临时创建线程带来开销的问题,引入了线程池。
线程池会预先建立好线程,等待任务派发。一个线程池内可能会有几十上百个闲置的线程,当有任务来临时,需要选中一个线程执行任务。执行完毕后释放该线程资源,重新回到闲置状态,等待下一个任务。
在线程池中,通常有一个Blocking Queue用来派发任务。队列通常为空,当有新的任务进队列时,闲置线程开始抢占该任务。当没有闲置线程时,新任务到达Blocking Queue便会开始排队,等待有线程空出来。
线程池的参数
corePoolSize:线程池中初始线程的数量,可能处于等待状态
maimumPoolSize:线程池中最大允许线程数量
keepAliveTime:超出corePoolSize部分线程如果等待这些时间,就会被回收
Java线程池ExecutorTester
可以使用ThreadPoolExecutor创建线程池:
参数包含了corePoolSize,maimumPoolSize,keepAliveTime,时间单元TimeUnit以及一个可执行的workQueue任务队列。
另一种写法是用executors.newFixedThreadPool()创建线程池。这样的写法参数较少,只需要指定一个线程数即可。
ExecutorService executor = Executors.newFixedThreadPool(3);
如此创建的线程池,拥有的线程数量固定为3,如果任务数大于3,那么多余的任务则只有排队等待。
接下来我们一口气给这个线程池派发10个任务试试:
for (int i = 0; i < 10; i++) {
executor.submit(new CodingTask(i));
}
System.out.println("10 tasks dispatched successfully.");
由于线程池参数为3,所以会按3个一组来抢占任务。
第一轮派发完毕后:
可以看到将3个任务派发给了线程们,workQueue中还剩7个任务。
Future<?>可以用来控制任务的执行:
cancel(boolean):可以临时中止执行任务
get():若成功执行则返回null,否则等待
iscancelled():获取任务是否中止
isDone():获取任务是否成功执行
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<?>> taskResults = new LinkedList<>();
for (int i = 0; i < 10; i++) {
taskResults.add(executor.submit(new CodingTask(i)));
}
System.out.println("10 tasks dispatched successfully.");
for (Future<?> taskResult : taskResults) {
taskResult.get();//get等待task结束,一旦结束会返回null
}
服务器socket编程
普通socket编程
我们设想这样一个场景,一个服务器监听在6666端口上。若客户发aaa,则响应Hello aaa。首先我们需要建立一个服务端socket去监听端口号:
ServerSocket serverSocket = new ServerSocket(6666)
然后建立一个客户端socket,用getRemoteSocketAddress()方法获取远端socket端口号:
Socket clientSocket = serverSocket.accept();
System.out.println("Incoming connection from "
+ clientSocket.getRemoteSocketAddress());
最后用clientSoclet提供的getInputStream()方法和getOutputStream()方法实现开头我们所设想的功能:
try(ServerSocket serverSocket = new ServerSocket(6666)){
System.out.println("listening on "+serverSocket.getLocalSocketAddress());
Socket clientSocket = serverSocket.accept();
System.out.println("Incoming connection from "+clientSocket.getRemoteSocketAddress());
try(Scanner input = new Scanner(clientSocket.getInputStream())){
String request = input.nextLine();
System.out.println(String.format("Request from %s: %s",
clientSocket.getRemoteSocketAddress(),
request));
String response = "Hello"+request+".\n";
clientSocket.getOutputStream().write(response.getBytes());
}
}
执行以上代码并在命令行执行:
telnet localhost 6666
输入aaa后,服务端返回消息:
当然,正常的服务器不会在处理完一个请求后就马上切断与客户端的连接。所以,我们需要在把处理请求块放进一个循环中,并且加入终止条件即可。
try(ServerSocket serverSocket = new ServerSocket(6666)){
System.out.println("listening on "+serverSocket.getLocalSocketAddress());
Socket clientSocket = serverSocket.accept();
System.out.println("Incoming connection from "+clientSocket.getRemoteSocketAddress());
try(Scanner input = new Scanner(clientSocket.getInputStream())){
while(true) {
String request = input.nextLine();
if(request.equals("quit")) {
break;
}
System.out.println(String.format("Request from %s: %s",
clientSocket.getRemoteSocketAddress(),
request));
String response = "Hello "+request+".\n";
clientSocket.getOutputStream().write(response.getBytes());
}
}
}
但是此时的服务器只能处理一个客户的请求。当另开一个控制台telnet 6666端口时,由于已经被占用,其他客户无法接受服务。
线程池并行处理客户请求
线程池在前面已经介绍过,它预先建立好了线程,等待任务来临。在线程池中有一个Blocking Queue,任务到来后会进入该队列,当到达队头并且有闲置进程时就会被选中执行。
在Java中,扔进Blocking Queue的是一个个Client Handler,因为每个client会占据一个线程,所以Client Handler即代表待执行的任务。线程调用Client Handle的run()方法来执行该任务。
ExecutorService executor = Executors.newFixedThreadPool(3);
RequestHandler requestHandler = new RequestHandler();
try (ServerSocket serverSocket = new ServerSocket(7777)) {
System.out.println("Listening on "
+ serverSocket.getLocalSocketAddress());
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("Incoming connection from "
+ clientSocket.getRemoteSocketAddress());
executor.submit(
new ClientHandler(clientSocket, requestHandler));
}
}
此时,服务器端就就可以支持多线程工作,因为线程池最大为3,所以最多可以同时开3个命令行访问服务器。当第4个命令行请求服务时,仍可连接成功,但需等待前3个服务中至少有一个服务退出,才可以接受服务。
Java NIO异步处理客户请求
每个线程处理一个客户请求看似十分合理,实则也存在缺点。因为线程池的容量有限,一个几十上百线程的线程池最多也只能服务几十上百个客户,难以实现大容量高吞吐的服务端。
在上一节的服务端线程池中,看似是所有线程都在忙着应付手头上的任务,而无暇顾及新到的任务,然而事实真的如此吗?比如当线程等待用户的输入时,用户的输入可以非常慢,这时线程便会被阻塞,并非在忙碌工作着。
因此,我们是否可以将所有client的input记住,再去检查谁输入完毕,就先去读这个输入并处理。也就是说,我们应该把一个个的request去交给线程处理,而不是把整个client都交给线程去处理。这样就避免了整个线程的等待时间,即将同步处理优化为了异步处理,这就是NIO异步服务器。
在NIO异步服务器中,等待的任务不再进入一个Blocking Queue,而是进入一个Channel列表。列表中有很多的Client,在每个循环开始时,通过一个Selector选择器去选择一个有数据的Channel,接着Request Handle去处理这个有数据的Channel,如此循环。
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8888));
System.out.println("Listening on "
+ serverChannel.getLocalAddress());
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(1024);
RequestHandler requestHandler = new RequestHandler();
while (true) {
int selected = selector.select();
if (selected == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIter = selectedKeys.iterator();
while (keyIter.hasNext()) {
SelectionKey key = keyIter.next();
if (key.isAcceptable()) {
ServerSocketChannel channel =
(ServerSocketChannel) key.channel();
SocketChannel clientChannel = channel.accept();
System.out.println("Incoming connection from "
+ clientChannel.getRemoteAddress());
clientChannel.configureBlocking(false);
clientChannel.register(
selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel channel =
(SocketChannel) key.channel();
channel.read(buffer);
String request = new String(buffer.array()).trim();
buffer.clear();
System.out.println(String.format(
"Request from %s: %s",
channel.getRemoteAddress(),
request));
String response = requestHandler.handle(request);
channel.write(ByteBuffer.wrap(response.getBytes()));
}
keyIter.remove();
}
}