Netty的本质:如何提升NIO服务端的工作效率?
我们都知道,Netty 是以高性能著称的,而高性能就体现在以 Netty 构建的服务端和客户端处理数据的效率之上。那 Netty 为何能做到如此高的性能,并被广泛应用的呢?这就要从 Netty 的本质说起了。
Netty 是一个建立在 NIO 模式之上的框架,它对 NIO 模式做了一层封装,并在其之上扩展了许多功能。因此,Netty 的高性能和 NIO 模式处理数据的效率息息相关。
所以这一节课,我们就从最基础、最本质的 NIO 模式讲起,探讨它高性能的原因,并且以 NIO 模式构建的服务端为例子,看看能采用哪些方式,进一步提高服务端的工作效率。
# NIO 是什么?
我们的课程以 NIO 为起点,但是在讲解 NIO 之前,我仍然想啰嗦几句,简单解释一下 IO 的概念。所谓 IO,就是 Input 和 Output 。一个字符从键盘输入到计算机内部,再由计算机发送给显示器,最终被我们看到,整个过程就是计算机的 IO 在做工作。 键盘就是输入设备,显示器就是输出设备。
和磁盘打交道也是如此,从磁盘加载文件到内存进行的就是 Input 操作,而将内存的数据保存在磁盘内进行的就是 Output 操作。这些操作从概念上都可以称为本地 IO 。当然,有本地 IO 就会有网络 IO 。两台计算机要进行信息交互,接受对方发送过来的数据,或者向对方发送一条消息,这就是网络 IO。
而 NIO 就是在计算机进行网络 IO 时,接收和发送数据的一种方式。它是一种同步非阻塞的 I/O 模型 ,是在 JDK 1.4 版本被提出来的。
# NIO 为什么诞生?
最开始,在进行网络 IO 的过程中,是以 BIO(阻塞 IO 模型),也就是以流的形式,一次一个字节地接收或发送数据。比如服务端接收客户端的连接时,服务端每接收到一个连接就要创建一个线程,并且该线程只对该连接负责。如果该线程管理的客户端连接迟迟没有再发送数据过来,那么该线程就会一直阻塞,直到有数据到来,才会继续开始工作。
比如,当服务端要接收来自 4 个客户端的连接时,就要创建4个线程,如果之后这些连接再没发送数据过来,这 4 个线程就会阻塞。连接客户端较少的情况下并不会有处理压力,只是有些浪费线程资源,但如果来自客户端的连接不断增加,达到成千上万个,再按照 BIO 模式接收网络数据就太荒谬了。
因为线程并不是凭空存在的,每一个线程都会占用一定的内存,而且线程的切换会耗费大量时间,甚至切换线程的时间会超过我们程序执行的时间。另外在 Java 中,线程的创建和销毁实际上都是操作系统帮我们完成的,频繁创建和销毁线程也很耗时。最后,也是最不能容忍的,如果线程管理的客户端连接没有数据到来,那么大面积的线程都将阻塞,资源会被白白浪费了。
由此可见,在处理器核数一定的情况下,BIO 实际上只适用于客户端连接比较少的情况,并不适用于高并发场景。那么,在处理器核数一定的情况下,如何创建较少的线程来管理众多客户端连接呢?这就轮到 NIO的多路复用机制登场了。
# NIO 中的多路复用机制
和 BIO 有所区别,NIO 并不是以流的方式来接收和发送数据的,它用的是 Buffer 缓冲区。也就是说,在 NIO 中,要操作的数据全部先放入缓冲区,再由缓冲区向外发送,这显然比一次一个字节处理数据的流要高效很多。
那么,NIO 的 IO 多路复用机制又是什么,如何利用它实现以较少的线程管理众多客户端连接呢?
所谓多路复用,说到底就一句话,就是用一个线程管理多个连接,管理多个连接的数据收发 。当然, 多路复用 也可以理解为 是用 selector 管理多个 channel ,处理每个 channel 到来的 IO 事件。 这两种解释其实只是外在和内在的区别,因为从表面上看,确实是一个线程管理了多个客户端连接,但深入到内部,实际上是 selector 管理了多个客户端 channel,每当有 IO 事件到来,就交给该线程去处理。多路复用最终的效果就是,服务器再也不必每接收一个客户端连接就创建一个线程了。
为了加深对 IO 多路复用的理解,我画了一张简单的示意图,大家可以看看。

