1. tomcat网络整体架构
来自 https://www.cnblogs.com/cuzzz/p/17499364.html
上图是tomcat整个网络请求模型
- Acceptor线程作为监听线程,会通过 accept 方法获取连接,该线程没有使用selector进行多路复用,而是使用了阻塞式的accept
- 有请求连接后,就会把该连接设置为非阻塞的socket,并且加入到队列中交给poller线程进行处理
- poller线程有selector,可以进行多路复用,从队列中获取 需要注册事件的连接
- 对于读事件来说,会先取消关心的读事件(因为JDK的NIO是水平触发模式,如果不进行取消,只要该socket缓冲区的数据没有读取完成,就会一直触发读事件),然后交给业务线程池进行http协议解析,以及servlet业务执行
2. Tomcat读Socket数据原理
tomcat 连接建立时,监听read事件,然后使用fill(boolean block)方法进行socket数据读取。参数block决定读取方式,可以是阻塞模式,也可以是非阻塞模式
- 阻塞模式:如果socket.read()返回0,则再次注册读事件,然后进行第二次读:如果有数据则返回;如果没有数据则通过object.wait等待,如果超时会抛出超时异常
- 非阻塞模式:不管是否读取到数据,直接返回
http1.1协议数据解析
- 解析请求行:非阻塞模式读取
- 解析请求头:非阻塞模式读取
- 请求行和请求头使用非阻塞模式读,会带来一个问题:比如请求行或请求头只发送了一半,非阻塞读的时候没有读取到新数据。tomcat的处理是直接让该连接变成长连接,并重新注册读事件监听,这样就可以不占用线程资源
- 请求体(HttpServletRequest中请求体Stream流方式读取):阻塞模式读取,没有读完就注册读事件,直到读到数据,会占用线程资源(因为会使得该线程阻塞)
怎么判断请求体是否读取完整?
Http1.1协议中使用content-length或者transfer-encoding字段进行判断:
- content-length:整个请求内容的字节数
- transfer-encoding:比如 transfer-encoding: chunked,每个chunk的格式是 chunk长度\r\nchunk数据\r\n,结束标志是 0\r\n\r\n
3. 实现简易的Tomcat NIO网络模型
/**
 * Program entry point: creates a {@code MyTomcat} and starts it.
 *
 * @param args command-line arguments; not read
 * @throws IOException declared by the signature for callers
 */
public static void main(String[] args) throws IOException {
    new MyTomcat().start();
}
/**
 * Wires a {@code Poller} into an {@code Acceptor}, initialises both, and then
 * launches one named thread for each of them.
 */
void start() {
    Poller poller = new Poller();
    Acceptor acceptor = new Acceptor();
    acceptor.poller = poller;

    // Initialise both components before any thread is started.
    acceptor.start();
    poller.start();

    // The acceptor thread is launched first, then the poller thread.
    new Thread(acceptor, "acceptor").start();
    new Thread(poller, "poller").start();
}
/**
 * Listener task: accepts connections with a blocking {@code accept()}, switches each
 * accepted socket to non-blocking mode and hands it to the poller for registration.
 */
class Acceptor implements Runnable {
    ServerSocketChannel serverSocketChannel = null;
    Poller poller = null;

    /**
     * Opens the listening channel in blocking mode and binds it to 0.0.0.0:9898.
     * If opening or binding fails the stack trace is printed and the channel is
     * closed, so that {@link #run()} exits instead of accepting on an unbound channel.
     */
    public void start() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(true);
            serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 9898));
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(serverSocketChannel);
        }
    }

    /**
     * Accept loop. Runs for as long as the listening channel is open; every accepted
     * connection is wrapped in a {@code SocketWrapper} and queued on the poller.
     */
    @Override
    public void run() {
        if (serverSocketChannel == null) {
            // start() could not even open the channel; nothing to accept on.
            return;
        }
        // Stop once the channel is closed rather than spinning on ClosedChannelException.
        while (serverSocketChannel.isOpen()) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                SocketWrapper socketWrapper = new SocketWrapper();
                socketWrapper.socketChannel = socketChannel;
                socketWrapper.poller = poller;
                poller.register(socketWrapper);
            } catch (IOException e) {
                e.printStackTrace();
                // Do not leak a socket that was accepted but could not be set up.
                closeQuietly(socketChannel);
            }
        }
    }

    /** Closes the given channel if it is non-null, printing (not propagating) any failure. */
    private void closeQuietly(java.nio.channels.Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
/**
 * Selector task. Drains a queue of {@code Event}s (new registrations and interest-ops
 * updates), then handles the keys reported ready by the selector: a readable socket
 * either wakes the worker that is parked in a blocking read on it, or is dispatched to
 * the worker pool for HTTP processing.
 */
class Poller implements Runnable {
    Selector selector = null;
    Executor executor = Executors.newFixedThreadPool(50);
    LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    /**
     * Queues a registration event for the given socket (the event's default ops value
     * is the registration marker).
     *
     * @param socketWrapper the connection to register for read events
     */
    public void register(SocketWrapper socketWrapper) {
        Event event = new Event();
        event.socketWrapper = socketWrapper;
        addEvent(event);
    }

    /**
     * Enqueues an event and wakes the selector so that it is processed promptly.
     *
     * @param event the event to process on the poller thread
     */
    public void addEvent(Event event) {
        queue.add(event);
        selector.wakeup();
    }

    /** Opens the selector; a failure is printed and leaves {@code selector} null. */
    public void start() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Poller loop: select, apply queued events, then handle the selected keys. */
    @Override
    public void run() {
        while (true) {
            try {
                int keyCount = selector.select();
                processEvents();
                if (keyCount > 0) {
                    System.out.println(keyCount);
                    processSelectedKeys();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Applies every queued event. A failure on one event is printed and does not stop
     * the remaining events (or the selected keys of this iteration) from being handled.
     */
    private void processEvents() {
        Event event;
        while ((event = queue.poll()) != null) {
            SocketWrapper socketWrapper = event.socketWrapper;
            SocketChannel socketChannel = socketWrapper.socketChannel;
            try {
                if (event.ops == event.REGISTER) {
                    // (Re-)register for reads with the wrapper as attachment.
                    socketChannel.register(selector, SelectionKey.OP_READ, socketWrapper);
                    continue;
                }
                SelectionKey selectionKey = socketChannel.keyFor(selector);
                if (selectionKey == null) {
                    socketChannel.close();
                } else if (selectionKey.attachment() == null) {
                    selectionKey.cancel();
                } else {
                    try {
                        // Add the requested ops to whatever the key is already interested in.
                        selectionKey.interestOps(event.ops | selectionKey.interestOps());
                    } catch (CancelledKeyException ckx) {
                        selectionKey.cancel();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles the keys selected by the last {@code select()}. Read interest is removed
     * before the socket is handed off, so the selector does not report the same
     * unread data again while a worker is dealing with it.
     */
    private void processSelectedKeys() {
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            iterator.remove();
            try {
                if (!selectionKey.isReadable()) {
                    continue;
                }
                final SocketWrapper socketWrapper = (SocketWrapper) selectionKey.attachment();
                selectionKey.interestOps(selectionKey.interestOps() & (~selectionKey.readyOps()));
                if (socketWrapper.readBlocking) {
                    // A worker is already parked in a blocking read on this socket:
                    // wake it instead of starting a second processor mid-request.
                    synchronized (socketWrapper.object) {
                        socketWrapper.readBlocking = false;
                        socketWrapper.object.notify();
                    }
                } else {
                    executor.execute(() -> {
                        Http11Processor http11Processor = new Http11Processor();
                        http11Processor.process(socketWrapper);
                    });
                }
            } catch (CancelledKeyException ckx) {
                // The channel was closed concurrently; drop the key and carry on.
                selectionKey.cancel();
            }
        }
    }
}
/**
 * Per-connection state: the channel, the poller it is registered with, and the
 * monitor/flag pair used to park a worker thread until the socket becomes readable.
 */
class SocketWrapper {
    private SocketChannel socketChannel;
    private Poller poller;
    /** Monitor a blocked reader waits on. */
    private final Object object = new Object();
    /** True while a reader has asked the poller for a read notification. */
    private volatile boolean readBlocking = false;
    /** Read timeout in milliseconds applied to each blocking read call. */
    private long timeout = TimeUnit.SECONDS.toMillis(20);

    /**
     * Reads from the socket into {@code byteBuffer}.
     *
     * <p>Non-blocking mode performs a single channel read and returns its result.
     * Blocking mode repeats the read until at least one byte arrives; whenever a read
     * yields nothing it registers with the poller and waits on the monitor.
     *
     * @param isBlock    whether to wait for data when none is available
     * @param byteBuffer destination buffer
     * @return the number of bytes read; 0 if an I/O error or an interrupt was caught
     * @throws RuntimeException if the peer closed the connection or the blocking read
     *                          exceeded the timeout
     */
    private int read(boolean isBlock, ByteBuffer byteBuffer) {
        int len = 0;
        try {
            if (!isBlock) {
                return socketChannel.read(byteBuffer);
            }
            // Per-call budget: the configured timeout field must not shrink across calls.
            long remaining = timeout;
            long startNanos = 0;
            do {
                if (startNanos > 0) {
                    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                    if (elapsedMillis == 0) {
                        // Always make progress towards the deadline.
                        elapsedMillis = 1;
                    }
                    remaining -= elapsedMillis;
                    if (remaining <= 0) {
                        // No reader is waiting any more once we give up.
                        readBlocking = false;
                        throw new RuntimeException("读取超时");
                    }
                }
                len = socketChannel.read(byteBuffer);
                if (len == -1) {
                    throw new RuntimeException("客户端断开连接");
                } else if (len == 0) {
                    if (!readBlocking) {
                        readBlocking = true;
                        poller.register(this);
                    }
                    synchronized (object) {
                        // Re-check under the lock: the flag may already have been cleared.
                        if (readBlocking) {
                            if (remaining > 0) {
                                startNanos = System.nanoTime();
                                object.wait(remaining);
                            } else {
                                object.wait();
                            }
                        }
                    }
                }
            } while (len == 0);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the owner of this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return len;
    }
}
/**
 * A unit of work queued for the poller thread: either a registration request or a set
 * of interest ops to add for the wrapped socket.
 */
class Event {
    /** Marker value of {@code ops} meaning "register this socket"; shared by all events. */
    private static final int REGISTER = 0x100;
    /** Requested operation; defaults to {@link #REGISTER}. */
    private int ops = REGISTER;
    /** The connection this event applies to. */
    private SocketWrapper socketWrapper;
}
/**
 * Minimal HTTP/1.1 request handler: parses the request line and headers from a socket,
 * prints them, and answers with a fixed HTML body.
 */
class Http11Processor {

    /**
     * Handles one request on the given connection. If parsing or writing fails with a
     * runtime exception the channel is closed before the exception is rethrown, so a
     * broken connection is not left open.
     *
     * @param socketWrapper the connection to read the request from and respond on
     */
    void process(SocketWrapper socketWrapper) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
        Request request = new Request();
        try {
            // Parse the request line.
            parseRequestLine(request, socketWrapper, byteBuffer);
            // Parse the request headers.
            parseRequestHeaders(request, socketWrapper, byteBuffer);
            System.out.println(request.method);
            System.out.println(request.uri);
            System.out.println(request.protocol);
            System.out.println(request.headers);
            // Write the response.
            Response response = new Response();
            response.socketWrapper = socketWrapper;
            response.write(("<h1>hello world</h1>").getBytes(StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            try {
                socketWrapper.socketChannel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Reads header lines up to the blank line that ends the header section and stores
     * them in {@code request.headers}. Surrounding whitespace is trimmed from values.
     */
    private void parseRequestHeaders(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
        StringBuilder current = new StringBuilder();
        String headerName = null;
        while (true) {
            byte b = nextByte(socketWrapper, byteBuffer);
            if (headerName == null && b == ':') {
                // First colon on the line separates the name from the value.
                headerName = current.toString();
                current = new StringBuilder();
            } else if (b == '\r') {
                // Carriage returns are skipped; the line feed terminates the line.
            } else if (b == '\n') {
                if (headerName == null) {
                    // A line without a name ends the header section.
                    return;
                }
                request.headers.put(headerName, current.toString().trim());
                headerName = null;
                current = new StringBuilder();
            } else {
                // Mask to avoid sign extension of bytes >= 0x80.
                current.append((char) (b & 0xFF));
            }
        }
    }

    /**
     * Reads the request line and fills {@code method}, {@code uri} and {@code protocol}.
     * The buffer is expected to be empty on entry; it is filled from the socket first.
     */
    private void parseRequestLine(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
        // Read, then parse.
        fill(socketWrapper, byteBuffer);
        request.method = readToken(socketWrapper, byteBuffer, true);
        request.uri = readToken(socketWrapper, byteBuffer, true);
        request.protocol = readToken(socketWrapper, byteBuffer, false);
    }

    /**
     * Collects characters up to the next terminator, skipping carriage returns.
     *
     * @param stopAtSpace when true a space ends the token and a line feed ends it with
     *                    no value ({@code null}); when false only a line feed ends it
     * @return the token, or {@code null} if a space-terminated token hit a line feed
     */
    private String readToken(SocketWrapper socketWrapper, ByteBuffer byteBuffer, boolean stopAtSpace) {
        StringBuilder token = new StringBuilder();
        while (true) {
            char c = (char) (nextByte(socketWrapper, byteBuffer) & 0xFF);
            if (c == '\n') {
                return stopAtSpace ? null : token.toString();
            }
            if (stopAtSpace && c == ' ') {
                return token.toString();
            }
            if (c != '\r') {
                token.append(c);
            }
        }
    }

    /** Returns the next unread byte, refilling the buffer from the socket when it is drained. */
    private byte nextByte(SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
        if (!byteBuffer.hasRemaining()) {
            fill(socketWrapper, byteBuffer);
        }
        return byteBuffer.get();
    }

    /**
     * Replaces the buffer content with freshly read bytes and flips it for reading, so
     * the following {@code get()} calls see the new data rather than unwritten space.
     *
     * @throws IllegalStateException if the blocking read produced no data
     */
    private void fill(SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
        byteBuffer.clear();
        int read = socketWrapper.read(true, byteBuffer);
        byteBuffer.flip();
        if (read <= 0) {
            throw new IllegalStateException("no data read from socket");
        }
    }
}
/**
 * Mutable holder for the parts of an HTTP request collected during parsing.
 */
class Request {
    /** Header name to header value, in no particular order. */
    private final Map<String, Object> headers = new HashMap<>();

    /** First token of the request line. */
    private String method;
    /** Second token of the request line. */
    private String uri;
    /** Remainder of the request line. */
    private String protocol;

    /** Declared body length; not assigned anywhere in this class. */
    private int contentLength;
}
/**
 * Writes a fixed-shape HTTP/1.1 200 response (status line, two headers, body) to the
 * connection and then asks the poller to watch the socket for reads again.
 */
class Response {
    SocketWrapper socketWrapper = null;
    /** Capacity in bytes of the buffer holding the status line and headers. */
    private final int maxHeaderSize = 1024 * 2;
    private ByteBuffer byteBuffer = null;

    /**
     * Sends the response head followed by {@code bytes} as the body.
     *
     * <p>Both parts are written in a loop until the buffer is drained, because a single
     * {@code write} on the channel may accept only part of the data. On an I/O error
     * the stack trace is printed and the channel is closed.
     *
     * @param bytes the response body; its length is sent as Content-Length
     */
    public void write(byte[] bytes) {
        System.out.println("write enter");
        byteBuffer = ByteBuffer.allocate(maxHeaderSize);
        // Status line.
        byteBuffer.put("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8));
        // Response headers.
        byteBuffer.put("Content-Type: text/html;charset=utf-8\r\n".getBytes(StandardCharsets.UTF_8));
        // content-length
        byteBuffer.put(("Content-Length: " + bytes.length + "\r\n").getBytes(StandardCharsets.UTF_8));
        // Blank line separating headers from body.
        byteBuffer.put("\r\n".getBytes(StandardCharsets.UTF_8));
        byteBuffer.flip();

        SocketChannel socketChannel = socketWrapper.socketChannel;
        try {
            // Drain the whole head; one write call may be partial.
            int write = 0;
            while (byteBuffer.hasRemaining()) {
                write += socketChannel.write(byteBuffer);
            }
            System.out.println("http header write:" + write);
            // Response body.
            ByteBuffer picByteBuffer = ByteBuffer.wrap(bytes);
            while (picByteBuffer.hasRemaining()) {
                int write1 = socketChannel.write(picByteBuffer);
                System.out.println("picByteBuffer position:" + picByteBuffer.position() + " limit:" + picByteBuffer.limit() + " leave:" + (picByteBuffer.limit() - picByteBuffer.position()) + " http body write:" + write1 + " content-length:" + bytes.length);
            }
            // Re-enable read interest so the next request on this connection is seen.
            Event event = new Event();
            event.ops = SelectionKey.OP_READ;
            event.socketWrapper = socketWrapper;
            socketWrapper.poller.addEvent(event);
        } catch (IOException e) {
            e.printStackTrace();
            // The connection is unusable and is not re-armed; close it instead of leaking it.
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}