当然,IO 多路复用并不是 NIO 独有的,该机制是在操作系统中实现的,NIO 只是使用了该机制,在处理数据时发挥出了强大的效果。下面,我们就从代码中看看 IO 多路复用的好处是怎么体现出来的。
下面是 NIO 中最基础的两段代码,从中可以看出服务端和客户端的工作原理。说起来确实很简单,不管是服务端还是客户端,都要创建一个 selector,然后将客户端 channel 和服务端的 channel 注册到各自的 sleector 上,并设置感兴趣的事件。当有事件到来的时候,就可以从 selector 中取出,再作出相应的处理即可。
public class SimpleClient {
public static void main(String[] args) throws IOException, InterruptedException {
Logger logger = LoggerFactory.getLogger(SimpleClient.class);
//得到客户端的channel
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//得到selector
Selector selector = Selector.open();
//把客户端的channel注册到selector上
SelectionKey selectionKey = socketChannel.register(selector, 0);
//设置事件
selectionKey.interestOps(SelectionKey.OP_CONNECT);
//客户端的channel去连接服务器
socketChannel.connect(new InetSocketAddress(8080));
//开始轮询事件
while (true) {
//无事件则阻塞
selector.select();
//得到事件的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
//如果是连接成功事件
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
socketChannel.register(selector,SelectionKey.OP_READ);
logger.info("已经注册了读事件!");
//紧接着向服务端发送一条消息
socketChannel.write(ByteBuffer.wrap("客户端发送成功了".getBytes()));
}
}
//如果是读事件
if (key.isReadable()) {
SocketChannel channel = (SocketChannel)key.channel();
//分配字节缓冲区来接受服务端传过来的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
//向buffer写入客户端传来的数据
int len = channel.read(buffer);
byte[] readByte = new byte[len];
buffer.flip();
buffer.get(readByte);
logger.info("读到来自服务端的数据:" + new String(readByte));
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class SimpleServer {
public static void main(String[] args) throws IOException {
Logger logger = LoggerFactory.getLogger(SimpleServer.class);
//创建服务端channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置channel非阻塞
serverSocketChannel.configureBlocking(false);
//获得selector
Selector selector = Selector.open();
//把channel注册到selector上,现在还没有给key设置感兴趣的事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
//给key设置感兴趣的事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8080));
//然后开始接受连接,处理事件,整个处理都在一个死循环之中
while (true) {
//当没有事件到来的时候,这里是阻塞的,有事件的时候会自动运行
selector.select();
//如果有事件到来,这里可以得到注册到该selector上的所有的key,每一个key上都有一个channel
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//得到集合的迭代器
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//得到每一个key
SelectionKey key = keyIterator.next();
//首先要从集合中把key删除,否则会一直报告该key
keyIterator.remove();
//接下来就要处理事件,判断selector轮询到的是什么事件,并根据事件作出回应
//如果是连接事件
if (key.isAcceptable()) {
//得到服务端的channel,这里有两种方式获得服务端的channel,一种是直接获得,一种是通过attachment获得
//因为之前把服务端channel注册到selector上时,同时把serverSocketChannel放进去了
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
//ServerSocketChannel attachment = (ServerSocketChannel)key.attachment();
//得到客户端的channel
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
//接下来就要管理客户端的channel了,和服务端的channel的做法相同,客户端的channel也应该被注册到selector上
//通过一次次的轮询来接受并处理channel上的相关事件
//把客户端的channel注册到之前已经创建好的selector上
SelectionKey socketChannelKey = socketChannel.register(selector, 0, socketChannel);
//给客户端的channel设置可读事件
socketChannelKey.interestOps(SelectionKey.OP_READ);
logger.info("客户端连接成功!");
//连接成功之后,用客户端的channel写回一条消息
socketChannel.write(ByteBuffer.wrap("我发送成功了".getBytes()));
logger.info("向客户端发送数据成功!");
}
//如果接受到的为可读事件,说明要用客户端的channel来处理
if (key.isReadable()) {
//同样有两种方式得到客户端的channel,这里只列出一种
SocketChannel channel = (SocketChannel)key.channel();
//分配字节缓冲区来接受客户端传过来的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
//向buffer写入客户端传来的数据
int len = channel.read(buffer);
logger.info("读到的字节数:" + len);
if (len == -1) {
channel.close();
break;
}else{
//切换buffer的读模式
buffer.flip();
logger.info(Charset.defaultCharset().decode(buffer).toString());
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
在我们上面给出的 SimpleServer 代码中,服务端接收连接的方式是不是已经和多路复用对应了呢?服务端只有一个 main 函数的线程在工作,该线程持有一个 selector,接收到的客户端连接全都注册到该 selector上。如果有 IO 事件到来,也是该线程处理。看起来我们的服务端代码已经使用了 NIO 的多路复用,在性能上比 BIO 要好很多了。但情况真的是这样吗?
如果服务器的处理器是单核的,在这样的服务器中单线程的执行效率一定高于多线程,因为线程切换会浪费时间。但是,在多核服务器中,将并发的线程数量动态调整在某个范围内,和单线程执行任务比起来,其工作效率一定是遥遥领先的。
但我们的服务端呢?从代码上看,整个服务端只创建了一个线程,如果我们的处理器是多核的,只创建一个线程完全发挥不出多核处理器的优势,我们完全可以创建更多的线程,让更多线程来工作,管理更多的客户端连接。多个线程同时工作,服务端的工作效率不就大大提高了?但是,我们要怎么增加线程的数量呢?难道要像 BIO 那样,收到一个客户端连接就创建一个线程?
别忘了,我们刚刚学习了 IO 多路复用,可以直接在 NIO 的基础上增加线程数量,多搞几个可以多路复用的线程。比如服务端一下子涌入 50 个客户端连接,那我们就搞两个多路复用的线程,每个线程处理 25 个连接。当连接没有事件的时候,线程也会阻塞,但阻塞也仅仅是阻塞两个线程。如果我们服务器的 CPU 是双核的,我们这么做的效率绝对会比在服务端创建 50 个 BIO 线程高很多,我们甚至可以搞 4 个多路复用线程。
接下来,就让我们顺着这个思路,动手修改一下代码。
# 如何为服务端添加新的线程?
我们之前一直在重复一句话,在 NIO 中可以一个线程处理多个连接,那具体是怎么处理的?
实际上,我们可以从 bind 方法的位置将 SimpleServe 的代码分为两部分,服务端 channel 和端口号绑定之后,程序就进入了 while 循环,while 循环中就是线程具体处理事件的操作。你会看到这个线程即会处理 selector 接收到的 Accept 事件,把新的客户端 channel 注册到 selector 上,也会处理 Read 事件,接收客户端发来的消息。
如果现在服务端接收到了20个连接,那只用一个线程处理就太浪费我的双核 CPU 了,于是我想再创建一个新的线程来分担 10 个连接。那我该从哪个地方入手,把新的线程添加到程序中?
首先,我们要创建的新的线程,必须持有一个新的 selector,这毋庸置疑。试想一下,如果我们的新线程仍然持有旧的 selector 会发生什么?所有连接仍然会集中在一个 selector 上,而多个线程去处理一个 selector 上的事件,很容易发生并发问题。所以,新的线程一定需要持有一个新的 selector。但我过去的 Java 编程经验告诉我,线程的创建并不由我操控,更别说向线程中添加属性,因此我们需要对线程包装一下。就像 Java 线程池中对线程的包装。对于要创建的线程,我们可以写成这样。
public class Work implements Runnable{
private Thread thread;
public Work() {
thread = new Thread(this);
}
public void start() {
thread.start();
}
@Override
public void run() {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
接下来,我们要讨论另外一个至关重要的东西,ServerSocketChannel。请大家先想一个问题,我们需要把 ServerSocketChannel 也放进我们的 Work 对象中,以便让新线程使用吗?由此也就引申出了真正的问题,我们创建的新线程,需要使用服务端 channel 处理 selector 上的连接事件吗?如果你也感到疑惑,那我给准备了两段代码,你可以看一看。
public class Work implements Runnable{
private static final Logger logger = LoggerFactory.getLogger(Work.class);
private ServerSocketChannel serverSocketChannel;
private Selector selector = Selector.open();;
private Thread thread;
private SelectionKey selectionKey;
public Work(ServerSocketChannel serverSocketChannel) throws IOException {
this.serverSocketChannel = serverSocketChannel;
selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
thread = new Thread(this);
}
public void start() {
thread.start();
}
@Override
public void run() {
while (true) {
logger.info("新线程阻塞在这里吧。。。。。。。");
try {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
logger.info("客户端channel连接新线程成功了:{}",socketChannel.toString());
socketChannel.write(ByteBuffer.wrap("我还不是netty,但我知道你上线了".getBytes()));
logger.info("服务器新线程发送消息成功!");
}
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = channel.read(byteBuffer);
if (len == -1) {
logger.info("客户端通道要关闭!");
channel.close();
break;
}
byte[] bytes = new byte[len];
byteBuffer.flip();
byteBuffer.get(bytes);
logger.info("收到客户端发送的数据:{}",new String(bytes));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class TestServer {
private static final Logger logger = LoggerFactory.getLogger(TestServer.class);
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//创建新的线程
Work work = new Work(serverSocketChannel);
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(8080));
//启动线程
work.start();
while (true) {
logger.info("main函数阻塞在这里吧。。。。。。。");
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
//接下来就要管理客户端的channel了,和服务端的channel的做法相同,客户端的channel也应该被注册到selector上
//通过一次次的轮询来接受并处理channel上的相关事件
//把客户端的channel注册到之前已经创建好的selector上
SelectionKey socketChannelKey = socketChannel.register(selector, 0, socketChannel);
//给客户端的channel设置可读事件
socketChannelKey.interestOps(SelectionKey.OP_READ);
logger.info("客户端在main函数中连接成功!");
//连接成功之后,用客户端的channel写回一条消息
socketChannel.write(ByteBuffer.wrap("我发送成功了".getBytes()));
logger.info("main函数服务器向客户端发送数据成功!");
}
if (key.isReadable()) {
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
logger.info("读到的字节数:" + len);
if (len == -1) {
channel.close();
break;
}
//切换buffer的读模式
buffer.flip();
logger.info(Charset.defaultCharset().decode(buffer).toString());
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
可以看到,我们在 TestServer 中创建了新的线程,并且在服务端 channel 和端口号绑定之后启动了新的线程,这时候向下运行,不管是新线程还是 main 函数的线程,都会阻塞在 selector.select() 方法上,就像下面这样。
[main] INFO nio.server.doserver.ServerBootstrap - main函数阻塞在这里吧。。。。。。。
[Thread-0] INFO nio.server.doserver.ServerBootstrap - 新线程阻塞在这里吧。。。。。。。
2
接着我们启动刚才的 SimpleClient,很快就会发现服务端报了一个空指针异常。
[main] INFO nio.server.doserver.ServerBootstrap - 客户端在main函数中连接成功!
Exception in thread "Thread-0" java.lang.NullPointerException
at nio.server.Work.run(Work.java:53)
at java.lang.Thread.run(Thread.java:750)
[main] INFO nio.server.doserver.ServerBootstrap - main函数服务器向客户端发送数据成功!
[main] INFO nio.server.doserver.ServerBootstrap - main函数阻塞在这里吧。。。。。。。
2
3
4
5
6
在 Work 类中的第 53 行,也就是这行代码:socketChannel.configureBlocking(false),socketChannel 为null,那么这行代码的前面是什么呢?是这两行代码。
ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
SocketChannel socketChannel = channel.accept();
2
到这里,我相信刚才那个问题的答案已经相当明朗了,新的线程需要持有服务端 channel 吗?当然不需要。只启动一个客户端去连接服务端,而服务端有两个线程,每个线程都持有一个 selector ,所以客户端连接进来的时候每个 selector 都可以接收到连接事件, 但是真正接收到的客户端连接只有一个,被其中一个线程处理并接收了,另一个线程得到的当然为 null 。
好的,我们又解决了一个问题。可以继续谈下一个问题了,既然新创建的线程不需要服务端 channel,也就意味着该线程不必处理 selector 上的 Accept 事件,因为只有服务端的 channel 才能处理 Accept 事件。这样 的话, 我们新创建的线程能处理的只有 read 和 write 事件了 ( 这只是服务端接收到的客户端的 channel 所能处理的事件,并不是 SimpleClient 中的客户端 channel )。现在我们暂且只关心 read 事件,这样一来,我们的目标就很明确了。新创建的线程持有一个新的 selector,当有新的客户端连接被接收后,该客户端的 channel 将被注册到新线程持有的 selector 上,每当 selector 有读事件到来,就由新的线程来处理。
很好,接下来我们就可以着手修改代码了。但在这之前,我们还有最后两个问题需要解决,访问服务端的客户端 channel 该由谁来接收,接收到的客户端 channel 又该由谁注册到新线程的 selector 上?第一个问题很容易回答,继续让主线程接收客户端的连接就行了,就像 SimpleServer 中那样,在 main 函数中接收客户端连接,创建客户端 channel。
那第二个问题呢?如果仍由主线程把创建好的客户端 channel 注册到新线程的 selector 上,就必须在主线程中获得该 selector。 那 如果由新创建的线程把客户端 channel 注册到新的selector 上 ,是否可以 呢? 带着这两种思考,我们终于可以开始修改代码了。
# 重写Work类,引出线程池
先看第一种情况,让主线程把客户端 channel 注册到新线程的 selector上。代码没有太多的变化。
public class Work implements Runnable{
private static final Logger logger = LoggerFactory.getLogger(Work.class);
private boolean flags;
private Selector selector = Selector.open();;
private Thread thread;
private SelectionKey selectionKey;
public Work(ServerSocketChannel serverSocketChannel) throws IOException {
thread = new Thread(this);
}
public Selector getSelector() {
return selector;
}
public void setSelector(Selector selector) {
this.selector = selector;
}
public void start() {
if (flags) {
return;
}
flags = true;
thread.start();
}
@Override
public void run() {
while (true) {
logger.info("新线程阻塞在这里吧。。。。。。。");
try {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = channel.read(byteBuffer);
if (len == -1) {
logger.info("客户端通道要关闭!");
channel.close();
break;
}
byte[] bytes = new byte[len];
byteBuffer.flip();
byteBuffer.get(bytes);
logger.info("新线程收到客户端发送的数据:{}",new String(bytes));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class TestServer {
private static final Logger logger = LoggerFactory.getLogger(TestServer.class);
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//创建新的线程
Work work = new Work();
//得到work中的selector
Selector workSelector = work.getSelector();
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(8080));
while (true) {
logger.info("main函数阻塞在这里吧。。。。。。。");
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
//得到客户端的channel
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
//把客户端的channel注册到新线程的selector上,但这时,新的线程还未启动
SelectionKey socketChannelKey = socketChannel.register(workSelector, 0, socketChannel);
//给客户端的channel设置可读事件
socketChannelKey.interestOps(SelectionKey.OP_READ);
//可以启动新的线程了,在while循环中,我们必须保证线程只启动一次
work.start();
logger.info("客户端在main函数中连接成功!");
//连接成功之后,用客户端的channel写回一条消息
socketChannel.write(ByteBuffer.wrap("客户端发送成功了".getBytes()));
logger.info("main函数服务器向客户端发送数据成功!");
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
启动客户端之后,发现确实可以按照我们预想的那样接收连接和数据。连接在主线程中被接收,然后注册到新线程的 selector 上,然后在新线程中接收到了客户端发来的消息。
[main] INFO nio.server.doserver.ServerBootstrap - main函数阻塞在这里吧。。。。。。。
[main] INFO nio.server.doserver.ServerBootstrap - 客户端在main函数中连接成功!
[Thread-0] INFO nio.server.doserver.ServerBootstrap - 新线程阻塞在这里吧。。。。。。。
[main] INFO nio.server.doserver.ServerBootstrap - main函数阻塞在这里吧。。。。。。。
[Thread-0] INFO nio.server.doserver.ServerBootstrap - 新线程收到客户端发送的数据:客户端发送成功了
[Thread-0] INFO nio.server.doserver.ServerBootstrap - 新线程阻塞在这里吧。。。。。。。
2
3
4
5
6
可以看到,在没有新的连接和数据的时候,主线程和新创建的线程都会阻塞。看起来一切都很顺利,但是这么做真的没有弊端吗?
试想一下,如果一瞬间有几百 、 几千个客户端连接涌进来,服务端的主线程必须处理完一个连接,确保它真正注册到新建线程持有的 selector 上,才能继续处理下一个。 如果我比较追求性能,并且坚持把响应速度放在第一位,那在客户端连接特别多的情况下,这种方式无疑会令我苦恼。
那有没有更好的方法呢?如果主线程不必关心是否注册成功,直接把创建成功的客户端 channel 丢给新创建的线程,就能直接处理下一个连接就好了。换言之,就是让新创建的线程自行把主线程接收到的客户端 channel 注册到自己持有的 selector 上 。 这样一来,服务端处理客户端连接的效率就更高了。那么,这种方案究竟可行吗?当然是可行的。接下来,我们看一段改造后的代码。
public class Work implements Runnable{
private static final Logger logger = LoggerFactory.getLogger(Work.class);
private volatile boolean start;
private final SelectorProvider provider;
private Selector selector;
private Thread thread;
private SelectionKey selectionKey;
private SocketChannel socketChannel;
public Work() {
//java中的方法,通过provider不仅可以得到selector,还可以得到ServerSocketChannel和SocketChannel
provider = SelectorProvider.provider();
this.selector = openSecector();
thread = new Thread(this);
}
public void register(SocketChannel socketChannel) {
try {
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false);
//在这里注册有用吗?这里仍然是主线程注册channel到新的selector上
selectionKey = socketChannel.register(selector,SelectionKey.OP_READ);
start();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
/**
* @Author: PP-jessica
* @Description:得到用于多路复用的selector
*/
private Selector openSecector() {
try {
selector = provider.openSelector();
return selector;
} catch (IOException e) {
throw new RuntimeException("failed to open a new selector", e);
}
}
public void start() {
if (start) {
return;
}
start = true;
thread.start();
}
@Override
public void run() {
while (true) {
logger.info("新线程阻塞在这里吧。。。。。。。");
try {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel)selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = channel.read(byteBuffer);
if (len == -1) {
logger.info("客户端通道要关闭!");
channel.close();
break;
}
byte[] bytes = new byte[len];
byteBuffer.flip();
byteBuffer.get(bytes);
logger.info("新线程收到客户端发送的数据:{}",new String(bytes));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class TestServer {
private static final Logger logger = LoggerFactory.getLogger(TestServer.class);
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(8080));
Work work = new Work();
while (true) {
logger.info("main函数阻塞在这里吧。。。。。。。");
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
//得到客户端的channel
SocketChannel socketChannel = channel.accept();
//把客户端的channel注册到新线程的selector上
work.register(socketChannel);
logger.info("客户端在main函数中连接成功!");
//连接成功之后,用客户端的channel写回一条消息
socketChannel.write(ByteBuffer.wrap("我发送成功了".getBytes()));
logger.info("main函数服务器向客户端发送数据成功!");
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
仍然是服务端和 Work 的代码,但这次比之前又有了些不同,启动客户端和服务端后也可以正常运行。但是,这段代码并不是我们想要的结果。可以看到,在服务端调用的 work.register(socketChannel) 这个方法,仍然是由主线程把客户端 channel 注册到新线程持有的 selector 上。那么,我们究竟该怎么办呢?
不要灰心,这时候,也许你的脑海里已经有一个声音对你大喊了:用线程池啊,笨蛋 !把 SocketChannel注册到新线程的 selector 这个步骤封装成一个异步任务,主线程只负责把任务提交给线程池就行! 很可能这个声音会让你已被打开的思路一发不可收拾,因为结合平时的工作经验,你很快就会回过神来。在工作中需要用多线程处理任务时,恐怕没人会在项目中直接 new 出一个个线程,大家都是用线程池来管理线程。因此,我们也急需一个线程池帮我们处理任务,但不要忘记,我们采用的是 NIO 多路复用模式来处理 channel 的 IO 事件,这要求我们只能用一个线程来管理多个连接。其实很简单,我们创建一个只管理一个线程的线程池就好了。
具体该怎么实现呢?我们下节课继续讲解。
# 小 结
好了,这节课就讲到这里,虽然代码块有点多,但我们也能看到大多数方法都是重复的,没有复杂的代码。我们通过分析 BIO 的不足,引出了 NIO 中的多路复用,然后一点点改善服务端的代码,尽可能地去提高它处理客户端连接的效率。
总的来说,这节课的重点只有一个,那就是掌握 NIO 线程的工作模式,并且这套模式会一直沿用到 Netty 中。我们再来复习下:每一个 NIO 线程都持有一个 selector,selector 可以管理多个 channel,每当 selector 接收到各个 channel 的 IO 事件,NIO 线程就会处理该 selector 上的事件。简单来说,整个流程也就这点东西。
当然,这些东西还不足以大大提高我们代码的性能,所以我们需要用到线程池,来进一步提高 NIO 模式下服务端处理客户端连接的效率。而线程池的引入,标志着我们离 Netty 的核心 Reactor 线程模型更进一步了。