JavaGuide
[v5.0
]、个人Netty
技术文档、满一航老师的Netty
课程、黑马的IO
通信模式课程
Netty1-I/O模式1.1-BIO概述1.2-NIO概述1.3-NIO核心组件BufferChannelSelectorPath、Paths&Files1.4-NIO线程模型单线程模型多线程模型1.5-AIO概述1.6-辨析BIO
、NIO
和AIO
2-Netty2.1-Netty核心组件EventLoopEventLoopGroupByteBufCompositeByteBufUnpooledChannelEmbeddedChannelFuture & PromiseFuturePromiseChannelFutureCloseFutureChannelPipelineChannelHandlerChannelHandlerContextChannelOptionChannelInitializerServerBootstrapBootstrap2.2-Reactor模式2.3-Netty线程模型2.4-服务端启动过程2.5-客户端启动过程2.6-空轮询BUG3-通信方案3.1-黏包半包问题分析解决方案短连接定长解码器行解码器LTC解码器3.2-通信协议Redis通信协议HttpServerCodec自定义协议序列化算法3.3-聊天室设计3.4-连接通道假死断线重连3.5-TCP
三次握手3.6-RPC
框架搭建3.7-select
、poll
和epoll
BIO
概念:Blocking IO
,基于流的同步阻塞式IO
通信模式,一个线程处理一个客户端连接的全部通信事件,用于监听新客户端的接入事件的socket.accept
方法以及监听已连接客户端的可读事件的socket.getInputStream().read(byte[]);
方法都会阻塞当前线程直到有接入事件或者可读事件发生,因此一个线程不能同时处理新客户端的接入事件和已连接客户端的可读事件,一个线程也不能同时处理多个已连接客户端的可读事件,JDK1.4
以前BIO
是Java
网络通信的唯一选择
Java
中的各种IO
流都是BIO
通信模式
服务端通过一个线程循环调用Socket socket=serverSocket.accept()
阻塞等待新客户端的socket
连接接入,新客户端接入时获取到socket
对象后,将socket
封装成任务对象中交给一个单独的线程进行处理,线程获取封装在socket
中的输入流并循环读取处理流中的数据如socket.getInputStream().read(byte[]);
,循环调用socket.getInputStream().read(byte[]);
直到没有数据可以读取然后阻塞在read
方法上,后续只要socket
连接没有断开该连接通道的每次可读事件都在该循环中被处理;要向另一端写出数据只需要调用socket.getOutputStream().write(byte[])
即可
服务端和客户端都可以使用特定的流对socket
中的InputStream
和OutputStream
进行包装以适应文件上传、聊天消息、远程调用等各种场景
客户端发送完数据需要调用socket.shutdownOutput()
通知服务端连接已断开,即使缓冲区byte[]
数据没有读满也无需继续阻塞等待在socket.getInputStream().read(byte[]);
,如果没有调用该方法通知服务端连接已断开,当连接断开时服务端会直接抛出异常,如果对异常处理的不好可能会导致最后一次缓冲区的数据丢失,像文件上传这种场景就可能导致文件数据不完整
客户端与客户端的通信流程:服务端通过new ServerSocket()
指定服务端socket
通信端口创建serverSocket
实例,通过Socket socket=serverSocket.accept()
等待建立客户端连接并获取对应socket
对象,将socket
对象存入集合容器并将socket
封装到任务对象交给单独的线程处理,该线程获取到socket
对象中的输入流读取通信数据,从socket
集合中获取到目标客户端与服务端连接通道对应的socket
对象,通过socket
对象的输出流将消息发送给一个或多个目标客户端,客户端下线了,与服务端的socket
连接断开,服务端收到客户端下线通知或者没有收到客户端下线通知抛出异常时,服务端从集合中移除对应的socket
对象
特点:
程序编写简单
每个socket
连接请求都要创建一个线程专门处理,对服务器性能要求高;
客户端的并发访问突增会导致服务端的线程开销同比例突增,而且这些线程全部是同步阻塞式线程,如果客户端没有新的通信事件负责当前客户端通信事件处理的线程会一直阻塞在read
方法上等待客户端数据无法执行其他任务,不控制客户端数量很容易导致线程栈溢出,因此BIO
只适用于连接数小且固定的场景,早期的服务器使用BIO
一般设计为线程池配合短连接的方案
NIO
概念:
non-blocking IO
,基于通道、缓冲区和选择器的同步非阻塞式IO
通信模式,JDK1.4
引入;NIO
的核心组件包括Seletor
选择器、Channel
通道、Buffer
缓冲区,相关类在java.nio
包下;
NIO
阻塞模式
NIO
阻塞模式下,serverSocketChannel
的accept
方法在没有检测到新客户端连接请求时会阻塞当前线程,socketChannel
的read
方法在没有检测客户端的写数据请求时也会阻塞当前线程
阻塞模式下NIO
也只能像传统BIO
通信模式一样,一个线程负责处理客户端接入请求,一个线程负责一个已连接客户端通道的写数据请求,JVM
中一个线程在32
位和64
位操作系统中默认大小分别为320K
和1M
,连接数过多必然导致虚拟机栈的OOM
,线程太多也会因为频繁上下文切换降低系统性能,非阻塞模式配合线程池只适合短连接场景
NIO
非阻塞模式
NIO
非阻塞模式下,serverSocketChannel
的accept
方法在没有检测到新客户端连接请求时不会阻塞当前线程直接返回null
,socketChannel
的read
方法在没有检测客户端的写数据请求时也不会阻塞当前线程直接返回0
选择器必须配合网络通道的非阻塞模式才能实现多路复用让一个线程不停地轮询管理多个通道上的就绪事件
服务端一个线程创建服务端Socket
通道ServerSocketChannel
以及选择器,将ServerSocketChannel
注册到选择器中并指定监听该通道的接入事件,每当有接入事件发生时通过serverSocketChannel.accept()
方法获取到已建立连接的客户端通道SocketChannel
,将客户端通道注册到同一选择器中并指定监听客户端socket
通道的可读事件,从头到尾只有一个线程监听注册在同一个选择器上的单个ServerSocketChannel
的接入事件和所有SocketChannel
的可读事件,通过选择器的select
方法能获取到所有就绪事件的迭代器,遍历所有就绪事件根据事件类型仍使用当前线程轮询处理每一个事件,迭代器的所有事件遍历处理完成后阻塞在selector.select()
方法上直到有新的接入或者可读事件发生重复上述对事件的处理过程;
客户端通过SocketChannel.open()
方法传参服务端主机地址和端口与服务端建立连接并返回一个非阻塞的SocketChannel
通道,通过ByteBuffer
可以向该通道写出数据,如果客户端需要监听服务端发送过来的数据需要新起一个线程创建选择器,将当前SocketChannel
注册到客户端的选择器中并监听处理该通道上的可读事件即可,必须起新线程的原因是处理用户输入的扫描器和选择器监听可读事件的select
方法都会阻塞当前线程
特点:
适合连接数多、流量低的场景,比如聊天服务器、弹幕系统、服务器间的网络通信。流量低是因为自始至终只有一个线程轮询一个选择器上注册的所有通道的就绪事件并依次串行处理,NIO
要求每个通信事件的操作时间要尽可能短,如果事件的处理事件太长会长时间占用当前线程,降低本次轮询中后续所有事件的处理效率
生产环境的NIO
编程很复杂,非常容易出现BUG
,特别是大型系统的研发维护成本非常高,一般都使用对基于NIO
的高级网络通信框架Netty
来开发网络通信应用
NIO
的非阻塞体现在,非阻塞模式下线程不会在调用socketChannel.read()
方法后socket
连接未断开但无事发生期间死等客户端写数据请求
通道和缓冲区可以双向读写数据,区别于IO
流要么只能读入数据,要么只能写出数据
提供基于直接内存的直接缓冲区,分配效率低,读写效率高,不受垃圾回收器的管理
概念:缓冲区是用于写入和读取数据的数组,提供一组方法让用户方便地访问和操作缓冲区中的数据,缓冲区专门与NIO
通道交互,数据可以从通道读取到缓冲区,也可以从缓冲区写出到通道
将缓冲区数据写入通道实际是直接将缓冲区数据写入网络套接字socket
,通道不直接与socket
交互
Buffer
的子类:ByteBuffer
、CharBuffer
、ShortBuffer
、IntBuffer
、LongBuffer
、FloatBuffer
、DoubleBuffer
,可以通过XxxxBuffer.allocate(int capacity)
来创建一个指定容量的非直接缓冲区对象,可以通过XxxxBuffer.allocateDirect(int capacity)
来创建一个指定容量的直接缓冲区对象;最常用的是抽象类ByteBuffer
的子实现类MappedByteBuffer
、DirectByteBuffer
、HeapByteBuffer
,其他类型的缓冲区很少使用
缓冲区的属性:
capacity
:缓冲区容量
limit
:缓冲区可以被操作数据的容量,写入数据时limit
等于缓冲区容量,读取数据时limit
等于实际写入的字节数
position
:下一个要读取或者写入数据位置的索引,索引从0
开始
mark
:调用buffer.mark()
方法时会将position
的值赋值给mark
reset
:调用buffer.reset()
方法时会将mark
的值赋值给position
直接缓冲区:直接缓冲区基于直接内存实现,直接内存可以同时被操作系统和JVM
读写,能减少一次数据在用户空间和操作系统空间之间的拷贝,IO
性能更高;直接内存的申请和销毁的性能开销很高,而且直接内存不受JVM
管理,在NIO
中使用操作不当容易导致内存泄漏;Netty
采用对象池管理直接缓冲区,降低直接缓冲区的申请频率,使用对象池来申请和销毁直接缓冲区降低内存泄漏的机会;直接内存是本地内存,不会占用JVM
内存,不受GC
的影响,不会因为内存规整和STW
暂停IO
过程,适合缓存生命周期很长的大量数据;
非直接缓冲区:非直接缓冲区基于堆内存实现,无论是向网卡或者磁盘写出数据还是从磁盘或者网络写入数据,都需要先将数据从用户空间即堆内存拷贝到操作系统空间即本地内存再由操作系统写入网卡或者本地磁盘,或者由操作系统从磁盘或者网卡写入到本地内存再拷贝到堆内存;而且受GC
管理,IO
过程会受到内存规整和STW
的影响
特点
每个通道都会发生半包问题,因此缓冲区不能同时被多个通道共用
常用方法
buffer.flip()
:将缓冲区的position
赋值给limit
,将position
赋值为0
将刚写完数据的缓冲区切换为读模式
buffer.clear()
:将position
赋值为0
,将limit
赋值为capacity
将缓冲区切换为写模式
buffer.compact()
:将缓冲区position
到limit
之间的未读取数据按次序移到缓冲区的开头,将position
移到未读数据最后一位的下一位,limit
重置为capacity
,准备从未读数据的最后一位的下一位写入数据
buffer.isDirect()
:判断缓冲区是否为直接缓冲区
buffer.get()
:该方法及其重载方法能读取position
处的单个字节、读取指定索引处的单个字节以及读取指定长度的字节数组,配合mark()
和reset()
方法能重复读取缓冲区的部分数据,而且读取指定索引处的单个字节不会改变position
的值
buffer.put()
:该方法及其重载方法能写入position
处单个字节、将单个字节写入到指定索引处、写入指定长度的字节数组以及将旧缓冲区数据写入到新缓冲区,且将单个字节写入到指定索引不会改变position
的值
buffer.remaining()
:返回position
和limit
之间的元素个数即未读取字节数
buffer.rewind()
:将position
赋值0
,将mark
赋值为-1
,一般用于重复读取缓冲区
buffer.array()
:将缓冲区转换为字节数组
字符串和byteBuffer
相互转换的几种方式
ByteBuffer.wrap(string.getBytes())
:将字符串转换为缓冲区并返回,缓冲区会被切换为读模式
byteBuffer.put(string.getBytes())
:使用操作系统默认编码将字符串写入指定byteBuffer
对象中,缓冲区不会被切换为读模式
StandardCharsets.UTF_8.encode("hello")
:使用指定编码格式将字符串转换为缓冲区并返回,缓冲区会被切换为读模式,注意Charset.defaultCharset()
可以获取当前操作系统的默认字符集
StandardCharsets.UTF_8.decode(byteBuffer).toString()
:使用指定编码格式将缓冲区未读取的所有字节按照指定编码格式转换为字符串,只有处于读模式的byteBuffer
才能调用该方法,写模式下的byteBuffer
调用该方法会得到空串
概念:
文件或者socket
套接字的数据传输通道,通道是双向的可以同时读写数据,而且通道可以由操作系统异步地从缓冲读取数据或者向缓冲写入数据,执行结束通过调用回调方法通知应用进行下一步处理
通道可以通过输入流、输出流或者随机读写流的getChannel()
方法获取;可以通过SocketChannel.open()
指定接收端的主机和IP
获取SocketChannel
;可以通过ServerSocketChannel.open()
绑定本地监听端口获取ServerSocketChannel
;可以通过serverSocketChannel.accept()
获取成功建立连接的SocketChannel
Channel
是一个接口,常用子实现类有负责文件读写的FileChannel
、做UDP
网络通信的DatagramChannel
、做TCP
网络通信的SocketChannel
和ServerSocketChannel
FileChannel
特点:
FileChannel
只能工作在阻塞模式下,不能配合选择器一起使用,只有网络通信通道才能工作在非阻塞模式下搭配选择器一起使用
常用方法
fileChannel.read()
:该方法及其重载方法可以从文件通道中将数据读取到字节缓冲区或者字节缓冲区数组
fileChannel.write()
:该方法及其重载方法可以将字节缓冲区或者字节缓冲区数组写出到文件通道
文件通道可以写入的数据大小是不受限制的,缓冲区的数据可以一次性全部写入文件通道;写入文件通道不是直接写入磁盘,而是写入操作系统缓冲,当文件通道关闭时操作系统才会将缓冲中的数据同步到磁盘文件;可以调用channel.force(true)
强制立刻将文件内容和元数据写入磁盘文件但是会影响应用性能
fileChannel.size()
:返回文件通道对应文件的字节大小
fileChannel.position()
:该方法及其重载方法获取当前文件读写指针位置或者将读写指针设置为新位置
isChannel.transferTo()
、osChannel.transferFrom()
:将文件输入通道isChannel
对应文件中指定起始位置和指定字节长度的数据拷贝写出到文件输出通道
这种文件复制会利用零拷贝进行优化,而且代码非常简洁
该方法一次调用能复制的最大数据量为2G
,超出的数据无法写入新文件,需要多次调用该方法
ServerSocketChannel
特点:做TCP
网络通信时客户端与服务端网络数据传输通道,该通道用于动态监听与服务端新建立TCP
连接的接入事件并创建socket
通道对应的SocketChannel
实例
SocketChannel
类似BIO
模式下的Socket
,ServerSocketChannel
类似BIO
模式下的ServerSocket
常用方法:
ServerSocketChannel.open()
:通过指定服务端通信端口创建用于监听socket
连接事件的ServerSocketChannel
实例,也可以通过serverSocketChannel.bind()
方法绑定服务端通信端口
serverSocketChannel.accept()
:当新客户端接入时获取对应的Socket
通道SocketChannel
阻塞模式下客户端没有新的接入请求线程会在该方法上阻塞等待,非阻塞模式下客户端没有新的接入请求会返回null
且线程会继续向下运行
serverSocketChannel.configureBlocking(false)
:将服务端socket
通道切换为非阻塞模式
serverSocketChannel.register()
:传参选择器、事件监听类型和通道附件将通道注册到指定选择器上,该方法返回SelectionKey
表示通道的事件监听对象,服务端监听接入事件、可读可写事件的选择器是同一个
SocketChannel
特点:做TCP
网络通信时客户端与服务端之间进行网络数据传输的通道,用于监听客户端和服务端上的可读可写事件
常用方法
socketChannel.write(ByteBuffer buf)
:将数据从缓冲区写出到socket
套接字
单次网络数据传输量有上限,操作系统分配的socket
缓冲区满了会停止socketChannel.write(byteBuffer)
方法的执行并返回已写入缓冲区数据的字节数,开始网络传输写入socket
缓冲区的数据,socket
缓冲区的大小不固定,范围在2-8M
之间浮动,只有socket
缓冲区清空了才能继续发送下一条消息
socket
缓冲区正在向网络写出数据时,即使调用socketChannel.write(byteBuffer)
也无法将数据成功写入socket
缓冲区,一般NIO
都是使用一个线程管理若干通道的读写事件,大量数据网络传输过程中,单个线程死循环调用socketChannel.write(byteBuffer)
,在socket
缓冲区未准备就绪的情况下会形成CAS
自旋阻塞的效果导致CPU
大量空转;socket
缓冲区就绪时因为缓冲区还有可写数据会自动触发对应通道的可写事件
程序可以被优化设计为,当缓冲区数据不能一次性完全写入通道时,让选择器监听对应通道的可写事件,调用selectionKey.attach(byteBuffer)
将未写完的缓冲区存储为通道的附件,让当前线程继续处理其他可读事件,待socket
缓冲区清空后自动触发可写事件再继续向socket
缓冲区写出剩余数据;因为数据量较大,当缓冲区数据全部写出时清空附件让释放缓冲区并取消监听通道的可写事件
socketChannel.read(ByteBuffer buf)
:将数据从socket
套接字读入缓冲区,返回实际读取的字节数;
阻塞模式下客户端没有写数据请求线程会在该方法上阻塞等待,非阻塞模式下客户端没有写数据请求该方法会返回0
且线程会继续向下运行
socketChannel.configureBlocking(false)
:将socket
通道切换为非阻塞模式,防止线程在轮询处理可读事件时将通道数据读取完后阻塞在socketChannel.read()
方法上无法继续轮询后续就绪事件
socketChannel.close()
:不管是客户端宕机暴力断开连接,还是在任意一端调用socketChannel.close()
正常关闭连接,都会触发被关闭通道的可读事件,暴力断开的可读事件在调用read()
方法处理可读事件时直接抛异常,正常断开的可读事件调用read()
方法时也无法处理该可读事件并直接返回-1
;
正常断开和异常断开都一定要调用selectionKey.cancel()
取消通道在选择器中的注册,如果不取消通道在选择器中的注册,即使捕获了异常,选择器的select
方法也会因为持续监听到无法被处理的可读事件导致当前线程无法被阻塞,再次处理该读事件还是继续抛出异常或者返回-1
继续重复上述过程导致死循环
socketChannel.open()
:传参服务端主机地址和端口与服务端建立连接并返回对应socket
通道
客户端一般不会涉及到多连接业务,不需要ServerSocketChannel
来动态监听客户端的socket
接入事件,可以直接通过该方法获取socket
通道向服务端写出数据或者将该通道注册到选择器中监听通道上的可读事件接收服务端传输来的数据,由于处理用户输入的扫描器和选择器监听可读事件的select
方法都会阻塞当前线程,因此监听可读事件和向服务端写出数据不能使用同一个线程
socketChannel.register()
:传参选择器、事件监听类型和通道附件将通道注册到指定选择器上,通道附件指和通道唯一绑定的对象,该对象的生命周期和通道的生命周期一样长,在非定长消息使用分隔符解决黏包半包问题的场景下可以将可扩容的缓冲区作为通道附件来解决单条消息长度超出缓冲区容量的问题,该方法返回SelectionKey
表示通道的事件监听对象
该方法在执行时如果另一个线程阻塞在同一个选择器的select
方法上,该方法的执行线程也会一直阻塞到select
方法执行结束
概念:
选择器也叫多路复用器,选择器可以监听在其上注册的所有ServerSocketChannel
的接入事件和SocketChannel
的可读可写事件,获取所有处于就绪状态的事件,轮询处理所有就绪事件配合NIO
非阻塞模式能实现一个线程管理多个通道
多路复用:单个线程配合选择器实现对多个通道事件的监听管理,只有网络通道才有多路复用的概念
选择器必须配合NIO
非阻塞模式一起使用,避免轮询处理事件期间线程阻塞在serverSocketChannel.accept()
和socketChannel.read()
方法上
事件类型用SelectionKey
中int
类型的1
、4
、8
、16
四个常量表示,分别表示可读、可写、连接建立、有连接请求事件;通过数字指定要监听的通道事件类型,如果要监听通道上的多种事件使用位或运算符或者无进位求和运算符连接不同的事件类型
可写事件的就绪条件是缓冲区有数据且socket
缓冲区就绪
选择器维护一个集合保存注册在其上的所有通道的监听事件SelectionKey
,同时还维护一个集合selectedKeys
保存当前就绪的事件SelectionKey
,就绪事件会被选择器主动存入集合selectedKeys
但是无法由选择器主动移除;
因此事件处理完后必须手动调用事件迭代器的remove
方法将事件监听对象SelectionKey
从集合selectedKeys
中移除,否则下次从选择器获取selectedKeys
的迭代器时仍会获取到已经被处理但无事发生的SelectionKey
,由于选择器必须配合通道的非阻塞模式一起使用,非阻塞模式下服务端socket
通道的accept
方法和socket
通道的read
方法会直接返回null
或者0
,处理不当很容易抛出空指针异常影响其他就绪事件的处理
打断selector.select
方法导致的线程阻塞的场景
注册在选择器上的通道有就绪的连接事件、可读事件、可写事件
linux
系统下NIO
可能发生空轮询BUG
,该BUG
发生时select
方法无法阻塞当前线程
调用selector.wakeup()
唤醒阻塞在select()
方法的线程
调用selector.close()
关闭选择器
阻塞在select
方法的线程被其他线程调用interrupt()
方法打断
常用方法
Selector.open()
:实例化一个选择器
selector.select()
:阻塞当前线程直到监听到在选择器上注册的通道有就绪事件,存在没有被处理的事件会导致该方法无法被阻塞,无法被处理的事件可以调用selectionKey.cancel()
移除事件对应通道在选择器中的注册
selector.select(long timeout)
:阻塞当前线程直到有就绪的通道事件或者超出指定时间,Netty
中使用该方法做事件监控
selector.selectNow()
:检查选择器上是否有事件,该方法不会阻塞当前线程,没有事件会返回0
selector.wakeup()
:唤醒当前阻塞在selector.select()
方法上的线程,如果目标线程没有处于阻塞状态,该线程下次调用selector.select()
方法时不会进入阻塞状态,效果类似于LockSupport.unPark()
selector.selectedKeys().iterator()
:获取就绪事件集合的迭代器
selector.keys()
:获取选择器上就绪事件SelectionKey
的Set
集合
已经被轮询处理的元素需要手动移除,移除集合元素最好使用迭代器的remove
方法
多线程情况下,如果迭代器每次调用next
方法获取下一个元素时发现在迭代器外部更新了集合会抛出ConcurrentModification
并发修改异常,很多集合都不允许使用迭代器时在迭代器器外部向集合删除或者添加元素,使用迭代器移除集合元素能避免出现并发线程安全问题;
单线程情况下,像基于数组实现的集合如ArrayList
使用for
循环遍历时不能使用集合自身的remove
方法移除元素,使用该方法会引起集合的结构变化导致未遍历元素占用已遍历索引位置导致遍历时部分元素被漏掉;
而且迭代器移除元素的效率更高,因为迭代器维护了指向被移除元素的指针,可以不需要查找元素位置直接删除目标元素,使用集合自身的remove
方法移除元素需要先从集合中找到该元素的位置
SelectionKey
的常用方法
selectionKey.isAcceptable()
、selectionKey.isReadable()
、selectionKey.isWriteable()
:判断事件类型是否为接入事件、可读事件和可写事件
selectionKey.cancel()
:取消事件对应通道在选择器中的注册,客户端连接异常断开会触发无法被处理的可读事件,处理该可读事件会直接抛出异常,捕获异常后如果不调用该方法手动取消事件对应通道在选择器中的注册会导致选择器无法被阻塞在select
方法处再次处理该事件继续抛出异常引起死循环导致CPU
空转
selectionKey.channel()
:获取当前事件所在的通道
selectionKey.interestOps()
:获取通道监听的事件类型
selectionKey.interestOps(SelectionKey.OP_ACCEPT)
:更新事件所在通道监听的事件类型
selectionKey.attachment()
:获取当前事件所在通道的通道附件
selectionKey.attach()
:为当前事件所在通道重新绑定通道附件
概念:Path
、Paths
和Files
都是java.nio.file
包下提供的用于文件和目录操作的工具类
Paths.get()
:该方法及其重载方法传参文件或者目录路径可以返回对应Path
路径实例,Files
下的方法基本都是通过传参path
实例来操作指定目录或者文件
Files.copy()
:将指定目录下的文件拷贝到另一个指定目录,该方法的效率很高甚至和FileChannel
的transferTo
、transferFrom
差不多,但是底层实现不同;
此外Files
工具类还提供了创建、删除、移动、覆盖目录或者文件;检查指定目录和文件是否存在,指定路径是否是一个文件或者目录的方法
Files.walkFileTree(Path start,FileVistor vistor)
:从指定目录自上而下依次遍历每个目录和文件,第二个入参需要一个FileVistor
子接口的匿名实现,比如SimpleFileVisitor<T>()
提供了四个方法供用户重写,可以按需求在进入目录前、离开目录后、访问文件时、访问文件失败时执行用户针对当前正在访问目录或者文件的自定义处理逻辑
walkFileTree
方法的设计是设计模式中访问者模式的体现,FileVistor
就是访问者,遍历到具体目标时通过访问者的回调方法执行用户的自定义访问逻辑
通过walkFileTree
遍历目录删除文件是危险代码,通过程序删除的文件不会进入回收站,删了就永久没了
NIO
模式下服务端运行流程
1️⃣:服务端通过ServerSocketChannel ssChannel = ServerSocketChannel.open()
以及ssChannel.bind(new InetSocketAddress(9999))
获取一个绑定指定端口的服务端Socket
通道,当有新客户端可连接事件时通过SocketChannel schannel = ssChannel.accept()
可以获取相应的客户端Socket
通道
注意通道也可以通过IO
流的getChannel
方法获取
2️⃣:服务端通过Selector selector = Selector.open()
可以创建一个选择器,通过serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT)
可以将服务端Socket
通道注册到选择器上并指定监听通道上的接入事件或者socketChannel.register(selector,SelectionKey.OP_READ)
可以将客户端socket
通道注册到选择器上并指定监听通道上的可读事件
注意选择器必须配合ServerSocketChannel
和SocketChannel
的非阻塞模式一起使用,通过serverSocketChannel.configureBlocking(false)
和socketChannel.configureBlocking(false)
将通道切换为非阻塞模式
3️⃣:当selector.select()>0
说明选择器已经监听到处于就绪状态的通道事件,通过Iterator<SelectionKey> it = selector.selectedKeys().iterator()
能获取到选择器上已注册通道上处于就绪状态的事件的迭代器,对迭代器遍历能拿到每个就绪状态事件SelectionKey
,每遍历处理完一个SelectionKey
要调用it.remove()
移除掉已经被处理的事件否则it.next()
仍然会取出已经被处理完的事件,通过it.hasNext()
判断当迭代器没有事件可以遍历时进入下一轮循环阻塞在selector.select()
上直到所有已注册通道上有新的事件
4️⃣:通过selectionKey.isAcceptable()
和selectionKey.isReadable()
能判断事件是连接事件还是可读事件
如果是接入事件调用SocketChannel schannel = serverSocketChannel.accept()
获取刚建立连接的客户端通道,调用socketChannel.configureBlocking(false)
将通道切换为非阻塞模式,调用socketChannel.register(selector,SelectionKey.OP_READ)
将客户端通道注册在选择器上并指定监听客户端通道上的可读事件,结束接入事件处理遍历下一个事件
如果是可读事件调用SocketChannel sChannel = (SocketChannel) SelectionKey.channel()
通过事件SelectionKey
拿到发生当前事件的通道,通过ByteBuffer buf = ByteBuffer.allocate()
或者ByteBuffer buf = ByteBuffer.allocateDirect()
创建缓冲区ByteBuffer
,通过socketChannel.read(buf)
读取通道中的数据,每个循环将数据从通道读取到缓冲区后调用byteBuffer.flip()
将缓冲区切换为读模式准备将数据从缓冲区读出进行后续处理,数据从缓冲区读出后调用byteBuffer.clear()
将缓冲区切换为写模式准备再次从通道循环读取未读取数据,当通道没有数据可读时结束可读事件的处理轮询处理迭代器中的下一个事件
5️⃣:不管是客户端宕机暴力断开连接,还是在任意一端调用socketChannel.close()
正常关闭连接,都会触发被关闭通道的可读事件,暴力断开的可读事件在调用read()
方法处理可读事件时直接抛异常,正常断开的可读事件调用read()
方法时也无法处理该可读事件并直接返回-1
;
正常断开和异常断开都一定要调用selectionKey.cancel()
取消通道在选择器中的注册,如果不取消通道在选择器中的注册,即使捕获了异常,选择器的select
方法也会因为持续监听到无法被处理的可读事件导致当前线程无法被阻塞,再次处理该读事件还是继续抛出异常或者返回-1
继续重复上述过程导致死循环
NIO
模式下客户端运行流程
1️⃣:客户端一般不会涉及到多连接业务,不需要ServerSocketChannel
来动态监听客户端的socket
接入事件,可以直接通过SocketChannel sChannel = SocketChannel.open();
方法指定入参服务端主机地址和IP
与服务端建立连接并返回客户端socket
通道,调用socketChannel.configureBlocking(false)
将socket
通道切换为非阻塞模式,调用ByteBuffer buf = ByteBuffer.allocate(1024)
或者ByteBuffer buf = ByteBuffer.allocateDirect(1024)
创建缓冲区,配合监听用户输入的扫描器向缓冲区写入通信数据,调用byteBuffer.flip()
将缓冲区切换为读模式后调用socketChannel.write(buf)
将缓冲中的数据写出到socket
通道,调用byteBuf.clear()
将缓冲区切换为写模式准备下次向缓冲区写入新的通信数据
2️⃣:如果客户端要监听服务端的发送来的数据需要一个新线程创建选择器Selector
,将当前SocketChannel
注册到选择器上并指定监听处理该通道上的可读事件,由于处理用户输入的扫描器和选择器监听可读事件的select
方法都会阻塞当前线程,因此监听可读事件和向服务端写出数据不能使用同一个线程,但是写出消息和读取消息可以使用同一个socket
通道
单线程模型的缺点
只能利用单个核,浪费多核CPU
的性能
单个线程轮询处理所有通道上的就绪事件,一旦某个事件处理耗时较长,会影响到其他所有事件的处理效率
NIO
多线程模型
原理:一个线程和一个选择器组成一个单元,创建多个这样的单元,将这些单元中的一个设置为Boss
角色负责监听所有客户端的接入请求,将其他若干单元设置为Worker
角色负责监听处理多组已连接客户端的写数据请求,Boss
处理接入事件将已接入socket
通道按合适的策略派发注册到Worker
的选择器中监听通道的可读事件
最简单的负载均衡策略是将所有Worker
存入Worker
数组,使用原子整数自增并对Worker
总数量取模得到Worker
数组索引,将socket
通道派发注册到对应索引处的Worker
选择器上
Worker
和Boss
实现Runnable
接口,重写run
方法分别实现轮询处理接入事件和可读事件的代码,在构造器中实例化选择器和对应线程并启动线程执行Run
方法,必须保证选择器和线程只能被实例化一次,可以通过设计一个私有的标记字段来控制
问题分析:
多线程场景下,如果由Boss
线程将新接入的socket
通道注册到Worker
选择器上会存在两个问题;Boss
线程将新接入的socket
通道注册到处于阻塞状态的Worker
线程对应的选择器上,Boss
线程也会被阻塞在socketChannel.register()
方法上;Worker
线程启动在第一个socket
通道注册到Worker
选择器之前,很可能导致Worker
选择器还没有注册任何socket
通道Worker
线程就已经阻塞在选择器的select
方法上了,此时Worker
线程会因为没有事件可以监听一直阻塞,Boss
线程向Worker
选择器注册新接入的socket
通道时也因为Worker
线程阻塞也阻塞在socketChannel.register()
方法上,此时Boss
线程和Worker
线程会一直处于阻塞状态导致程序直接卡死,这种卡死现象和服务器启动后没有客户端请求接入的效果是一样的;
解决方案
方案一:Netty
的解决方案是为每个Worker
设置一个线程安全的任务队列ConcurrentLinkedQueue<Runnable>
,在Boss
线程获取到新接入的socketChannel
后,由Boss
线程将注册该socket
通道到指定Worker
选择器的操作封装成任务对象添加到Worker
任务队列,尝试让Worker
线程负责执行新接入socket
通道在Worker
选择器中的注册的同时将Boss
线程中的socketChannel
共享到Worker
线程;然后Boss
线程立即调用该Worker
选择器的wakeup()
方法,因为任务添加到指定Worker
任务队列时Worker
线程可能正在阻塞或者正在轮询就绪事件,如果Worker
线程正在阻塞立即被唤醒Worker
线程,如果Worker
线程正在轮询就绪事件下次调用selector.select()
方法时不再阻塞,保证Worker
线程能以最快的速度执行socket
通道注册任务;Worker
线程只要被唤醒立即从任务队列弹出任务对象调用runnable.run()
手动执行socket
通道注册任务
方案二:在Boss
线程执行新接入socket
通道的注册方法前先调用Worker
选择器的wakeup()
方法避免Worker
线程阻塞在Worker
选择器的select
方法上,Worker
线程被唤醒后还要抢夺CPU
时间片,极大概率比通道注册方法执行慢;但是现在基本都是多核CPU
,仍然有概率唤醒后在Boss
线程执行通道注册方法前再次阻塞在Worker
选择器的select
方法上;一旦遇上,Worker
选择器上如果没有注册任何socket
通道,Boss
线程和Worker
线程会一直阻塞导致程序卡死;Worker
选择器上即使有注册的socket
通道,Boss
线程仍然会因为Worker
线程阻塞而阻塞直到Worker
选择器监听到新的可读事件
概念
Asynchronous IO
,异步非阻塞IO
通信模式,也叫NIO2.0
,在JDK1.7
引入,主要在java.nio.channels
包下增加了四个异步通道,通道名就是NIO
下四个通道名前面加个Asynchronous
;AIO
模式下缓冲区数据写出到通道或者通道数据读入缓冲区由操作系统异步完成,完成后将执行结果传参给回调函数通知线程做进一步处理;
NIO
中socketChannel.write
和socketChannel.read
方法全是同步的,NIO
的非阻塞模式只是说read()
方法或者accept
方法不会在客户端通道无事发生时阻塞当前线程,但是AIO
这个IO
方法执行过程是异步的
因为IO
操作完全交给操作系统异步执行,因此即使是单线程模型数据量大事件处理时间长不会影响其他通道的事件处理
AIO
的文件通道也支持异步,不像多路复用只有网络通道才支持
特点:
适合连接数多且连接较长的场景,比如相册服务器
AIO
在实际应用中不成熟,windows
对AIO
支持比较好实现了真正的异步IO
,Linux
系统下还是使用多路复用模拟异步,性能上相较于NIO
没有优势,Java
程序大多都运行在Linux
环境下,Netty5
也想实现异步IO
,最后做出来发现性能没优势还更加复杂,直接把Netty5
废弃了
BIO
、NIO
和AIO
辨析:
BIO
:一个线程只能始终处理一个通道上的接入事件或者可读可写事件,基于serverSocket
动态获取新客户端连接,基于socket
做网络数据传输,基于单向的IO
流进行数据IO
,编程简单、服务器性能要求高,只适合连接数小且固定的场合,早期BIO
一般配合线程池和短连接使用;IO
过程同步,通道没有接入事件和可读事件会阻塞当前线程;
IO
流的操作性不如缓冲区,读写指针位置不能随意更改;通道会自动使用系统层面的一些辅助功能如缓冲区提升API
性能而流不会;流只支持阻塞式API
,通道同时支持阻塞式和非阻塞式API
,通道还能配合选择器实现多路复用
NIO
:一个线程通过多路复用的方式就能轮询处理所有通道上的接入、可读可写、连接建立事件;网络通道既可以工作在阻塞模式下也可以工作在非阻塞模式下,基于ServerSocketChannel
动态获取新接入的客户端连接,基于SocketChannel
做网络数据传输,基于双向的缓冲区进行数据IO
;基于选择器做事件驱动和通道轮询;编程非常复杂,适合连接数多流量轻的场合;IO
过程同步,通道没有接入事件和可读事件可以阻塞也可以不阻塞当前线程;提供多种零拷贝实现,提供基于Reactor
模式的多种线程模型设计
AIO
:NIO2.0
,NIO
的升级版,在NIO
的基础上提供异步通道实现把数据IO
到socket
套接字或者磁盘文件的过程交给操作系统异步完成,IO
过程异步,网络通道既可以工作在阻塞模式下也可以工作在非阻塞模式下,适合连接数多且连接较长的场景,linux
对异步IO
的支持不好性能提升相较于NIO
不明显,应用有限
概念辨析
Reactor
模式:事件分发器等待某个通信事件的发生,事件分发器将该事件传递给实现注册的时间处理函数或者回调函数,由该函数来执行实际的读写操作
通信方式
全双工通信:任意时刻连接通道上都允许客户端和服务端之间的双向通信,任意一个端读和写数据都可以同时进行,读写线程不会相互阻塞,NIO
和BIO
都是全双工通信
半双工通信:同一时刻连接通道上的数据只能向一个方向传输
用户程序空间:Java
程序本身没有从网络读取数据的能力;Java
程序只能通过调用操作系统的API
切换到Linux
内核空间来读取指定端口的数据,用户程序空间指用户程序运行的环境,也称用户态,由用户态切换到内核态指Java
程序去调用操作系统API
的过程
操作系统内核空间:Java
程序的read()
方法会调用操作系统从网络端口读取数据的API
,但是网络数据此时不一定发过来了,操作系统会等待数据,数据到达以后操作系统会将数据从网卡读取复制到内存中,数据复制结束了程序才会切换回用户程序空间,操作系统API
执行的环境称为内核态
阻塞IO
:操作系统的阻塞API
等待数据期间用户程序也会处于阻塞状态,操作系统阻塞API
将数据拷贝到内存切换回用户态才会停止用户程序的阻塞状态
非阻塞IO
:用户程序调用操作系统的非阻塞API
去网络获取数据,操作系统没有从端口读取到数据不会阻塞等待数据,立刻返回给程序读取到0
字节,用户程序按一定策略一定时间后再次去调用操作系统的非阻塞API
直到操作系统的非阻塞API
检查到网卡中有数据,操作系统会将网卡数据复制到内存再切换到用户态让程序继续向下运行,用户线程在数据复制阶段仍然是阻塞的,只是等待数据阶段不会进入阻塞状态
多路复用:selector.select()
方法就是在切换内核态调用操作系统API
去查询每个通道是否有事件,直到相关通道有事件发生,操作系统才会切换回用户态并告知用户线程有事件发生;用户程序根据事件类型、通道去调用socketChannel.read()
切换到内核态调用操作系统的API
将数据读取到内存中再切回用户态执行业务
多路复用比直接调用读写API
多出一层事件监听上的用户线程阻塞,但是单个线程一直调用阻塞API
,如果数据迟迟不到会一直阻塞等待,即使此时其他客户端通道的数据到了,该线程因为阻塞也无法去处理,只能一个客户端一个线程一对一处理,线程池线程资源迟早被耗尽;多路复用虽然每个事件都会多一次用户态与内核态的切换,但是只有一个线程专门切换到内核态阻塞监听所有通道上的事件,通道有事件才去使用单独的线程或者使用事件监听线程去处理就绪事件
多路复用的核心就是使用一个线程监听所有客户端通道等待数据到达端口的过程取代一个线程只能等待一个客户端通道数据到达的过程,而且会一次性返回阻塞期间监听到的所有事件,用户程序直接调用操作系统API
再切换到内核态直接处理客户端数据即可
同步:当前线程获取结果的过程只有当前线程参与,且只有获取到结果才能做别的事情
同步阻塞:阻塞API
需要用户线程自己等待操作系统的执行结果
同步非阻塞:非阻塞API
也需要用户线程自己等待操作系统的执行结果
多路复用也是用户线程自己等待操作系统的执行结果也是同步阻塞的
异步:当前线程由其他线程将结果交给自己,获取结果的过程至少需要两个线程协作
当前线程创建一个线程去获取目标数据,启动线程后就直接返回继续执行自己的业务,新线程阻塞等待操作系统完成数据获取,新线程得到数据后将数据以回调的方式通知到当前线程,这就叫异步 。
结果和任务对应主要就是通过回调方法来确定的,回调方法就是启动新线程获取目标数据时定义了一个通知方法,回调方法的入参就是目标数据,新线程获取到目标数据后自动调用通知方法将目标数据共享给当前线程,这就是异步非阻塞,根本不存在异步阻塞这种说法,因为异步当前线程启动新线程执行异步任务后就会继续去做自己的事由新线程调用通知方法共享执行结果,根本不会阻塞等待新线程的执行结果
只有同步阻塞、同步非阻塞、异步非阻塞,没有异步阻塞的说法
零拷贝[以下场景都是Java
程序将磁盘文件发送给网络]
操作系统不能直接将文件数据直接读取到JVM
内存,只能将数据从磁盘读取到操作系统的内核缓冲区,再将数据从内核缓冲区拷贝到用户缓冲区即JVM
内存;Java
程序向网络写出数据也不能直接从用户缓冲区直接写入网卡,必须先由用户缓冲区拷贝到socket
缓冲区,再从socket
缓冲区写入网卡;
使用Java
程序读取文件再写出到网卡文件数据要经过四次拷贝,涉及三次用户态与内核态之间的切换,磁盘读取到内核缓冲区,内核缓冲区拷贝到用户缓冲区,用户缓冲区再拷贝socket
缓冲区,socket
缓冲区再写出到网卡
NIO
的直接缓冲区基于直接内存,直接内存使用的是操作系统内存,直接内存可以同时被操作系统和Java
程序访问,操纵系统可以直接将磁盘数据读取到直接内存供Java
程序访问,但是直接内存的数据要写到网络中仍然需要先将数据从直接内存拷贝到socket
缓冲区,再由socket
缓冲区写入网卡,仍然有三次文件数据拷贝,涉及三次用户态与内核态之间的切换
Linux2.1
以后提供一个sendFile
方法,NIO
的SocketChannel
本身没有提供transferTo
和transferFrom
方法,但是FileChannel
的transferTo()
可以将FileChannel
的数据传输到SocketChannel
,transferFrom()
方法可以将SocketChannel
的数据传输到FileChannel
,transferTo
和transferFrom
方法底层就使用sendFile
方法,此外Java
调用这两个方法从用户态切换成内核态后,不会使用CPU
而是使用DMA
将磁盘数据读取到内核缓冲区,然后数据通过CPU
从内核缓冲区读取到socket
缓冲区,再使用DMA
将数据从socket
缓冲区写入网卡,和直接内存相比仍然是三次文件数据拷贝,但是只有调用transferTo()
或者transferFrom()
方法时从用户态切换成内核态,而且只有文件数据从内核缓冲区拷贝到socket
缓冲区使用的了CPU
DMA
[DirectMemoryAccess
,直接内存访问]是允许某些硬件子系统在不经过CPU干预的情况下直接访问系统内存的硬件,DMA
的作用是提高数据传输效率,降低CPU
在内存与外设间数据传输的负担
Linux2.4
以后改变了sendFile
方法的实现,Java
中的transferTo()
或者transferFrom()
方法会直接将内核缓冲区的数据拷贝到网卡,文件数据连socket
缓冲区都不经过,全程只有两次DMA
拷贝,只有文件的偏移量和文件长度length
等少量数据会拷贝到socket
缓冲区,涉及一次用户态与内核态之间的切换
所谓零拷贝指的是Java
中使用transferTo()
或者transferFrom()
方法,对应调用linux
操作系统的sendFile
方法的拷贝过程,零拷贝指的是文件数据不会拷贝到用户缓冲区
零拷贝的优点是用户态和内核态之间的切换次数少,文件数据拷贝只使用DMA
不使用CPU
概念
EventLoop
是事件循环接口,作为线程池以执行用户提交普通任务或者定时任务,轮询处理注册在其内部选择器上的各个通道的网络事件,默认情况下非IO
任务和IO
任务的执行时间默认各占一半
EventLoop
间接继承自事件执行器EventExecutor
,为EventLoop
提供了inEventLoop
方法及其重载方法用于判断指定线程或者当前线程是否属于指定eventLoop
,提供parent
方法获取指定eventLoop
的所属eventLoopGroup
EventLoop
间接继承自JUC
包下的定时单线程执行器ScheduledExecutorService
,作为线程池能调用submit
或者execute
方法执行用户提交的普通任务,调用scheduleAtFixedRate
执行用户提交的定时任务,定时任务可以用于实现keepalive
的连接保活
EventLoop
内部维护着选择器,eventLoop
实例上绑定的所有客户端通道的网络事件都由该eventLoop
单线程执行器负责处理,易于管理且对通道数据的处理不会存在线程安全问题;EventLoop
实例的构造器参数配置比较繁琐,一般通过EventLoopGroup
中的默认配置实例化eventLoop
对象
NioEventLoop
由一个线程、选择器和两个任务队列构成,在EventExecutor
中有一个thread
成员变量表示对应线程,taskQueue
和seheduledTaskQueue
分别用于缓存用户向线程池提交的普通任务和定时任务
常用方法
eventLoop.execute(runnable)
:执行用户提交的普通任务
eventLoop.submit(runnable)
:提交用户提交的普通任务
eventLoop.schedule(runnable,60,TimeUnit.SECONDS)
:在指定延迟后执行用户提交的一个任务,第二个参数是定时任务执行的延迟时间,该定时任务只会被执行一次
Selector
的相关问题
Selector
的创建时机:Netty
在实例化NioEventLoop
的构造器中也采用NIO
中的Selector.open()
方法底层采用的SelectorProvider.provider().openSelector()
创建的选择器
为什么有selector
和unwrappedSelector
两个选择器相关的成员变量:Netty
创建的原生NIO
的Selector
会被直接赋值给unwrappedSelector
,原生Selector
的SelectedKeys
集合基于哈希表的Set
集合实现,遍历性能没有数组好;Netty
为了提高SelectedKeys
的遍历性能通过暴力反射将原生Selector
私有的SelectedKeys
换成Netty
自定义基于数组实现的SelectedKeys
并将替换后的Selector
赋值给成员变量selector
;因为部分功能必须使用基于Set
的实现因此仍保留了原生的Selector
NIO
线程相关问题
NIO
线程在何时被启动:该问题的实现逻辑在向eventLoop
提交任务的execute
方法中,当eventLoop
执行用户提交的任务时,会首先获取当前线程和eventLoop
的成员变量thread
比较是否是同一个线程并随即将任务添加至任务队列,eventLoop
第一次执行提交的任务时该成员变量必为null
,判断结果必为false
,此时会执行startThread
方法通过CAS
操作将线程状态位从1
改成2
保证只能有一个线程能执行doStartThread
方法启动NIO
线程,向执行器成员变量executor
手动提交一个任务来启动线程池,将线程池的线程赋值给eventLoop
的thread
成员变量[即NIO
线程就是单线程执行器的线程],然后死循环去查看eventLoop
有没有IO
事件、用户提交的普通任务和定时任务,获取相应的任务并执行;因此NIO
线程在首次调用eventLoop.execute
方法时启动,通过CAS
更改一个EventExecutor
中的state
线程状态位控制NIO
线程只会被启动一次
NIO
线程阻塞在select
方法上的时机:在执行器的死循环任务中通过switch...case
语句匹配枚举值SelectStrategy.SELECT
进入对应分支阻塞在select
方法上,匹配值的判断条件是没有普通任务才会进入select
方法所在的分支,当有任务时还会通过selector.selectNow()
获取并返回处于就绪状态的IO
事件数量进入相应分支一次性处理完所有的普通任务和就绪的IO
事件;当没有普通任务进入select
方法,在select
方法中死循环调用带超时时间的selector.select(timeoutMillis)
方法,在没有定时任务的情况下,select
方法的超时时间是1s+0.5ms
,因为select
方法可能被中途唤醒,NIO
线程被唤醒以后会再次进入循环判断当前线程因为什么原因被唤醒决定是否退出死循环;当实际时间达到超时时间、存在普通任务或者选择器上有就绪的IO
事件都会退出死循环,否则再次计算超时时间并继续进行阻塞
提交普通任务是否会结束select
方法的阻塞:在启动线程池的死循环任务中一旦满足特定条件,死循环就会阻塞在带超时时间的selector.select(timeoutMillis)
方法上,因为Netty
的NIO
线程除了处理IO
事件还需要处理用户提交的普通任务或者定时任务,因此不能调用selector.select()
导致没有IO
事件时NIO
线程一直阻塞;非当前NIO
线程通过eventLoop.execute()
向当前NIO
线程提交任务时会在execute
方法中调用selector.wakeup()
手动唤醒已经阻塞或者即将阻塞在select
方法上的当前NIO
线程,因为可能存在多个线程同时提交任务的情况,而selector.wakeup()
又是一个重量级操作,这些线程会去通过CAS
操作竞争将原子布尔变量wakeup
状态位从false
改成true
,只有竞争成功的线程才能去调用selector.wakeup()
唤醒NIO
线程
NIO
线程处理IO
事件的时间比例
NioEventLoop
中成员变量ioRatio
的含义为假设使用100
段时间处理所有任务,IO
事件的处理时间在其中占多少段;在nioEventLoop.run()
方法中定义了在NIO
线程结束阻塞后执行任务的逻辑,首先NIO
线程会处理选择器上所有处于就绪状态的IO
事件,然后统计本轮所有IO
事件的处理时间,通过ioTime * (100 - ioRatio) / ioRatio
算出普通任务的预期执行时间,执行普通任务时如果发现当前任务执行完成后本轮普通任务的执行时间超出预期执行时间不会再从任务队列中获取下一个任务,NIO
线程会结束本轮循环进入下一轮循环继续处理IO
事件,这种设计是为了避免普通任务的执行时间太长影响IO
事件的处理;默认ioRatio
的值为50
,表示NIO
线程50%
的时间用于处理IO
事件
ioRatio
设置为100
的含义不是NIO
线程只处理IO
事件,而是处理完本轮IO
事件后完整处理完所有的普通任务和定时任务后才会继续处理下一轮IO
事件,一般情况下不要将ioRatio
设置为100
通道流水线对IO
事件数据的处理时间也算在IO
事件的处理时间段内
Netty
使用数组优化selectedKeys
提高就绪事件的遍历效率,处理IO
事件时通过selectedKeys.keys[i]
获取就绪的IO
事件,通过key.attachment
获取serverSocketChannel
通道对应附件NioServerSocketChannel
,在processSelectedKey
方法中判断IO
事件类型并从通道附件NioServerSocketChannel
获取通道流水线,使用流水线中的每个处理器依次对IO
事件的数据依次进行处理
概念
事件循环组接口,作为管理一组事件循环EventLoop
的容器
EventLoopGroup
继承自事件执行器组EventExecutorGroup
,该接口继承自Iterable
接口让EventLoopGroup
具备使用增强For
循环或者迭代器遍历内部集合或者数组的能力
Netty
服务端三种线程模型通过serverBootstrap.group(EventLoopGroup bossGroup,EventLoopGroup workerGroup)
进行配置;但是注意服务端始终只会创建一个ServerSocketChannel
,因此bossGroup
即使管理多条线程也只会有一个EventLoop
负责处理服务端socket
通道上的所有新客户端通道的接入事件
如果boss
事件循环组和worker
事件循环组共用同一个事件循环组则使用单线程模型
如果boss
事件循环组管理的线程数为1
,worker
事件循环组管理默认的线程数则使用多线程模型
如果boss
事件循环组和worker
事件循环组为两个都管理默认线程数的不同事件循环组实例则使用主从多线程模型
常用方法
eventLoopGroup.next()
:轮询获取事件循环组管理的下一个EventLoop
用于可执行任务提交或者向EventLoop
注册新接入的客户端通道的负载均衡
eventLoopGroup.shutdownGracefully()
:优雅地关闭事件循环组,已经提交的任务继续执行,拒绝新提交的任务,所有任务执行结束后释放事件循环组管理的所有线程
关闭所有事件循环组以后服务端或者客户端所在进程才能自动结束
常用实现类
NioEventLoopGroup
:既能处理通道上的IO
事件也能向事件循环组提交普通任务和定时任务
NioEventLoopGroup
的无参构造调用的是单参构造this(0)
,传参为线程数量nThreads
;一直调用父类的构造函数最终会调用到MultithreadEventLoopGroup
构造方法
在MultithreadEventLoopGroup
中有一个常量DEFAULT_EVENT_LOOP_THREADS
,该常量会从1
,系统属性io.netty.eventLoopThreads
以及本机CPU核心数*2
中取一个最大值,该常量就是EventExecutor
数组的容量
当构造方法传参的线程数量不为0
时表示用户指定了线程数量,NioEventLoopGroup
会启动用户指定数量的线程数;
当构造方法传参线程数量为0
,说明用户没有指定线程数量选择使用默认的线程数量,默认的线程数量即常量DEFAULT_EVENT_LOOP_THREADS
这个线程数实际是NioEventLoopGroup
管理的NioEventLoop
的数量,每个NioEventLoop
都是一个单线程线程池,因此nThreads
指定的是线程数的同时也指定的是管理的NioEventLoop
的数量
DefaultEventLoopGroup
:只能处理用户提交的普通任务和定时任务,除了不能执行IO
任务用法和NioEventLoopGroup
是一样的
注意DefaultEventLoopGroup
中也有事件执行器EventLoop
,且在流水线中使用时每个客户端通道也会和其中一个eventLoop
实例绑定
一般通过nioSocketChannel.pipeLine().addLast(EventExecutorGroup group,String name,ChannelHandler handler)
指定DefaultEventLoopGroup
实例替代NioEventLoopGroup
管理的EventLoop
处理流水线上非常耗时的非IO
操作,避免工序处理时间太长影响到NioEventLoop
处理和其绑定的其他客户端通道上就绪事件的处理
Netty
源码切换线程执行同一条流水线的不同工序的实现
流水线在添加处理器的时候会指定处理器绑定的事件执行组,同一条流水线的不同工序可能被不同的eventLoop
实例执行,前一道工序由eventLoop1
线程执行到AbstractChannelHandlerContext.invokeChannelRead(next,msg)
方法时,会通过通道处理器上下文拿到下一个处理器的事件执行器即eventLoop2
,通过eventLoop2.inEventLoop()
方法判断下一道处理工序的执行线程是否是当前线程;如果是当前线程直接在当前线程调用执行下一道工序,如果不是当前线程则将调用执行下一道工序的代码封装成任务对象提交给下一道工序的事件执行器即eventLoop2
执行;
概念
ByteBuf
是Netty
对NIO
中ByteBuffer
的封装和增强,基于字节数组的数据容器
ByteBuf
支持动态扩缩容,默认初始容量均为256B
,最大容量为Integer.MAX
,当容量小于512
扩容到最近的16
倍整数,容量超过512
扩容到最近的二次幂
ByteBuf
的基本属性
ridx
为读指针、widx
为写指针、cap
为ByteBuf
的容量;NIO
的ByteBuffer
读写共用一个position
指针,需要使用flip
方法切换读模式,使用clear
或者compact
切换写模式;Netty
的ByteBuf
设置读写两个指针,读和写不需要切换读写模式
写指针到容量之间的部分为可写区域,读指针到写指针之间的部分为可读区域、索引0
到读指针之间的部分为废弃区域
ByteBuf
的池化管理
基于直接内存的ByteBuf
分配效率低,基于堆内存的ByteBuf
数量太多也会给GC
带来压力,高并发场景下,对ByteBuf
采用池化技术重用ByteBuf
能降低ByteBuf
的分配开销,避免因为ByteBuf
数量太多引起的内存溢出
Netty4.1
以后非安卓平台默认都开启了ByteBuf
的池化功能
采用池化技术的ByteBuf
的实现类的类名上会以Pooled
字样打头,非池化技术实现类的类名会以Unpooled
字样打头
通过JVM
参数-Dio.netty.allocator.type={unpooled|pooled}
可以设置关闭或者开启ByteBuf
的池化功能
ByteBuf
的内存回收
基于堆内存的非池化ByteBuf
和普通对象一样受GC
管理;基于直接内存的非池化ByteBuf
通过定时的GC
回收,也可以手动回收;采用池化技术的ByteBuf
的回收是将ByteBuf
归还给ByteBuf
池
ByteBuf
实现了ReferenceCounted
接口,通过引用计数算法来管理ByteBuf
的生命周期和内存回收;每个ByteBuf
实例的初始引用计数均为1
,调用release
方法会让引用计数减1
,调用retain
方法会让引用计数加1
;引用计数没有减为0
,ByteBuf
对象不会被真正回收;引用计数减为0
,即使ByteBuf
实例还没有被GC
回收,ByteBuf
的方法也无法正常使用
释放ByteBuf
内存的方法是抽象方法deallocate
,基于直接内存、堆内存、池化技术的ByteBuf
的实现类都对该方法有相应的内存回收实现
ByteBuf
的释放时机
在最后使用ByteBuf
的处理器Handler
的finally
块中调用byteBuf.release()
释放ByteBuf
通道流水线的head
和tail
两个处理器中也会释放ByteBuf
,处理入站数据时执行到tail
处理器ByteBuf
实例还存在就在tail
处理器中释放ByteBuf
,处理出站数据时执行到head
处理器时ByteBuf
实例还存在就在head
处理器中释放ByteBuf
;但是head
和tail
释放ByteBuf
仅对传递给处理器的数据是ByteBuf
类型时才生效,如果中间工序将ByteBuf
处理成其他数据类型传递给后续处理器,head
和tail
处理器不会主动释放传入通道流水线的初始ByteBuf
;在源码中判断Object
类型的消息是否为ReferenceCounted
类型,如果是则强转为ReferenceCounted
类型后调用其release
方法让引用计数减1
释放ByteBuf
,此外如果处理出站数据时存在出站缓冲区,head
处理器将不会释放ByteBuf
,ByteBuf
将会交给出站缓冲区来释放
零拷贝设计
操作系统层面的零拷贝指避免在用户态和内核态之间来回拷贝数据,NIO
的零拷贝指磁盘文件数据向网络传输时数据无需拷贝经过用户缓冲区,Netty
的零拷贝指ByteBuf
之间的切片、复制、CompositeByteBuf
和Unpooled
中涉及多个ByteBuf
的组合操作过程不涉及任何数据的拷贝,此外Netty
中包装了NIO
的FileChannel
的FileRegion
使用sendFile
将磁盘数据向网络传输无需经过用户缓冲区也是Netty
零拷贝的体现
byteBuf.silce()
:将原始ByteBuf
切片成多个ByteBuf
,所有的切片还是使用原始ByteBuf
的内存,只是每个切片维护独立的读写指针和容量,切片过程没有发生任何数据内存拷贝
原始ByteBuf
调用byteBuf.release
导致引用计数减为0
会导致所有切片都无法使用,调用切片方法会抛出异常IllegalReferenceCountException
,特别是通道流水线这种存在部分处理器自动释放ByteBuf
的场景;因此一般获取切片都会手动调用切片的retain
方法避免原始ByteBuf
被误删,切片使用完以后再手动调用切片的release
方法释放byteBuf
byteBuf.duplicate()
:复制原ByteBuf
的数据得到一块新的ByteBuf
,与原始ByteBuf
共用同一块物理内存,读写指针完全独立,再次写入数据时没有最大容量限制
实例化ByteBuf
的方式
channelHandlerContext.alloc().buffer()
ByteBufAllocator.DEFAULT.buffer()
优先创建基于直接内存的ByteBuf
ByteBufAllocator.DEFAULT.directBuffer()
:创建基于直接内存的ByteBuf
ByteBufAllocator.DEFAULT.heapBuffer()
:创建基于堆内存的ByteBuf
ByteBuf
对ByteBuffer
的增强功能
ByteBuf
的API
相较于ByteBuf
更易用,无需切换缓冲区读写模式
采用池化机制让ByteBuf
实例得以重用,降低ByteBuf
对象创建、销毁或者GC
的开销,避免ByteBuf
对象过多导致内存溢出,使用二叉树实现的内存池管理缓冲区资源释放更安全,避免直接缓冲区导致的内存泄漏;基于引用计数算法做ByteBuf
的内存回收
读写指针分离,读写不需要切换读写模式
支持缓冲区自动扩缩容
方法设计上支持链式调用
CompositeByteBuf
以及Unpooled
类都对ByteBuf
的切片、复制和组合多个ByteBuf
或者字节数组进行了零拷贝实现,减少数据的拷贝次数提高数据处理性能
常用方法
byteBuf.toString(Charset Charset.defaultCharset())
:将字节缓冲区的数据按照指定编码格式转成字符串
实际生产一般都在客户端和服务端指定相同的字符集而不使用默认的字符集,因为客户端和服务端所在机器的默认字符集可能不一致,非常容易乱码
byteBuf.writeBytes()
:该方法及其重载方法将字节数组、byteBuf
、NIO
的ByteBuff
写入指定byteBuf
实例
byteBuf.writeXxx()
:Xxx可以为八种基本数据类型,分别表示向ByteBuf
写入对应的基本类型数据,占用对应字节数的存储空间
向ByteBuf
写入一个int
值可以采用大端写入和小端写入两种方式,大端写入指如十进制的592
对应16
进制的0x00000250
,写入ByteBuf
对应的存储空间与原存储空间一致也为00 00 02 50
;小端写入则为50 02 00 00
,即以字节为单位将原存储空间按高位到低位的次序从低位写到高位;网络编程一般都会采用大端写入,默认writeInt
方法是大端写入,小端写入为方法writeIntLE
byteBuf.writeCharSequence()
:将字符串按指定字符集写入ByteBuf
byteBuf.setByte()
:将byteBuf
指定索引处的字节数据更改成指定值,以set
打头的方法写入数据不会改变写指针的位置
byteBuf.readByte()
:从ByteBuf
的读指针位置读取一个字节
byteBuf.readBytes()
:从ByteBuf
中读取指定长度的数据到指定字节数组
byteBuf.readInt()
:从ByteBuf
的读指针位置读取一个int
类型数据
byteBuf.markReaderIndex()
:对读指针位置做标记
byteBuf.resetReaderIndex()
:将读指针位置还原回此前对读指针标记的位置
byteBuf.silce()
:从当前ByteBuf
的指定索引开始按指定长度进行切片并返回切片
切片可以作为独立的ByteBuf
进行操作
切片无法进行扩容,也无法写入超出切片容量的数据,会抛出IndexOutOfBoundsException
异常,这是为了避免切片扩容写数据导致原ByteBuf
数据错乱
byteBuf.readableBytes()
:获取ByteBuf
剩余可读数据的字节长度
byteBuf.duplicate()
:复制原ByteBuf
的数据得到一块新的ByteBuf
,与原始ByteBuf
共用同一块物理内存,读写指针完全独立,再次写入数据时没有最大容量限制
byteBuf.copy()
:将原始ByteBuf
的数据拷贝到新ByteBuf
对象中,无论如何读写新ByteBuf
对象都与原始ByteBuf
无关
概念
CompositeByteBuf
提供了将多个ByteBuf
的数据组合到指定compositByteBuf
的方法,组合过程不会涉及到ByteBuf
中字节数据的拷贝,像byteBuf.writeBytes()
将一个byteBuf
的数据追加指定byteBuf
的方式是通过数据拷贝实现的
通过ByteBufAllocator.DEFAULT.compositeBuffer()
实例化compositeByteBuf
对象
常用方法
compositByteBuf.addComponent(ByteBuf buffer)
:向compositByteBuf
中添加一个ByteBuf
compositByteBuf.addComponents(ByteBuf... buffers)
:一次性向compositByteBuf
中添加多个ByteBuf
addComponent
和addComponents
方法可以在入参中将参数increaseWriterIndex
设置为true
,在组合期间自动将写指针位置调整到最后一个字节,否则读写指针会保持组合前的位置
概念
Unpooled
是一个提供非池化ByteBuf
的创建、组合和复制操作的工具类
常用方法
Unpooled.wrappedBuffer(ByteBuf... buffers)
:将多个ByteBuf
对象按顺序依次组合成一个ByteBuf
该方法组合的ByteBuf
数量超过一个时会使用CompositeByteBuf
的零拷贝API
来组合多个ByteBuf
,因此该方法组合多个ByteBuf
底层不会有拷贝操作
Unpooled.wrappedBuffer(byte[]... byteArrs)
:将多个字节数组按顺序依次组合成一个ByteBuf
底层也不会有字节数据拷贝操作
Unpooled.copiedBuffer(byte[] bytes)
:将字节数组作为数据内容创建ByteBuf
对象
概念
Netty
中的一个接口,代表一个网络连接或者本地I/O
操作的入口
Channel
提供网络连接通道的状态信息、接收缓冲区大小等网络连接配置信息、提供如建立连接、数据IO
、绑定端口等异步网络操作,并提供回调方法供操作完成时执行用户自定义的处理逻辑
常用Channel
实现
NioSocketChannel
:基于TCP
的客户端Socket
连接
NioServerSocketChannel
:基于TCP
的服务器端Socket
连接
NioDatagramChannel
:基于UDP
的连接
常用方法
channel.close()
:异步关闭当前通道,连接通道关闭会自动解除被关闭通道和EventLoop
的绑定关系
channel.closeFuture()
:异步关闭当前通道,该方法返回ChannelFuture
实例,当前通道彻底关闭时ChannelFuture
实例会被标记为完成状态
可以通过该ChannelFuture
实例调用sync
或者await
方法让当前线程同步等待当前通道的关闭,还可以调用addListener()
为ChannelFuture
注册一个监听器以便在当前socket
彻底关闭时执行用户自定义的业务操作
channel.pipeline()
:获取当前通道对应的流水线以便向流水线中添加Handler
处理器用于处理网络事件
channel.write()
:将可序列化对象、字节数组、字节缓冲写出网络通道
由于Netty
的缓冲机制,每次写出的数据不一定会立即写出到网络,数据会先被缓冲区缓冲,当缓冲区到了一定大小会自动发送到网络,用户也可以调用channel.flush()
立即将缓冲区数据发送到网络,接收端接收完整的缓冲区数据,即几次发送的消息很可能会拼接在一起
channel.flush()
:将通道发送缓冲区中的数据立刻刷新到网络
channel.writeAndFlush()
:将可序列化对象、字节数组、字节缓冲写出到缓冲区并立即将缓冲区数据刷出到网络通道
该方法写出的出站数据会经历完成流水线从tail
处理器开始依次向前遍历执行每个出站处理器,区别于ctx.writeAndFlush()
从当前处理器开始依次向前遍历每个出站处理器
channel().eventLoop()
:获取通道注册的事件循环
概念
EmbeddedChannel
是Netty
提供的用于测试的一种通道,可以通过有参构造实例化通道的同时给通道流水线绑定一系列处理器,用户无需启动客户端或者服务端就可以模拟网络数据读入通道时数据经过各个入站处理器或者网络数据写出通道时数据经过各个出站处理器的过程
构造方法:new EmbeddedChannel(@NotNull ChannelHandler... handlers)
常用方法
embeddedChannel.writeInbound(ByteBuf byteBuf)
:向通道流水线写入站数据,从head
处理器开始沿着处理器链依次向后经过预设入站处理器处理ByteBuf
调用该方法时如果传参是ByteBuf
的一个切片,一定要在调用前调用其中一个切片或者整个ByteBuf
的retain()
方法让切片或者整个ByteBuf
的引用计数加1
,否则其他切片还没有发送整个ByteBuf
就会被释放掉
embeddedChannel.writeOutbound(ByteBuf byteBuf)
:向通道流水线写出站数据,从tail
处理器开始沿着处理器链依次向前经过预设出站处理器处理ByteBuf
概念
Netty
的Future
接口继承自JDK
的Future
接口
JDK
的Future
通过get()
方法同步阻塞当前线程等待异步任务执行结束并返回异步任务的执行结果,Netty
的Future
既可以同步等待异步任务的执行结果,也可以异步通过回调通知的方式获取异步任务的执行结果
JDK
的Future
实例通过向线程池提交Callable
任务对象的方法的返回值获取
JDK
的Future
常用方法
future.cancel()
:取消已经提交但还未执行的任务
future.isCanceled()
:判断任务是否已经取消
future.isDone()
:判断任务是否已经完成,注意该方法不能判断任务是被成功执行了还是任务失败了
future.isSuccess()
:判断任务被成功执行了还是任务执行失败了
future.get()
:同步阻塞当前线程直到获取到异步任务的结果
Netty
中Future
的扩展方法
future.sync()
:阻塞当前线程等待异步任务执行结束,如果异步任务执行失败会抛出异常
future.await()
:阻塞当前线程等待异步任务执行结束,如果异步任务执行失败不会抛出异常,需要用户通过手动调用future.isSuccess()
方法来进行判断
future.addLinstener()
:为异步任务注册监听器,异步任务执行结束在回调方法的入参传递执行结果并由异步线程执行一段用户自定义的业务操作
Netty
的future.addListener()
注册的异步回调无法在Junit
的测试方法中使用异步线程执行
future.cause()
:获取异步任务执行失败的错误信息,如果没有发生异常返回null
,等价于future.getCause()
方法
future.getNow()
:获取异步任务的执行结果,如果结果还未产生返回null
概念
Netty
的Promise
接口继承自Netty
的Future
常用的实现有DefaultPromise
,用户可以直接在当前线程通过DefaultPromise
的单参构造传参事件执行器即EventLoop
主动创建Promise
实例,而不是像Future
一样只能通过执行异步任务的线程创建并返回给当前线程
Promise
除了同步等待和异步回调获得异步任务的结果,还可以脱离任务独立存在,仅作为两个线程传递结果的容器
Promise
在RPC
框架中非常有用,能实现Future
无法达到的效果
Promise
相较于Future
就是能在当前线程手动创建并在异步任务的执行线程手动调用setSuccess
或者setFailure
设置异步调用的结果并且不用等异步任务执行结束就能放行阻塞在Promise
的await
或者sync
的线程;而Future
只能通过异步任务的执行线程创建并返回给异步任务的调用线程,也不提供手动设置异步任务执行结果的方法
Promise
的扩展方法
promise.setSuccess(T result)
:向promise
对象设置成功执行的结果,甚至不需要等到异步任务执行结束就能设置并获取结果
promise.setFailure(Throwable e)
:向promise
对象设置执行失败的异常
不管调用promise.setSuccess(T result)
还是promise.setFailure(Throwable e)
都会让promise.await()
或者promise.sync()
结束阻塞继续向下运行,但是promise.get()
仍然会继续阻塞直到异步任务执行结束
Promise
在RPC
框架中的用法
在RPCClientManager
中为用户提供接口动态代理对象的getProxyService()
方法将用户调用远程方法的行为转换成向远程服务发送远程调用消息的行为[这部分细节内容看搭建RPC框架部分内容]
这里演示使用的是JDK
动态代理
远程方法调用完响应消息被客户端接收后会被流水线处理成自定义的RPCResponseMessage
,流水线的处理得到RPCResponseMessage
是在线程eventLoop
中完成的,动态代理对象一般在用户线程中完成,这里涉及到两个线程间共享同一个数据的问题,使用Promise
容器可以实现多个线程间交换同一个数据
准备一个ConcurrentHashMap
以消息序号作为key
,以Promise
作为值缓存远程调用响应消息经流水线处理后的结果,为了保证多线程并发共享数据的线程安全性使用了concurrentHashMap<Integer,Promise<?>>
[?
是通配符,表示适配任意类型,这是因为不知道响应的结果是什么类型,注意这里用通配符不行,后续向Promise
对象中设置值会出现问题,将通配符改成Object
类型];Promise
对象由代理对象通过DefaultPromise<?> promise = new DefaultPromise<>(channel.eventLoop())
创建后存入ConcurrentHashMap
[入参channel.eventLoop()
是创建Promise
对象需要指定将结果传入Promise
对象的线程EventExecutor
对象,需要流水线的执行线程即channel.eventLoop()
对象,注意该concurrentHashMap
老师设置为RPCResponseMessageHandler
的一个公有静态变量],代理对象创建Promise
对象并将其存入concurrentHashMap
后调用promise.await()
或者promise.sync()
等待eventLoop
接收到响应将结果存入promise
对象,使用promise.await()
使用promise.isSuccess()
来判断是否正常成功获取消息,成功获取响应结果直接获取结果设置为代理对象对应方法的返回结果,如果没有成功获取响应结果,包装异常对象promise.cause()
直接通过代理对象抛出
xxxxxxxxxx
/**
* @param serviceClass
* @return {@link T }
* @描述 用户调用该方法获取代理对象时就指定了目标接口,用户获取到代理对象可以调用接口`ServiceClass`中的方法,
* 实际上就是OpenFeign中对远程接口调用时的写法,用户感觉在调用远程方法,实际上是用动态代理在向远程服务发送远程调用消息
*
* 这里使用的JDK的动态代理实现,用户通过代理对象调用目标接口中的方法时实际上调用的是调用对象中第三个参数用户通过Lambda表达式
* 自定义的业务,Lambda表达式中第二个参数会传参用户调用的接口方法对象,第三个参数会传参用户方法入参列表Object数组,我们可以拿到接口名,
* 方法名和方法入参执行代理对象的方法
* @author Earl
* @version 1.0.0
* @创建日期 2025/02/04
* @since 1.0.0
*/
public static <T> T getProxyService(Class<T> serviceClass){
ClassLoader classLoader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object proxyObject =Proxy.newProxyInstance(classLoader,interfaces,(proxy,method,args) -> {
//1. 将用户对代理接口方法的调用转换成对远程调用消息的封装
RPCRequestMessage msg = new RPCRequestMessage(
SequenceIdGenerator.nextId(),
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
) ;
//2. 将消息发送出去
getChannel().writeAndFlush(msg);
//3. 准备一个空Promise对象准备接收远程调用响应结果
DefaultPromise<?> promise = new DefaultPromise<>(getChannel().eventLoop());
RPCResponseMessageHandler.PROMISES.put(sequenceId,promise);
//4. 阻塞当前线程等待promise被设置结果
promise.await();
//5. 对promise中结果的处理
if(promise.isSuccess()){
return promise.getNow();
}else{
throw new RuntimeException(promise.cause());
}
});
return (T) proxyObject;
}
在RPCResponseMessageHandler
中增加将流水线处理结果存入concurrentHashMap
中的Promise
对象的逻辑
通过消息的序列号从concurrentHashMap
中获取消息对应的Promise
对象,检查远程调用的响应结果是否正常,如果正常调用promise.setSuccess(returnValue)
设置远程调用执行结果,如果远程调用有异常就调用promise.setFailure(exceptionValue)
将异常信息设置到Promise
中[注意Gson对Throwable
对象向json
字符串的转换不需要自定义转换适配器],为了避免序列号错误或者其他错误需要对promise
对象判空,只有非空才能进行设置值操作,否则会出现空指针异常,因为各种原因导致集合中没有对应promise
对象是可能出现这种问题的
xxxxxxxxxx
Sharable .
public class RPCResponseMessageHandler extends SimpleChannelInboundHandler<RPCResponseMessage> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RPCResponseMessage rpcResponseMessage) throws Exception {
Promise<Object> promise = PROMISES.get(rpcResponseMessage.getSequenceId());
if(promise!=null){
Exception ex = rpcResponseMessage.getExceptionValue();
if(ex != null){
promise.setFailure(ex);
}else {
promise.setSuccess(rpcResponseMessage.getReturnValue());
}
}
}
}
概念
ChannelFuture
继承自Netty
的Future
,是用于表示Channel
各种异步操作结果的接口,Netty
中各种IO
操作都是异步的,ChannelFuture
能正确获取异步操作的结果并在异步操作成功或者失败时由异步线程执行一段开发者自定义的业务操作
异步任务调用正确获取执行结果要么同步阻塞当前当前线程等待异步任务执行完毕返回执行结果,要么在异步任务发起时为异步任务注册监听器待异步任务执行结果时自动调用回调方法并传参异步任务的执行结果
Netty
中bootstrap
和ServerBootstrap
的bind
、write
、connect
方法都会返回一个ChannelFuture
对象
常用方法
channelFuture.sync()
:该方法会阻塞当前线程直到通道连接成功建立后才会继续执行后续代码
如果连接还没有成功建立就通过channelFuture.channel()
获取到Channel
并使用该channel
发送消息,消息会直接丢失;但是连接建立好以后,连接信息会自动封装到channel
中此时仍能正常收发消息;
channelFuture.channel()
:连接成功建立后获取连接通道channel
实例
channelFuture.addListener(ChannelFutureListener channelFutureListener)
:用户通过匿名实现函数式接口ChannelFutureListener
的operationComplete(ChannelFuture future)
方法指定连接完成建立时要通过异步线程执行的操作
回调方法中入参ChannelFuture
对象就是channelFuture.addListener()
注册监听器的那个channelFuture
概念
CloseFuture
实现了ChannelFuture
,用于正确获取Channel
关闭操作结果的接口,CloseFuture
实例通过channel.closeFuture()
返回
常用方法
closeFuture.sync()
:阻塞当前线程等待指定连接通道完全异步关闭
closeFuture.addListener(ChannelFutureListener channelFutureListener)
:用户通过匿名实现函数式接口ChannelFutureListener
的operationComplete(ChannelFuture future)
方法指定连接完成关闭时要通过异步线程执行的操作
回调方法的入参future
就是添加监听器的closeFuture
概念
Pipeline
流水线底层是一个双向链表,首尾分别是head
和tail
处理器,中间是由用户使用Netty
提供或者由用户自定义的处理器组合而成的处理器链,流水线负责将网络事件传播给流水线上的每个Handler
,Handler
通过相应事件的处理方法对自身感兴趣的事件进行处理
EventLoop
是通道流水线处理器的实际执行者,在为流水线初始化处理器时可以指定执行当前处理器的EventLoopGroup
,同一条流水线的不同工序可能被不同的eventLoop
实例执行,前一道工序由eventLoop1
线程执行到AbstractChannelHandlerContext.invokeChannelRead(next,msg)
方法时,会通过通道处理器上下文拿到下一个处理器的事件执行器即eventLoop2
,通过eventLoop2.inEventLoop()
方法判断下一道处理工序的执行线程是否是当前线程;如果是当前线程直接在当前线程调用执行下一道工序,如果不是当前线程则将调用执行下一道工序的代码封装成任务对象提交给下一道工序的事件执行器即eventLoop2
执行;通过这种方式来实现不同处理器的执行线程切换
常用方法
channelPipeline.addLast()
:将指定处理器添加到tail
处理器前面
概念
通道处理器用于处理通道上的网络事件,通道处理器分为出站入站处理器和双向处理器,多个通道处理器组成的双向链表被称为流水线;
入站处理器用于处理如连接建立、数据读取、连接关闭等入站事件从网络接收的数据,将网络数据解析为应用可以理解的数据类型、对解析后的数据执行业务处理逻辑、根据需要向客户端发送响应并将解析后的数据传递给下一个入站处理器
出站处理器用于处理可写事件的出站数据
双向处理器可以同时作为出站处理器和入站处理器
入站处理器
入站处理器一般是实现了ChannelInboundHandler
接口的抽象类ChannelInboundHandlerAdapter
的子类,为所有方法提供了空实现,开发者可以通过继承ChannelInboundHandlerAdapter
实现其中的特定方法自定义处理器对特定网络事件的处理逻辑,Netty
也提供了部分入站处理器供开发者使用
channelRead(ctx,msg)
:处理可读事件的入站数据,入参ctx
为ChannelHandlerContext
实例,提供对通道和流水线上下文的访问;msg
的类型为Object
,流水线写出到通道或者通道读入流水线的数据类型一般为ByteBuf
,在处理器处理过程中ByteBuf
可能被处理成其他任意类型的数据并传递给后续流水线;
在channelRead
方法中调用ctx.fireChannelRead(msg)
将当前处理器处理后的消息传递给下一个处理器,如果入站处理器没有调用super.channelRead(ctx,msg)
或者ctx.fireChannelRead(msg)
方法,入站处理器流水线会直接在当前处理器直接断掉,后续入站处理器不会再继续执行;最后一个入站处理器调用该方法不会有任何效果
入站处理器在处理入站数据期间可能有写出数据的需求,比如需要向客户端发送响应,可以直接调用ctx.writeAndFlush(Object msg)
向客户端写出数据,某个入站处理器处理数据期间通过该方法写出数据,写出的数据会从当前处理器开始直接沿着处理器链向前依次执行每个出站处理器,该处理器后面的出站处理器都不会被执行;此外向客户端写出数据还可以使用初始化通道的initChannel(NioSocketChannel ch)
方法的入参ch
调用ch.writeAndFlush(ByteBuf)
在任意一个处理器中向客户端写出数据,此时出站数据都会从tail
处理器沿着完整的处理器链向前依次执行每个出站处理器
channelActive(ctx)
:连接通道成功建立时会触发active
通道激活事件,此时Netty
会执行通道流水线上每个入站处理器的channelActive()
方法,通常用于设置连接状态信息、向客户端发送通知消息、记录连接日志等等
通过ctx.fireChannelActive()
将通道激活事件传递给下一个处理器
channelInactive(ctx)
:连接通道失活时,Netty
会调用各个入站处理器的channelInactive()
方法,常用于释放连接相关资源,记录连接关闭日志
通过ctx.fireChannelInactive()
将通道失活事件传递给下一个处理器
userEventTriggered(ctx,evt)
:该方法用于处理用户的自定义事件或者IdleState
下的各种事件,其中入参evt
就是事件本身,通过ctx.fireUserEventTriggered(evt)
将事件传递给下一个处理器
exceptionCaught(ctx,cause)
:用于处理通道生命周期中捕获任何未被处理的异常,常用于记录异常发生时记录异常日志,释放资源,通知客户端
通过ctx.fireExceptionCaught(cause)
将异常传递给下一个处理器
出站处理器
入站处理器一般是实现了ChannelOutboundHandler
接口的抽象类ChannelOutboundHandlerAdapter
的子类
write(ctx,msg,promise)
:处理可写事件的出站数据
通过调用super.write(ctx,msg,promise)
将当前处理器的处理结果传递给下一个出站处理器
双向处理器
双向处理器一般是同时实现了出站处理器和入站处理器的抽象类ChannelDuplexHandler
的子类
Netty
提供的常用ChannelHandler
StringDecoder
:将ByteBuf
转成字符串
StringEncoder
:将字符串转成ByteBuf
LoggingHandler
:日志处理器,打印通道的状态打印客户端传输过来的ByteBuf
中的内容
在LoggingHandler
的构造方法中指定LogLevel.DEBUG
可以将日志级别调整为DEBUG
无法识别的字节会打印成.
,无法识别的字节一般是占用多个字节的类型数据或者特殊字符,比如int
类型数据、换行符\n
该日志处理器的底层使用的是logback
,需要配置logback.xml
日志配置文件
FixedLengthFrameDeacoder
:定长解码器,从ByteBuf
中拆分每条固定长度的消息
拆分每条消息的处理器必须添加到LoggingHandler
之前,否则ByteBuf
中的数据还没有处理粘包半包现象就会被打印
LineBasedFrameDecoder
:行解码器,从ByteBuf
中拆分以换行符作为消息分隔符的每条消息
DelimiterBasedFrameDecoder
:分隔符解码器,从ByteBuf
中拆分以自定义符号作为消息分隔符的每条消息
LengthFieldBasedFrameDecoder
:帧解码器,从ByteBuf
中拆分基于长度字段确定消息长度的每条消息
CombinedChannelDuplexHandler
:HTTP
协议解码器,能将ByteBuf
数据解码成消息头HttpRequest
和消息体HttpContent
SimpleChannelInboundHandler<T>
:开发者可以通过匿名实现入站处理器SimpleChannelInboundHandler<T>
指定泛型类型重写ChannelRead0(ctx,msg)
实现只对特定类型消息的入站处理,如果是其他类型的消息会跳过该处理器不执行
如socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){})
即处理器SimpleChannelInboundHandler
只会对HttpRequest
类型的msg
生效,如果是HttpContent
类型的消息该处理器就会跳过不执行
IdleStateHandler
:空闲状态检测器,检测通道是否存在连接假死,判断连接可能存在问题的原理是读入或者写出数据之后的空闲时间太长
new IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds)
,入参readerIdleTimeSeconds
设置读空闲超时时间,writerIdleTimeSeconds
设置写空闲超时时间,allIdleTimeSeconds
是设置读写的空闲时间都超过指定时间
概念
ChannelHandlerContext
实例提供对通道和流水线上下文的访问,流水线通过ChannelHandlerContext
管理流水线上的处理器
常用方法
fireChannelXxx()
:将当前处理器Xxx
事件对应处理方法的处理结果以及相应事件传递给下一个处理器
writeAndFlush(Object msg)
:将消息从当前处理器开始向前依次经过各个出站处理器处理并写出到网络通道
channel()
:获取当前流水线正在处理的通道,可以拿到通道在处理器中写出数据或者获取通道注册的事件循环提交普通或者定时任务
fireExceptionCaught(throwable)
:向流水线抛出指定异常
概念
ChannelOption
是枚举类,提供一系列枚举值作为Channel
的配置参数如缓冲区大小、超时时间等控制Channel
的行为;通过serverBootstrap
或者bootstrap
的option(ChannelOption枚举值,值)
方法对通道参数进行配置
常用枚举值
ChannelOption.SO_RCVBUF
:通道接收缓冲区滑动窗口的大小,SO_
表示TCP
套接字的参数
ChannelOption.SO_SNDBUF
:通道发送缓冲区滑动窗口大小
接收缓冲区和发送缓冲区滑动窗口大小早期一般由用户调整,现代操作系统TCP
流量控制和拥塞控制机制能高效自适应调整该参数
ChannelOption.SO_TIMEOUT
:设置阻塞式IO
中accept
或者read
方法的最大阻塞时间,默认为无时限等待
ChannelOption.CONNECT_TIMEOUT_MILLIS
:客户端建立连接超时时间,如果指定毫秒时间内无法建立连接会抛出ConnectTimeoutException
,默认值为30s
ChannelOption.SO_BACKLOG
:设置TCP
连接队列的最大长度,即服务端允许同时建立连接的客户端数量
ChannelOption.RCVBUF_ALLOCATOR
:调整Netty
的接收缓冲区容量
参数值为AdaptiveRecvByteBufAllocator
对象,通过构造方法new AdaptiveRecvByteBufAllocator(16,16,16)
来实例化,三个入参依次为接收缓冲区容量的最小值、初始值和最大值,指定的最小值比16
小也只会最小应用16
ChannelOption.TCP_NODELAY
:配置是否启用Nagle
算法优化,默认为false
表示启用,实际生产中希望降低延迟建议设置为true
关闭Nagle
算法
Nagle
算法优化指一些数据包的数据太少可能会在发送缓冲区等待积攒消息,这可能给消息通信带来延迟
ChannelOption.ALLOCATOR
:Allocator
是ByteBuf
的分配器,用户调用channelHandlerContext.alloc()
时拿到的就是分配器对象,分配器默认分配的ByteBuf
是一个池化的直接内存PooledUnsafeDirectByteBuf
,分配器的设置逻辑为
allocType
是JVM
系统参数io.netty.allocator.type
的值,如果用户启动JVM
时没有配置该参数,Netty
会通过PlatformDependent.isAndroid() ? "unpooled" : "pooled"
判断当前操作系统平台是不是安卓,是则将allocType
设置为unpooled
,不是安卓设置为pooled
,该参数只是设置默认分配的ByteBuf
是否池化,不会影响使用堆内存还是直接内存
allocType
是unpooled
默认分配器实例为UnpooledByteBufAllocator
,是pooled
则为PooledByteBufAllocator
JVM
系统参数可以在运行时通过System.getProperty(String key)
访问,用户通过-D
如-Dio.netty.allocator.type=unpooled
进行配置
构造方法new UnpooledByteBufAllocator(boolean preferDirect)
方法的入参preferDirect
表示是否首选直接内存,该参数由JVM
系统参数io.netty.noPreferDirect
决定,默认值是false
表示首选直接内存;通过该构造方法实例化的UnpooledByteBufAllocator
对象将被赋值给UnpooledByteBufAllocator.DEFAULT
,即通过UnpooledByteBufAllocator.DEFAULT.alloc()
获取的ByteBuf
默认使用直接内存
非安卓平台的非池化堆内存ByteBuf
可以在启动JVM
时通过配置系统参数-Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true
实现
网络IO
读写数据时Netty
会强制使用直接内存,不会根据系统参数配置妥协使用堆内存以保证网络IO
的效率
ChannelOption.RCVBUF_ALLOCATOR
:该参数配置Netty
接收最原始网络数据的ByteBuf
容量
在AbstractNioByteChannel
的read()
方法中定义了接收最原始网络数据的逻辑,接收数据的ByteBuf
通过byteBuf=allocHandle.allocate(allocator)
创建,AllocHandle
是RecvByteBufAllocator
的内部类,入参allocator
通过config.getAllocator()
获取的就是上述ChannelOption.ALLOCATOR
创建的allocator
,allocator
控制ByteBuf
是池化还是非池化
allocHandle.allocate(allocator)
方法中调用allocator.ioBuffer(guess())
强制创建基于直接内存的ByteBuf
,guess()
方法会根据Netty
过去几次网络通信数据量动态决定ByteBuf
的容量
allocHandle
在recvBuffAllocHandle()
方法中调用rcvBufAllocator.newHandle()
获取,rcvBufAllocator
通过DefaultChannelConfig
的构造方法中调用setRecvByteBufAllocator(allocator)
赋值得到,入参allocator
在DefaultChannelConfig
的构造方法中通过实例化AdaptiveRecvByteBufAllocator
获得,即通过AdaptiveRecvByteBufAllocator
的参数配置创建allocHandle
实例从而决定Netty
接收缓冲区ByteBuf
的容量参数,AdaptiveRecvByteBufAllocator
的构造器调用this(DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM)
来进行实例化,其中入参DEFAULT_INITIAL
表示接收缓冲区的默认初始容量为1024
,DEFAULT_MAXIMUM
表示接收缓冲区允许的最大容量为65536
,DEFAULT_MINIMUM
表示接收缓冲区允许的最小容量为64
默认情况下接收最原始网络数据的接收缓冲区的内存类型调用的是allocator
的ioBuffer(guess())
方法定死了是直接内存,是否池化或者非池化根据JVM
的系统参数io.netty.allocator.type
决定,初始容量1024
,根据前几次的通信数据量在64-65536
范围内自适应调整接收缓冲区的容量
概念
ChannelInitializer
是一个抽象类,通过serverBootstrap.childHandler(new ChannelInitializer(){})
能自定义ChannelInitializer
实现,通过重写匿名实现的initChannel
方法能为已接入通道自定义组合多种通道处理器形成通道流水线,initChannel
方法在通道被注册到事件循环后被调用用于将通道处理器添加到通道流水线
在channelInitializer
的initChannel(SocketChannel ch)
方法中通过多次调用ch.pipeLine().addLast()
将处理器依次添加到tail
处理器前面,addLast
方法的重载方法可以在添加处理器的同时指定用户自定义的事件循环组替换掉轮询通道的事件循环组来执行处理器的对应时间处理方法,防止处理时间较长的非IO
事件降低其他通道的轮询效率
概念
服务端启动引导器,负责组装服务端Netty
组件,做服务端配置,引导服务端启动
ServerBootstrap
需要配置两个事件循环组,一个用于监听新客户端的接入事件,一个用于处理客户端的可读可写事件
常用方法
group(eventLoopGroup)
:设置一个EventLoopGroup
同时作为BossEventLoop
和WorkerEventLoop
的容器,这种方式会自动在事件循环组中分配处理accept
事件的EventLoop
和处理读写事件的EventLoop
group(bossGroup,workerGroup)
:设置两个EventLoopGroup
,bossGroup
作为BossEventLoop
的容器,只处理所有通道的accept
事件;workerGroup
作为workerEventLoop
的容器,只处理所有通道上的读写事件;通过设置两个事件循环组管理的线程数量可以切换配置Netty
的三种线程模式
channel(serverSocketChannel)
:指定服务端的ServerSocketChannel
实现,这个ServerSocketChannel
就是NIO
中的ServerSocketChannel
,Netty
对ServerSocketChannel
实现包括
NioServerSocketChannel
:这是对原生NIO
的ServerSocketChannel
做的进一步封装
OioServerSocketChannel
:基于BIO
同步阻塞式IO
的ServerSocketChannel
实现
EpollServerSocketChannel
:基于Linux
的Epoll
的ServerSocketChannel
实现
KQueueServerSocketChannel
:对Mac
的ServerSocketChannel
实现
childHandler(channelInitializer)
:自定义抽象类ChannelInitializer
的实现重写initChannel
方法为通道流水线组合多种Handler
来自定义对入站出站数据的处理过程
bind(port)
:指定服务端Socket
通道监听客户端连接的端口开始监听端口上的新客户端接入事件,该方法返回ChannelFuture
实例
option(channelOption,value)
:给服务端通道ServerSocketChannel
做一些全局配置,比如配置通道接收发送缓冲区的滑动窗口大小,接收原始网络数据的ByteBuf
的容量,建立连接的超时时间,是否启用Nagle
算法,TCP
连接队列的最大长度
childOption(channelOption,value)
:给服务端通道SocketChannel
做一些全局配置
概念
客户端启动引导器,负责组装客户端Netty
组件,做客户端配置,引导客户端启动;
常用方法
group(nioEventLoopGroup)
:设置EventLoopGroup
作为EventLoop
的容器,客户端只需要配置一个事件循环组收发网络消息即可
channel(socketChannel)
:指定客户端的SocketChannel
实现
NioSocketChannel
是Netty
对NIO
的SocketChannel
的封装
handler(channelInitializer)
:自定义抽象类ChannelInitializer
的实现重写initChannel
方法为通道流水线组合多种Handler
来自定义对入站出站数据的处理过程
connect(inetSocketAddress)
:通过指定服务端的主机和IP
尝试与服务端建立连接,返回ChannelFuture
对象
该方法是一个异步非阻塞方法,真正发起连接的线程不是创建启动器Bootstrap
的线程,而是NioEventLoopGroup
中的EventLoop
,建立连接的时间以秒作为单位
bind(port)
:作为UDP
协议的一端绑定本地一个端口
概念
Reactor
用于解决高并发场景下的IO
问题,Reactor
模式是一种使用非常广泛的高性能网络编程设计,Nginx
、Redis
、Netty
都是基于Reactor
模式实现的,Java
中projectreactor
框架在JVM
层面上实现了基于Reactor
反应式模式也叫响应式编程的设计
Reactor
模式的核心是Reactor
和Handler
两个角色
Reactor
负责监听查询IO
事件,当监听到IO
事件时将事件分发给相应处理器处理,这里的事件在Java
中就是指NIO
选择器监听的通道IO
事件
Handler
负责执行连接建立、通道数据读取、数据编解码、业务逻辑处理以及向通道写出数据等IO
事件的处理
Reactor
和Handler
有单线程模型、多线程模型和主从多线程模型三种实现方式
所谓的单线程模型就是NIO
的单线程模型,创建负责监听和分发事件的模块或者组件作为Reactor
,Reactor
通常运行在一个单独的线程中,由Reactor
创建选择器通过操作系统的epoll
函数监听通道上的I/O
事件,服务端启动时绑定监听指定端口上的Accept
客户端连接接入事件,一旦监听到端口上的客户端连接读写事件,Reactor
线程会从选择器中获取事件根据事件类型调用一系列处理器对事件进行处理;单线程模型要求所有的Handler
都必须在一个线程内同步执行,待所有处理器处理完后当前事件将处理结果响应给客户端后再去处理下一个事件;单线程模型不适合高并发、事件处理耗时很长的场景,而且无法发挥多核CPU
的优势
所谓的多线程Reactor
模型就是NIO
的多线程模型,将原来的一个Reactor
组件拆分成两部分,第一个部分由一个Reactor
组件只负责监听和分派Accept
事件,第二个部分由若干Reactor
组件作为SubReactor
专门处理已连接客户端通道除Accept
外的其他IO
事件,每个SubReactor
都运行在独立的一个线程中,由专门处理Accept
事件的Reactor
将非Accept
外的IO
事件分配给每个SubReactor
所在工作线程进行处理,每个工作线程处理完当前分配的IO
事件再处理下一个事件;多线程模型适合绝大多数场景,只有少数极端场景如百万并发场景下单个线程处理客户端接入事件可能存在性能问题
主从多线程Reactor
模型指和subReactor
一样由一组reactor
负责处理新客户端的接入请求,另一组subReactor
维持多线程模型的状态专门处理已连接客户端通道除Accept
外的其他IO
事件
特点
Reactor
模式是基于事件驱动的设计模式,可以通过单个或者少量线程就能管理所有客户端连接通道上的所有IO
事件
Reactor
模式可以通过简单增加处理事件的线程数来快速扩展系统的负载上限
Reactor
模式解耦事件接收、事件分派和事件处理,让系统更容易维护和扩展
多线程模型充分发挥多核CPU
的优势,提高系统吞吐量,降低处理长耗时事件对其他通道就绪事件的影响从而提交系统的响应速度
概述
大部分网络框架都是基于Reactor
模式设计开发的,Reactor
模式是基于事件驱动,采用多路复用将时间分发给相应处理器进行处理,适合海量IO
的场景
Netty
依靠NioEventLoopGroup
线程池实现具体的线程模型,使用Netty
创建服务端时一般会初始化bossGroup
和workerGroup
两个线程组,前者负责处理客户端连接建立请求,后者负责处理客户端除连接事件外的其他网络事件
单线程模型
一个线程处理客户端的所有事件,包括accept
、read
、decode
、process
、encode
、send
等事件
不适用于高并发、IO
时间很长的场景
对应使用Netty
创建服务端创建一个单线程的NioEventLoopGroup
,服务端启动引导类group
方法的两个入参都为该NioEventLoopGroup
xxxxxxxxxx
EventLoopGroup eventGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup,eventGroup)
//......
多线程模型
一个只管理单个EventLoop
的NioEventLoopGroup
专门处理客户端连接请求,一个管理默认数量EventLoop
的NioEventLoopGroup
专门处理客户端的其它网络事件
多线程模型适合大部分应用场景,只有少数百万并发场景下,单个线程处理客户端连接事件可能存在性能问题
对应使用Netty
创建服务端创建一个单线程的NioEventLoopGroup
作为bossGroup
,创建一个默认线程数的NioEventLoopGroup
作为workGroup
;服务端启动引导类的group
方法两个入参一个为bossGroup
,一个为workerGroup
xxxxxxxxxx
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
//......
}
主从多线程模型
创建两个默认线程数量的NioEventLoopGroup
,一个作为bossGroup
,一个作为workerGroup
,bossGroup
中选择一个线程绑定监听端口专门处理客户端接入请求,其他线程负责接入后的认证、登录等工作,连接建立后由workerGroup
专门处理客户端的其它通信事件
适合超高并发场景
xxxxxxxxxx
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
//......
}
服务端通信流程
1️⃣:创建启动器类ServerBootstrap
2️⃣:根据需要的Reactor
线程模型添加EventLoopGroup
组件作为bossEventLoop
和workerEventLoop
的容器,bossGroup
监听新客户端的接入事件、workGroup
处理已接入通道的可读可写事件;根据要使用的Netty
线程模型控制两个事件循环组的线程数量
3️⃣:根据场景选择ServerSocketChannel
实现
4️⃣:调用childHandler
方法为通道添加通道初始化器并重写initChannel
为通道流水线组合处理器
5️⃣:服务端绑定网络通信端口开始监听客户端通道的接入事件
6️⃣:服务端监听到客户端接入事件将客户端注册到事件循环中,并调用initChannel
方法初始化通道流水线
7️⃣:通道可读事件触发流水线的入站处理器对入站数据的处理,有需要可以在处理器中通过ctx.writeAndFlush
或者channel.writeAndFlush
向客户端写出数据
public void testNettyServerBasicUsage(){
// 1.bossGroup 用于接收连接,workerGroup 用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup, workerGroup)
// (非必备)打印日志
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑
p.addLast(new HelloServerHandler());
}
});
// 6.绑定端口,调用 sync 方法阻塞知道绑定完成
ChannelFuture f = b.bind(port).sync();
// 7.阻塞等待直到服务器Channel关闭(closeFuture()方法获取Channel 的CloseFuture对象,然后调用sync()方法)
f.channel().closeFuture().sync();
} finally {
//8.优雅关闭相关线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
原生NIO
的服务端启动流程
Selector.open()
创建选择器
ServerSocketChannel.open()
创建ServerSocketSocket
serverSocketSocket.register(selector,0,attachment)
向选择器注册通道和相应附件
serverSocketChannel.bind(new InetSocketAddress(8080))
为通道绑定服务端通信监听端口
selectionKey.interestOps(SelectionKey,OP_ACCEPT)
通过通道对应事件监听对象设置监听serverSocketChannel
上的接入事件
Netty
的启动流程[Netty
中初始化选择器和NIO
线程在new NioEventLoopGroup
中完成,其余创建serverSocketChannel
,初始化通道流水线,将serverSocketChannel
注册到选择器中,绑定端口以及向选择器中注册监听接入事件都在serverBootstrap.bind()
方法中进行]
创建NioEventLoopGroup
的同时初始化封装了NIO
线程和选择器的NioEventLoop
管理客户端接入事件
创建Netty
的NioServerSocketChannel
,初始化nioServerSocketChannel
的流水线,将初始化处理器添加到流水线中等待被调用
启动NIO
线程负责后续步骤的执行,ServerSocketChannel.open()
创建NIO
的原生serverSocketChannel
,并通过serverSocketChannel.configureBlocking(false)
将通道设置为非阻塞模式
serverSocketChannel.register(selector,0,attachment)
将serverSocketChannel
注册到选择器中,同时将Netty
的NioServerSocketChannel
作为附件和原生serversocketChannel
绑定
serverSocketChannel.bind(new InetSocketAddress(8080))
为通道绑定服务端通信端口
selectionKey.interestOps(SelectionKey,OP_ACCEPT)
设置通道的事件监听类型为接入事件
serverBootstrap.open()
方法详解
serverBootstrap.open()
方法主要被拆解为三个部分,initAndRegister
中的init
和register
部分,doBind0
部分;init
部分由主线程执行,初始化Netty
的NioServerSocketChannel
、原生NIO
的ServerSocketChannel
和通道对应的流水线,将初始化处理器添加到流水线中;register
部分启动NIO
线程,由NIO
线程将ServerSocketChannel
注册到选择器并以附件的形式将NioServerSocketChannel
绑定到ServerSocketChannel
上,由NIO
线程执行流水线上的初始化处理器,initAndRegister
方法的返回值是一个ChannelFuture
,用于协调NIO
异步线程执行Register
部分的结果;doBind0
部分负责为serverSocketChannel
绑定监听端口,并且触发NioServerSocketChannel
上的active
事件,当即将执行doBind0
方法时会判断NIO
线程是否已经执行完register
部分,如果已经执行完会由主线程执行doBind0
方法,但是大部分情况下将serverSocketChannel
注册到选择器都是一个耗时操作,进行判断时一般都是NIO
线程仍然没有执行结束,此时doBind0
方法的执行会被封装到ChannelFuture
的回调监听器中,等NIO
线程执行完register
部分再由NIO
线程在监听回调中执行doBind0
方法;
init
部分:constructor.newInstance()
通过反射调用NioServerSocketChannel
的无参构造器实例化NioServerSocketChannel
,并在执行<init>
方法期间和原生NIO
的serverSocketChannel.open()
调用SelectorProvider.provider().openServerSocketChannel()
一样的方式创建原生NIO
的serverSocketChannel
,初始化NioServerSocketChannel
的流水线ChannelPipeline
并向其中添加通道初始化处理器ChannelInitializer
等待调用其中的initChannel
方法,该处理器中的initChannel
方法只会被执行一次,
register
部分:将register0
方法封装成任务对象提交给eventLoop
即NIO
线程执行,这是eventLoop
首次执行任务,也是实例化EventLoop
线程的时机,调用serverSocketChannel.register()
向EventLoop
中的选择器注册通道,并将Netty
实例化的NioServerSocketChannel
作为附件和通道进行绑定,通过channelPipeLine.invokeHandlerAddedIfNeeded()
由NIO
线程执行通道流水线中的初始化处理器ChannelInitializer
的initChannel
方法,该方法的作用是向通道流水线中加入处理器ServerBootstrapAcceptor
,该处理器用于在Accept
事件发生后建立连接;然后手动向ChanenlFuture
实例中设置成功的异步执行结果,触发主线程或者回调监听器中的doBind0
方法执行doBind0
逻辑
doBind0
部分:doBind0
中的所有代码也被封装成任务对象被提交给NIO
线程执行,通过serverSocketChannel.bind()
方法指定监听端口和全连接队列容量来为原生通道绑定监听端口;随即通过channelPipeLine.fireChannelActive()
手动触发NioServerSocketChannel
通道上的Active
事件触发执行head
处理器上的channelActive
方法设置选择器监听serverSocketChannel
上的接入事件
NIO
中服务端处理Accept
事件流程
selector.select
阻塞NIO
线程直到有事件发生
轮询遍历selectedKeys
中的每个就绪事件判断事件类型是否为接入事件
如果是接入事件创建客户端通道对应的SocketChannel
,将通道设置为非阻塞模式
将SocketChannel
注册到选择器中返回selectionKey
通过selectionKey
设置选择器关注通道上的可读事件
Netty
中服务端处理Accept
事件流程
Netty
首次调用eventLoop.execute()
方法时启动单线程执行器中的线程,给单线程执行器提交一个死循环任务,死循环不断轮询检查是否有IO
事件或者用户提交的普通任务,没有NIO
线程则阻塞在带超时时间的selector.select
方法上,如果NIO
线程被唤醒会优先处理当前所有就绪的IO
事件,然后根据IO
事件处理时间在预设时间内处理用户提交的任务。在IO
事件处理期间,通过基于数组实现的selectedKeys
获取就绪事件判断事件类型
如果是接入事件类型,通过serverSocketChannel.accept()
创建NIO
原生的SocketChannel
,随即以原生NIO
的SocketChannel
作为构造器入参创建Netty
相应的NioSocketChannel
,然后将nioSocketChannel
存入ArrayList
作为消息通过调用pipeline.fireChannelRead(nioSocketChannel)
触发nioServerSocketChannel
流水线上的读事件来处理nioSocketChannel
,该流水线在serverBootstrap.bind
方法中的register
部分添加了一个ServerBootstrapAcceptor
处理器,读事件会自动触发执行该处理器的ChannelRead
方法处理消息nioSocketChannel
,ChannelRead
方法会调用childGroup.register()
从serverBootStrap.group
方法指定的另一个eventLoopGroup
中获取一个eventLoop
,把socketChannel
注册到该eventLoop
的选择器封装成一个任务提交给对应eventLoop
的NIO
线程执行,注册逻辑和serverSocketChannel
是类似的,也将NioSocketChannel
作为通道附件和原生NIO
的SocketChannel
绑定,然后执行用户在serverBootStrap.childHandler()
中自定义的ChannelInitializer
的initChannel
方法为nioSocketChannel
的流水线添加用户自定义的各个流水线处理器,然后调用pipeline.fireChannelActive()
触发nioSocketChannel
对应流水线的Active
事件触发head
处理器的channelActive
方法,通过selectionKey.interestOps()
关注通道socketChannel
上的可读事件
NIO
中服务端处理Read
事件流程
selector.select
阻塞NIO
线程直到有事件发生
轮询遍历selectedKeys
中的每个就绪事件判断事件类型是否为可读事件
如果是可读事件则循环调用socketChannel.read(byteBuf)
直到读取到的字节数为0
Netty
中服务端处理Read
事件流程
NIO
线程对接入或者可读事件的处理代码都是复用的NioEventLoop.run
方法中对IO
事件的处理代码processSelectedKey
,而且接入事件和可读事件都执行的同一行unsafe.read()
方法,只是负责处理NioServerSocketChannel
的接入事件和NioSocketChannel
的可读事件对应的unsafe
实例的类型不同,对应的read
方法的具体实现也不同,因此NioSocketChannel
的可读事件的阻塞逻辑、IO
任务和普通任务的处理逻辑以及事件类型的判断都和服务端NioServerSocketChannel
处理Accept
流程是相同的,只是unsafe.read()
方法中对事件的处理逻辑不同
可读事件的read
处理方法首先通过recvBufAllocHandle().allocate(config.getAllocator())
创建池化可动态扩缩容基于直接内存的byteBuf
,使用该byteBuf
读取socketChannel
中的数据,然后手动调用channelPipeline.fireChannelRead(byteBuf)
触发NioSocketChannel
对应流水线上的可读事件依次执行各个处理器的channelRead
方法对byteBuf
中的数据进行用户的自定义处理
客户端通信流程
1️⃣:创建启动器类Bootstrap
2️⃣:添加EventLoopGroup
组件作为事件循环的容器
3️⃣:根据场景选择SocketChannel
实现
4️⃣:调用handler
方法为通道创建一个匿名通道初始化器并重写initChannel
为通道流水线组合处理器
5️⃣:使用connect
方法连接服务端,服务端和客户端成功建立连接后分别调用对应的initChannel
方法对通道进行初始化,返回ChannelFuture
实例,可以调用channelFuture.addListener()
注册一个监听器待连接成功建立时由异步线程执行回调方法
6️⃣:建立连接期间客户端使用channelFuture.sync
方法阻塞当前线程等待连接成功建立
7️⃣:与服务端建立连接后客户端使用channelFuture.channel
方法获取连接对象channel
,获取到channel
对象后通过channel.writeAndFlush
方法向服务端发送数据,使用channel
发送数据时会自动经过流水线的所有出站处理器
8️⃣::通道可读事件触发流水线的入站处理器对入站数据的处理,有需要可以在处理器中通过ctx.writeAndFlush
或者channel.writeAndFlush
向客户端写出数据
public void testNettyClientBasicUsage() throws InterruptedException {
//1.创建一个 NioEventLoopGroup 对象实例
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动引导/辅助类:Bootstrap
Bootstrap b = new Bootstrap();
//3.指定线程组
b.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 5.这里可以自定义消息的业务处理逻辑
p.addLast(new HelloClientHandler(message));
}
});
// 6.尝试建立连接
ChannelFuture f = b.connect(host, port).sync();
// 7.等待连接关闭(阻塞,直到Channel关闭)
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
[回调示例]
future.addListener(f -> {
if (f.isSuccess()) {
System.out.println("连接成功!");
} else {
Throwable cause = f.cause();
System.err.println("连接失败: " + cause.getMessage());
cause.printStackTrace(); // 打印堆栈信息以便调试
}
});
NIO
的空轮询Bug
JDK
在linux
平台下不论是否带超时时间的selector.select()
都有极小的概率发生空轮询Bug
,一旦发生该Bug
,没有IO
事件发生的情况下select
方法无法阻塞NIO
线程导致NIO
线程一直在死循环上空转,如果多个NIO
线程都发生空轮询Bug
,就会导致CPU
资源被耗尽
Netty
对空轮询Bug
的解决方法
Netty
在eventLoop.select()
方法中使用一个selectCnt
初始值为0
的局部变量来统计在一次超时时间为1005ms
的eventLoop.select()
期间死循环调用selector.select()
方法的次数,一旦发生空轮询Bug
,该计数会因为selector.select
方法无法阻塞NIO
线程且无法满足达到预设超时时间、有用户提交的普通任务或者有就绪的IO
事件无法跳出死循环导致selectCnt
的值暴增
Netty
为用户提供了一个系统变量io.netty.selectorAutoRebuildThreshold
来指定selectCnt
阈值,默认值为512
,当selectCnt
超过该阈值Netty
就认为发生了空轮询Bug
,Netty
会重新创建一个selector
替换旧的selector
并将旧selector
上的状态信息拷贝到新selector
上,将selecCnt
重置为0
并让NIO
线程继续死循环重新阻塞在selector.select()
方法上
此外Netty
还提供对Selector
的全新实现来作为解决空轮询Bug
的其他办法
概念
只要使用TCP/IP
协议进行网络通信就会出现黏包半包现象,UDP
没有黏包半包现象;网络编程中的黏包和半包现象指多条消息由于某些原因在数据发送接收时被重新组合,多条消息被组合在一起就是黏包,一条消息被截断成两个或者多个部分就是半包,示例如下;服务端接收到消息后需要处理黏包半包现象将消息还原成用户实际发送的多条消息
[用户实际发送的消息]
xxxxxxxxxx
Hello,world
I'm zhangsan
How are you?
[服务端接收到的消息]
xxxxxxxxxx
Hello,worldI'm zhangsanHo
w are you?
使用Netty
时,NioSocketChannel
和SocketChannel
的channel.writeAndFlush
和channelHandlerContext.writeAndFlush
以及使用NIO
的SocketChannel
都可能随机发生黏包半包现象;一般当接收缓冲区相较于消息本身较小时才会比较容易发生半包现象
半包现象还会导致消息边界问题:socketChannel.write(Charset.defaultCharset().encode("中国"))
写入数据时使用UTF-8
来对字符进行编码,一个汉字会被编码成三个字节,一个英文或者数字编码成一个字节;如果不能完整读取一条消息就将全部字节数据转换成字符,最后一个字符会因为字节不完整转换为乱码,第二次读取到缓冲区的数据因为第一个字符的字节就不对导致后续消息全部乱码
原因分析:本质原因是TCP
协议是流式协议,消息之间无边界,解决黏包半包问题的本质是用户必须自己去设置找出消息的边界
TCP的滑动窗口引起黏包半包现象
TCP
协议网络数据传输以段segment
为单位,一条数据可能被分成多个段来发送,每个段都需要接收方进行一次ack
应答确认,如果接收方没有应答还会再次重试发送保证消息的可靠抵达,等到一个段的发送和应答抵达完成后才发送下一个段,这种方式会降低系统的吞吐量,包的往返时间越长性能越差
为了解决这个问题,TCP
中引入了滑动窗口,TCP
的接收缓冲区和发送缓冲区就是TCP
滑动窗口的实现,就是所谓的Socket
缓冲区;滑动窗口大小决定了无需等待应答就可以继续发送的段数量的最大值,滑动窗口内的段数据才允许被发送,在接收方应答未到达前窗口停止滑动,只有滑动窗口中的数据的接收方应答回来了窗口才能向后滑动包含等待发送的段并发送段数据,而且接收方也相应地维护着一个滑动窗口,超出滑动窗口的待接收数据只有等滑动窗口内的数据接收完毕后才能执行接收操作
滑动窗口起到了缓冲区的作用,避免有多少数据发送多少数据,也避免发送完一条数据必须等到接收方相应以后再发送下一条数据导致发送太慢,同时也起到了流量控制作用
因为TCP
数据可能被分割成多个段,但是滑动窗口的大小有限;如果接收方网络数据量太大,数据接收到一半滑动窗口缓冲区不够用,接收方此时必须去缓冲区读取数据,就会发生半包现象;如果接收方的滑动窗口比较空闲,客户端发送了多条数据,接收方没有及时去滑动窗口读取数据,接收方的滑动窗口把多条数据都缓冲在滑动窗口中,此时接收方程序去缓冲区读取数据,就会发生黏包现象
表现在编码层面socket
缓冲区满了会停止socketChannel.write(byteBuffer)
方法的执行,socket
缓冲区的大小不固定,范围在2-8M
之间浮动,只有socket
缓冲区清空了才能继续发送byteBuffer
中没有写完的数据;byteBuf
比较大消息比较短时总会发生黏包现象
TCP
的Nagle
算法引起的粘包现象
概念:TCP
网络数据传输在传输层和IP
层为数据添加报头,IP
层的报头为20
个字节,TCP
层的报头也为20个字节,即使只传输一个字节数据,最后添加了报头的数据长度至少为41
个字节,在某些情况下报头的长度远远大于数据内容的长度,这样非常不划算;因此出现了Nagle
算法对这种情况进行优化,Nagle
算法的原理是攒够了一定量的数据再发送,避免因为数据报头远远多于数据内容导致网络传输效率太低;而且多条消息堆积以后再传输就像快递配送一样效率更高
攒数据就带来了粘包问题
Netty
的ByteBuf
容量太大或者太小导致粘包半包现象
如果网络传输的数据相较于Netty
的ByteBuf
容量都是很短,很容易发生粘包现象;但是ByteBuf
容量小于消息长度就会发生半包现象
链路层的MSS
限制导致半包现象
链路层是比TCP
更底层的协议,链路层有一个MSS
限制,不同的网卡对数据包的大小有限制,笔记本的网卡一般限制数据包大小包括TCP
和IP
报头为1500
字节,只要发送的数据量大于1460
字节,数据就会被切分发送造成半包现象
注意回环地址对MSS
几乎没有限制,允许的数据包大小为65535
字节,如果向局域网中的另外一台电脑数据网络传输就会应用MSS
限制
网卡对数据包的大小限制值叫数据链路层最大载荷长度MTU
,MSS
是传输层的报文载荷长度即不包含报头的40
字节,MTU
包含了MSS
,MTU
减去IP
头和TCP头
就是MSS
,当数据包大小超过了MTU
时,路由器可能会把数据包分成更小的部分发送,如果不能分片,路由器会丢掉这个数据包,发送ICMP
报文,告诉发送方数据包太大
概念:
客户端建立连接发送一条消息后立即断开连接,客户端单次链接只会发送一条消息,NIO
客户端不管正常断开还是暴力宕机断开连接都能被服务端监听到,服务端能根据客户端连接的断开时机确定单条消息的边界,本质上是用网络连接状态标记消息的开始和结束
局限
短链接方案无法解决半包问题,TCP
的接收缓冲区滑动窗口容量太小、ByteBuf
的容量太小或者MSS
限制仍然会产生半包现象
每发一条数据都要建立一次连接,性能和效率比较低
概念
FixedLengthFrameDeacoder
是专门做固定长度消息解码的处理器,Frame
表示帧指一条完整的消息;固定长度指网络传输的每条消息的长度都相同,定长解码器会按照固定字节数从ByteBuf
取出每条消息,出现半包现象等待后续数据写入ByteBuf
再重复上述过程取出每条消息;消息内容不可能保证每次长度都相同,因此定长解码器需要长度最长的消息作为消息的固定长度防止有消息长度大于预设消息长度,没有数据的位置需要使用空格补齐,消息固有长度通过定长解码器的单参构造指定;
定长解码器就是Netty
提供的一个处理器,作为通道流水线处理通信消息的一道工序,类似定长解码器这种分割单条消息的处理器需要添加到日志处理器如LoggingHandler
前面,否则没有处理黏包半包现象的数据会直接被打印,半包现象引起的消息边界问题会导致后续该通道下一个ByteBuf
数据全部乱码
定长解码器这种分割单条消息的处理器都是拆分出单条消息后每条消息都单独传递给后续流水线的每道工序处理,如果ByteBuf
中的数据无法拼接出单条消息会阻塞当前消息的处理直到向ByteBuf
写入后续数据;
像定长解码器、行解码器和帧解码器这种处理半包现象会阻塞等待后续网络消息的处理器必须设置为每个通道一个,不要出现多个eventLoop
共用一个解码器的情况,因为这些解码器不是线程安全的,出现半包情况这些解码器会记录当前消息的状态,如果此时其他通道的消息拼接到帧解码器上一条还未处理的半包消息上会导致两条消息完全错乱,Netty
为所有支持多线程共享的处理器都添加了@Sharable
注解,Netty
充分考虑了各种场景下这些处理器的线程安全性,可以被不同流水线的多个事件循环共享
优势
后续的各种解码器方案,客户端都可以一次性发送任意条消息,将一条消息分多次发送也行,连接开销比短链接低的多
局限
消息长度方差稍微一大,定长解码器解决方案就会因为大量的空白数据补齐导致严重的内存和网络带宽浪费
概念
行解码器的思想是使用特定分隔符确定单条消息的边界,Netty
提供LineBasedFrameDecoder
和DelimiterBasedFrameDecoder
两种行解码器实现
LineBasedFrameDecoder
以换行符作为消息的分隔符,Linux
平台换行符为\n
,windows
平台换行符为\r\n
,LineBasedFrameDecoder
实例化时必须通过单参构造指定消息的最大长度,如果行解码器发现消息超过最大长度还没有发现换行符会抛出TooLongFrameException
消息太长异常,避免客户端消息格式错误导致服务端行解码器一直阻塞等待拼接客户端消息
DelimiterBasedFrameDecoder
开发者自定义特殊字符作为消息的分隔符,DelimiterBasedFrameDecoder
实例化时也必须通过双参构造指定ByteBuf
类型的分隔符和消息的最大长度
局限:
行解码器因为多一次遍历查找分隔符确定消息长度创建对应ByteBuf
的过程效率比较低
概念
LengthFieldBasedFrameDecoder
也叫帧解码器,是基于长度字段的消息解码器;帧解码器要求网络消息具备魔数、消息长度、消息附加内容和消息本身四部分,其中魔数和消息附加内容长度均可以为0
;这种结构设计的目的是在读取消息内容前就能通过消息长度字段确定消息长度避免通过遍历整条消息来确定消息长度;
LengthFieldBasedFrameDecoder
对象通过五个参数的构造方法来创建,maxFrameLength
指定消息最大长度,消息超出最大长度抛出TooLongFrameException
异常;lengthFieldOffset
指定消息长度字段偏移量,指定长度字段从消息的那一个索引开始;lengthFieldLength
指定长度字段的占用字节数;lengthAdjustment
指定附加消息的长度;initialBytesToStrip
解码出每条消息后要去掉消息头部的多少个字节作为传递给后续流水线的消息,一般用于保留消息内容本身
消息构建时可以通过byteBuf.writeInt(length)
等方式写入指定字节的长度字段,通过byteBuf.writeBytes(bytes)
写入消息内容本身
HTTP
协议就采用类似的LTV
消息格式[L
表示长度、T
表示消息类型、V
表示消息实际内容]
HTTP2.0
协议就是LTV
格式,HTTP1.1
是TLV
格式[HTTP
请求头先传输消息类型即content-type
,再传输消息长度content-length
,请求体就是消息本身]
局限
如果有一条消息格式错误,后续的消息会因为读取错误的长度字段导致所有消息读取错误直接抛出异常
其他衍生问题
行解码器和帧解码器都不可避免地会遇到ByteBuf
最后一条消息产生半包现象的问题
此时底层可以调用NIO
的byteBuffer.compact()
方法将缓冲区中未读的数据移到缓冲区的开头,将当前位置指针position
移到缓冲区未读出数据最后一位的下一位,缓冲区下次再从通道读数据时只会覆盖已读出数据来解决半包问题
行解码器和帧解码器还可能会遇到单条消息长度比缓冲区还长导致缓冲区始终写不完一条完整消息
NIO
的解决办法是将缓冲区绑定为通道附件,当调用byteBuffer.compact()
方法后如果缓冲区的当前位置指针和最大容量相等说明缓冲区覆盖已写出数据后仍然无法写入数据,此时创建一个原容量两倍的新缓冲区,将原数据拷贝到新缓冲区并重新绑定为通道附件再使用该缓冲区从通道写入当前消息
NIO
还可以使用多个数组组成缓冲区,空间不够就将剩余数据写入新数组,相应的消息解析也更复杂
Netty
对两种解决方案都进行了实现,ByteBuf
被设计为可以根据历史消息长度自动扩缩容,CompositeByteBuffer
也实现了通过零拷贝的方式实现多个ByteBuf
的组合
协议简介
Redis
服务端与客户端之间通过Redis
自定义的协议进行网络通信,Redis
的远程命令调用对应的通信消息格式为[元素个数][元素1字节数][元素1实际内容][元素2字节数][元素2实际内容][元素3字节数][元素3实际内容]...
,任意两个部分之间使用[回车键换行键]
作为分隔符,消息的结尾再加一个[回车键换行键]
*#
:元素个数通过*#
表示,其中#
是命令的元素个数即命令的单词个数
$#
:当前元素的字节个数,#
就是当前单词的字母或者数字个数,因为一个字母或者数字都占一个字节
元素实际内容
:当前元素字符对应的字节数据
回车键换行键
:元素个数、各个元素字节数和元素实际内容之间通过回车键换行键
两个字节两两分隔,回车键
对应字节码为十进制的13
,换行键对应10
比如客户端发送命令set name zhangsan
要求Redis
服务端执行,对应的通信消息为*3回车符换行符$3回车符换行符set回车符换行符$4回车符换行符name回车符换行符$8回车符换行符zhangsan回车符换行符
,除去回车符换行符为*3$3set$4name$8zhangsan
上述消息一个字节不改发给Redis
所在主机的对应端口如6379
端口就能在Redis
服务端成功执行命令set name zhangsan
,执行成功后服务端响应+OK回车符换行符
给客户端
协议的意义
客户端和服务端按照规定的通信协议发送网络消息,服务端和客户端就能解析对方的意图并执行对方希望自己执行的操作并由对方通知执行结果
Netty
提供了常用的如Redis
、HTTP
、HTTPS
、WebSocket
等等常用的网络通信协议,用户只需要在流水线中配置相应的协议编解码器,向流水线写入对应类型的消息实例如httpRequest
,Netty
就能自动将消息实例编码成HTTP
报文对应的字节数据发送给网络;对方也能通过相应协议编解码器将报文数据解析成消息实例按消息要求执行相应业务
概念
HttpServerCodec
是Netty
提供的HTTP
协议服务端编解码器,HttpServerCodec
继承自CombinedChannelDuplexHandler
,CombinedChannelDuplexHandler
组合了其泛型中的请求解码器HttpRequestDecoder
和HttpResponseEncoder
响应编码器,Netty
中类似以Codec
结尾的类就说明同时包含解码和编码功能
HttpServerCodec
是双向处理器
作为入站处理器时请求解码器HttpRequestDecoder
生效将通信请求数据解码成HttpRequest
和HttpContent
两种消息类型,分别表示请求头、请求行和请求体;在服务端编解码器后面的流水线需要区分消息类型选择性地进行处理;例如对于GET
请求只需要关心请求头无需关注请求头,可以自定义只处理HttpRequest
类型消息的入站处理器new SimpleChannelInboundHandler<HttpRequest>(){}
重写channelRead0(ctx,msg)
方法,遇到HttpContent
类型消息该处理器自动跳过不执行
作为出站处理器时响应编码器HttpResponseEncoder
生效将HttpResponse
的默认实现DefaultFullHttpResponse
响应对象编码成符合HTTP
协议格式的字节数据响应给客户端,DefaultFullHttpResponse
实例通过构造方法new DefaultFullHttpResponse(HttpVersion version,HttpResponseStatus status,ByteBuf content,boolean validateHeaders)
传参HTTP
协议版本、响应状态码、响应体内容对应ByteBuf
创建,通过调用ctx.writeAndFlush(defaultFullHttpResponse)
就能将响应写回给客户端
此外在写回defaultFullHttpResponse
还需要做一些必要配置,比如通过defaultFullHttpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,响应体的字节长度)
设置响应头中的content-length
属性即设置响应体的内容长度,没有这项配置浏览器不知道何时接收完所有响应数据浏览器的加载图标会一直转圈等待接收更多的响应数据
使用Netty
启动一个服务端在通道流水线添加一个Http
服务端编解码器HttpServerCodec
就能直接实现了一个能被浏览器访问的极简服务器,使用浏览器给相应主机和端口发送请求给服务端就能直接获取服务端的响应并渲染响应体
浏览器在发起请求获取服务端响应后还会发起一个GET
类型的/favicon.ico
请求获取站点的图标
协议结构
魔数:用消息的头几个字节表明协议类型,比如Java
字节码文件头四个字节为cafebabe
版本号:有版本号能支持协议升级
序列化算法:对象和二进制流互转转换的方法
常用序列化算法有json
、JDK
原生序列化算法、谷歌的protobuf
、hessian
正文长度:界定消息的边界,方便解决TCP
网络通信中的黏包半包问题
消息正文:消息内容本身,消息对象序列化为二进制流的实际内容
消息类型:用来标识消息属于哪一项具体业务
请求序号:消息的唯一标识,请求序号能给全双工通信提供异步能力,没有请求序号一条消息收发完成后才能发送下一条
使用Netty
自定义协议的流程
定义一个可以序列化的消息抽象父类,提供所有消息共有的域信息如协议版本号、序列化算法、正文长度、消息类型,请求序号等,消息正文;根据具体消息类型定义各种消息子实现继承自该抽象父类
通过继承Netty
提供的ByteToMessageCodec<T>
消息编解码器自定义一个消息编解码器
通过实现decode
方法自定义ByteBuf
按照自定义通信协议字节数据解析为消息对象,专业的通信协议会让消息正文前的固定字节数为2
次幂,使用特定字符用于填充不足部分,消息正文前的部分消息私有的数据已经被封装在消息正文中用于记录消息状态,在消息正文前额外放一份是因为这部分数据解析方便,在不处理消息正文的情况下就能确定消息结构、消息正文的序列化算法、长度等信息;从ByteBuf
中读取消息正文部分,根据序列化算法类型调用指定序列化算法将字节数组序列化成消息对象,将消息对象添加到decode
方法的入参list
集合中
实现其中的encode
方法自定义消息正文对象转换为最终ByteBuf
的逻辑,通过消息对象获取消息私有的信息确定协议版本、采用的序列化算法、消息正文长度写入encode
方法的入参ByteBuf
;按照通信协议组织消息正文前面的数据,根据序列化算法类型调用指定序列化算法将消息对象序列化为字节数组作为消息正文追加到ByteBuf
ByteToMessageCodec<T>
在设计的时候被认为用户自定义消息编解码器需要处理黏包半包现象涉及到阻塞保存消息状态,即使我们可以在消息编解码器前先添加一个解码器如帧解码器拆分出单条消息避免在消息解码器中处理黏包半包现象,ByteToMessageCodec
在构造方法中会仍会调用ensureNotSharable()
方法确保子实现类上不能添加@Sharable
注解,如果添加了@Sharable
注解子类调用构造方法实例化时就会报错
如果一定要在自定义消息编解码器上添加@Sharable
注解,可以让编解码器继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
,Netty
定义MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
是为了让用户明确这是消息到消息的转换,使用自定义编解码器处理消息前就已经解决了消息的粘包半包问题不会涉及到编解码器保持消息状态的情况,其中INBOUND_IN
是传入当前处理器的消息类型,OUTBOUND_IN
是处理后要传递给后续流水线的消息,该抽象父类的子实现类对应的也需要实现encode
方法和decode
方法,通过继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
实现的自定义编解码器上可以添加@Sharable
注解,这只是一种技法
将消息编解码器添加到通道流水线即可自动按照自定义通信协议在出站时将消息类型实例转换成ByteBuf
传递给下一个出站处理器,入站时将ByteBuf
转换成消息类型实例传递给下一个入站处理器
优势
通用协议一般为了跨平台设计的比较复杂,对于简单场景通过自定义协议能让消息更紧凑,节省带宽提高传输解析效率
概念:将对象序列化为字节数组用于网络传输和数据持久化;反序列化指将从网络、磁盘中读取的字节数组还原成对象
Java
中自带的通过实现Serializable
接口通过对象输入输出流实现序列化,性能和安全性不好一般不会被使用;比较知名的有专门针对Java
语言的Kryo
、FST
,跨语言的Protostuff
、ProtoBuf
、Thrift
、Avro
、MsgPack
等序列化方式性能都非常好,一般使用Protostuff
、Hessian2
、json
序列方式比较多
开发者可以自定义一个序列化工具类,提供多种序列化算法,用户通过工具类获取基于不同序列化算法的序列化器提供对应序列化和反序列化方法
定义一个接口提供将指定类型对象转换为字节数组的序列化方法serialize
和将字节数组转换为指定类型对象的反序列方法deserialize
创建一个实现该接口的枚举类,枚举类中声明每个枚举值都要重写接口提供的序列化方法和反序列方法,可以使用序列化算法的名字作为枚举值,重写方法时调用对应API
将字节数组转换成对象或者将对象转换成字节数组并返回
可以通过枚举值实例调用相应的序列化和反序列化方法返回对象实例或者字节数组;
枚举值有一个ordinal()
方法,可以根据枚举值的次序依次返回0,1,2...
,可以通过用户设置的枚举值的ordinal()
方法获取枚举值对应次序数字作为序列化算法标识写入通信消息;
枚举值还提供一个values()
方法,以数组的形式返回所有枚举值实例,可以调用该方法获取到枚举值数组通过解析网络消息读出序列化算法标识从数组对应索引处取出相应枚举值实例,通过枚举值实例调用序列化方法将字节数组序列化为对象实例;
枚举值还提供一个ValueOf(String name)
方法,可以通过传参枚举值名称获取对应枚举实例,可以通过读取用户在配置文件配置的序列化方式获取相应枚举值,再通过枚举值调用相应的序列化和反序列化方法
业务设计
用户使用用户名、密码和短信验证码登录,登录成功保存将用户信息保存到session
绑定用户ID
和客户端通道,服务端能根据用户ID
获取客户端通道、根据客户端通道获取用户ID
进而获取用户信息
聊天业务:创建聊天组、添加用户到聊天组、将用户移出聊天组、删除聊天组、根据聊天组名称获取聊天组所有成员、根据聊天组名获取所有在线成员的客户端通道
流水线设计:服务端和客户端流水线上依次添加帧处理器处理可能存在的粘包半包消息,记录日志的处理器LoggingHandler
,消息编解码器将ByteBuf
按照自定义协议转换成Message
对象,处理用户登录状态的处理器,
这里用日志处理器作为客户端的消息处理器,即直接打印客户端接收到的消息,没有设置客户端对接收消息的额外处理
服务端根据业务类型额外添加单聊消息处理器、群聊创建消息处理器
客户端发送不同类型消息实现
创建聊天消息接口封装消息发送者、消息正文;创建一系列子实现如封装用户名、密码的登录请求消息;封装聊天组ID
的群发信息、获取聊天组成员、加入聊天组、退出聊天组的消息类;封装聊天组名称和初始用户列表的创建聊天组消息;
客户端消息发送将对应消息类型的出站数据直接写出到流水线经过自定义消息编解码器根据自定义协议处理成ByteBuf
,序列化使用的是JDK
自带的序列化,能够自动判断消息类型
这种序列化方式在服务端某些simpleChannelInboundHandler
处理器只处理特定类型消息的时候很好用,消息编解码器将ByteBuf
解码成父接口类型的消息,simpleChannelInboundHandler
能自动判断消息的实际类型从而进行消息过滤
登录业务实现
给客户端流水线添加一个自定义处理器channelInboundHandlerAdapter
重写其channelActive
方法在连接建立成功时客户端的事件循环使用异步线程监听用户输入扫描器输入用户名和密码,由异步线程向服务端写出登录类型消息,异步线程调用countDownLatch.await
阻塞等待服务端响应
使用新线程的原因是避免监听用户输入阻塞客户端的事件循环线程与服务端的通信
同一个channelInboundHandlerAdapter
重写其channelRead
方法处理服务端的登录响应,服务端的响应数据作为入站数据由事件循环线程处理,解析服务端响应数据,判断用户登录状态,创建一个线程共享的原子布尔类型变量用于多线程共享用户登录状态,如果用户登录成功将登录标识改为true
,使用CountDownLatch.countDown
唤醒阻塞等待服务端处理登录数据的异步线程,异步线程通过登录标识判断登录状态;登录失败调用ctx.channel().close()
然后结束异步线程的执行;登录成功异步线程进入死循环,为用户提供服务菜单阻塞监听用户的输入扫描器,封装用户输入信息向服务端写出各种类型的消息
客户端连接关闭后在主线程调用的channel.closeFuture().sync()
会结束阻塞,主线程调用finally
块中的eventLoopGroup.shutdownGracefully()
结束所有事件循环组的执行,主线程结束执行,所有用户线程结束执行JVM
进程结束运行
服务端流水线添加一个simpleChannelInboundHandler
的子实现重写channelRead0
方法用于只处理客户端通道上的登录消息,登录成功或者失败都封装登录响应类型的消息向客户端写出,登录成功将用户ID
和相应客户端通道保存到本地缓存
单聊业务实现
客户端监听输入扫描器,监听到用户发送单聊消息直接向流水线写出封装单聊信息的对象实例经过编解码器写出到服务端
服务端添加一个simpleChannelInboundHandler
入站单聊消息处理器根据目标用户ID
的客户端通道将消息写出,如果用户没上线将消息持久化到数据库
创建聊天组业务实现
添加simpleChannelInboundHandler
处理群聊创建消息,根据群聊名称和初始成员创建群聊,向消息客户端发送群聊创建状态消息,群聊创建成功向所有初始群聊用户发送拉群成功消息
群聊业务实现
客户端监听输入扫描器,监听到用户发送群聊消息直接向流水线写出封装群聊消息的对象实例
服务端添加一个simpleChannelInboundHandler
入站群聊消息处理器,根据群聊ID
获取所有群聊用户,向所有在线用户的通道发送消息,未上线用户将消息持久化到数据库等待用户上线
客户端退出业务处理
客户端主动关闭连接正常退出会触发服务端对应客户端通道的inactive
事件、异常退出对应事件循环线程会抛出异常
服务端添加一个ChannelInboundHandlerAdapter
入站处理器,重写channelInactive()
方法在客户端正常退出的情况下执行该方法从会话管理器中移除用户和相应客户端通道,重写exceptionCaught
方法在客户端异常退出的情况下捕获异常从会话管理器中移除用户和相应客户端通道
连接通道假死
连接假死指服务端应用程序无法得知哪些TCP
连接因为网卡、网线故障、机房断电、防火墙或者路由器因为连接长时间空闲自动断开连接等原因断开;服务端仍然会保持连接的相关资源,服务端的最大连接数一般是限制死的,假死连接不释放会降低系统处理客户端消息的性能
假死状态检测
Netty
提供检测假死状态的处理器IdleStateHandler
,该处理器通过判断读或者写之后的空闲时间太长怀疑连接可能假死,IdleStateHandler
构造时可以指定三个入参,读入数据的空闲超时时间、写出数据的空闲超时时间,读写数据的空闲超时时间;超过指定超时时间没有读写事件会触发相应的读写超时事件,READER_IDLE
、WRITER_IDLE
通常由入站处理器和出站处理器处理,ALL_IDLE
既可以由入站处理器处理也可以由出站处理器处理;空闲超时时间指定为0
表示不再检测对应的读写空闲时间
IdleStateHandler
只是提供检测空闲超时时间触发相应读写空闲事件,对超时事件的处理需要用户自定义入站和出站处理器,各种处理器的userEventTriggered
方法用于自动处理用户自定义的事件或者IdleState
下的各种事件,入参object evt
就是事件本身,空闲超时事件的实际类型是IdleStateEvent
,通过idleStateEvent.state()
能获取空闲超时事件的类型;使用方法是实例化一个IdleStateHandler
并将其添加到通道流水线
处理连接通道假死
触发空闲超时事件服务端不会直接调用channel.close()
关闭客户端连接,这样很容易导致客户端异常闪退误伤正常客户端
常用的做法是客户端设置IdleStateHandler
的写空闲超时时间,服务端设置一个读空闲超时时间,客户端添加一个出站处理器重写userEventTriggered
方法处理该写空闲超时事件向服务端发送心跳数据包,客户端的写空闲超时时间一般设置成服务端读空闲超时时间的一半,服务端一旦触发读空闲超时事件就说明客户端连接肯定假死
心跳机制:连接通道一段时间内连接处于空闲状态,客户端或服务端会发送一个特定的数据包给对方,接收方接收到数据报文后立即发送一个特定的数据报文回应发送方形成一个PING-PONG
交互,通过心跳机制让通信双方都知道对方仍然在线
操作系统实现了TCP
协议中的keepalive
方法,但是读写空闲时间长达两小时,默认该功能是关闭的,配置TCP
的SO_KEEPALIVE
选项能使用TCP
自带的心跳机制,想要对心跳机制进行自定义还要修改操作系统层面的代码非常不灵活,开发者一般会在Netty
层面通过编码实现自定义心跳机制
最好在客户端也设置一个读空闲超时时间监测服务端写回的心跳数据,因为一般断线重连都由客户端在监测到连接中断的情况下发起重连尝试,如果客户端在读空闲超时时间内没有收到服务端的心跳数据包说明连接已经断开
概念
网络通信由于网络不稳定、服务器宕机重启、客户端网络环境变化、系统允许的最大文件描述符数达到上限、客户端或者服务端内存不足等等原因随时可能导致服务端和客户端的网路连接断开,需要设计一种在监测到网络连接断开后在客户端和服务端重新建立连接的重连机制
Netty
中客户端和服务端调用channel.close()
方法正常关闭连接时会触发双方的channelInactve
事件,能被Netty
检测到的网络异常导致的连接中断也会触发channelInactive
事件;但是在部分情况下比如客户端、服务端进程被强制终止,网络分区故障,系统资源如文件描述符耗尽等情况下Netty
可能无法检测到连接的中断不会触发channelInactive
事件;因此仅在channelInactive
编写重连逻辑很容易出现无法触发重连的情况,并且一般都会使用心跳检测机制保证切实监测到连接断开状态;一般有两种触发重连逻辑的方式,第一种是当心跳检测机制发现连接断开的情况下触发客户端的读空闲超时事件,客户端对该事件进行处理时通过channelHandlerContext.fireExceptionCaught(throwable)
抛出连接中断异常,在exceptionCaught()
方法中对异常进行处理时调用channel.close()
触发通道的channelInactive
事件执行channelInactive
方法中的重连逻辑,第二种是客户端处理读空闲超时事件时抛出连接中断异常,在异常处理方法中调用重连方法
定义重连策略封装最大重试次数、重连间隔、连接超时时间等重连参数
重连逻辑
调用channel.close()
关闭重连失败的通道
调用reconnect()
方法,在重连方法中判断本次重连是否被重连策略允许,如果不允许抛异常关闭连接打印日志,如果允许重连调用bootstrap.connect()
方法发起重连,为异步连接任务返回的ChannelFuture
添加监听器channelFutureListener
,实现监听器的operationCompelete()
方法在连接建立任务完成时通过channelFuture.isSuccess()
方法判断连接是否成功建立,如果连接成功建立打印日志结束方法执行,如果连接失败通过eventloop.schedule()
让事件循环在指定时间延迟后再次调用reconnect()
方法
TCP
三次握手概念介绍
半连接队列:服务端将尚未完成三次握手的连接信息存入半连接队列
全连接队列:服务端将已经完成三次握手的连接信息存入全连接队列,全连接队列的容量决定了服务端的最大连接数,一旦客户端连接数超过全连接队列容量,服务端会向客户端发送一个拒绝连接的错误信息,客户端随后会抛出ConnectException
三次握手流程
服务端调用bind()
方法绑定通信端口
服务端调用listen()
方法监听连接请求
客户端调用connect()
方法发起一个连接请求数据包SYN
,客户端的状态变成SYN_SEND
状态,
这是第一次握手
服务端接收到SYN
数据包表明客户端发送消息的能力正常,服务端将数据包封装成一个连接信息对象存入半连接队列,服务端状态变成SYN_RCVD
状态
服务端向客户端响应SYN
数据包和ACK
应答数据包
这是第二次握手
客户端接收到SYN+ACK
数据包表明服务端收发消息的能力正常,客户端状态变成ESTABLISHED
状态
客户端向服务端响应ACK
数据包表明客户端接收服务端消息的能力正常,服务端状态变成ESTABLISHED
状态
这是第三次握手
服务端将连接信息转移到全连接队列
服务端调用accept()
方法从全连接队列中获取连接对象进行数据操作,连接信息被accept
处理以后就会从全连接队列中移除,accept()
方法处理能力有限时连接信息会在全连接队列中进行堆积
backlog
参数
linux2.2
前通过backlog
参数同时控制半连接和全连接队列的容量
linux2.2
通过Linux
中的两个系统配置文件来分别配置两个连接队列的容量,全连接队列的配置文件中就只有一个backlog
参数值
NIO
可以通过serverSocketChannel.bind
的重载方法在入参配置backlog
参数,Netty
需要使用serverBootstrap.option(ChannelOption.SO_BACKLOG,1024)
进行配置,生产环境backlog
参数至少都要1024
,同时在系统和NIO
的bind
方法都配置了backlog
会自动使用较小的值;如果使用Netty
,Netty
的backlog
默认参数值在windows
下全连接队列容量为200
,linux
或者mac
下为128
,Netty
会去读取系统配置文件中的backlog
参数,如果该文件存在会直接使用该文件配置
Netty
的accept
方法处理能力是很强的,全连接队列设置的大一些可以避免高峰期队列被快速堆满导致客户端连接被频繁拒绝
linux
中不论是文件还是socket
都是用文件描述符来表示的,默认文件描述符数量是1024
,当进程打开的文件或者通道数量超过该值会报错TooManyOpenFile
,这是为了避免文件或者socket
打开太多伤害系统,做高并发的服务端一定要使用命令ulimit -n 数值
调整该参数,该命令是一个临时生效的命令,一般搭配进程启动命令一起组成启动脚本
RPC
框架搭建概念:
RPC
框架是远程调用框架,功能是能让本地服务程序像调用本地程序一样调用网络上不同计算机上的远程程序
常用的RPC
框架和消息队列底层都使用Netty
作为网络通信框架
方案设计
消息类型设计
远程请求消息类型RPCRequestMessage
:封装远程调用WEB
接口路径、目标方法所在类的全类名、目标方法名、返回值类型、入参类型列表、入参值列表
远程响应消息类型RPCResponseMessage
:封装目标方法的实际返回值和可能发生的异常
流水线组成
帧解码器处理黏包半包
日志处理器loggingHandler
打印消息内容
自定义协议消息编解码器
服务端在此基础上添加一个处理RPC
请求消息的自定义入站处理器,客户端在此基础上添加一个处理RPC
响应消息的自定义入站处理器以及一个代理对象管理器
客户端对RPC
请求和响应的处理逻辑
要实现用户像调用本地方法一样发起远程调用,就要实现将用户调用远程实例方法的行为转换成向远程服务发送远程调用消息的行为,OpenFeign
的解决方案是使用动态代理提供一个代理对象,用户通过代理对象调用远程方法并将对代理方法的调用转换成向远程服务发去远程调用网络通信消息;动态代理对象的获取和调用代理对象方法的线程和Netty
发送网络通信消息的线程不是同一个线程,当用户线程调用channel.writeAndFlush
时使用事件循环线程向服务端写出网络通信消息是由Netty
自动控制的,不是由用户线程显示发起的异步任务,这种情况可以使用Netty
提供的Promise
在两个线程间共享远程调用方法的执行结果
当获取动态代理对象时与服务端建立连接,将连接通道封装到动态代理对象管理器中方便后续通过动态代理对象调用目标方法时使用通道向服务端写出消息;要确保一个客户端和一个服务端只能有一个channel
通道,可以使用双重检查锁来与服务端建立连接,创建和重建channel
准备一个ConcurrentHashMap
以消息唯一标识ID
作为key
,以代理对象处理远程方法调用时通过new DefaultPromise<Object>(channel.eventLoop())
创建的Promise
实例作为值,当事件循环线程获取到执行结果后根据消息ID
获取到Promise
实例并手动设置方法执行的返回值,将该ConcurrentHashMap
存储为客户端处理RPC
响应的自定义入站处理器的公有静态属性,让其同时对用户线程和事件循环线程可见
JDK
动态代理有一个Proxy.newProxyInstance(classLoader,interfaces,invocationHandler)
方法实例化代理对象,参数classLoader
指代理类的类加载器,通常使用目标类或者接口的类加载器,参数interfaces
指代理类需要实现的接口列表,参数invocationHandler
是一个函数式接口,其中的invoke(proxy,method,args)
方法包含三个参数,proxy
是当前代理对象本身,一般该参数很少用到,method
是被代理对象调用的方法对象,包含方法名、参数类型列表、返回值类型信息,args
是代理对象调用方法时传递的实际参数值列表;当代理对象调用目标方法时,代理对象实际执行的是用户通过invocationHandler
自定义的invoke
方法,并将用户的实际传参以及被调用方法的方法信息作为参数传递给invoke
方法,将invoke
方法的返回值作为代理对象调用目标方法的返回值;我们可以在创建代理对象时获取被代理接口进而获取接口全类名,在代理对象的invoke
方法中将被调用方法的信息、代理对象所属接口以及用户的实际传参封装成远程调用请求消息对象,创建一个空Promise
对象,将消息ID
和Promise
对象存入自定义响应处理入站处理器的ConcurrentHashMap
中,调用channel.writeAndFlush()
将消息对象写出到服务端,调用promise.await()
阻塞执行invoke
方法的线程直到响应数据被事件循环线程写入到Promise
对象,如果Promise.isSuccess()
为true
通过promise.getNow()
获取远程方法的执行返回值并返回给用户,否则通过throw new RuntimeException(promise.cause())
抛出异常
xxxxxxxxxx
/**
* @param serviceClass
* @return {@link T }
* @author Earl
* @version 1.0.0
* @创建日期 2025/02/04
* @since 1.0.0
*/
public static <T> T getProxyService(Class<T> serviceClass){
ClassLoader classLoader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
Object proxyObject =Proxy.newProxyInstance(classLoader,interfaces,(proxy,method,args) -> {
//1. 将用户对代理接口方法的调用转换成对远程调用消息的封装
RPCRequestMessage msg = new RPCRequestMessage(
SequenceIdGenerator.nextId(),
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
) ;
//2. 将消息发送出去
getChannel().writeAndFlush(msg);
//3. 准备一个空Promise对象准备接收远程调用响应结果
DefaultPromise<?> promise = new DefaultPromise<>(getChannel().eventLoop());
RPCResponseMessageHandler.PROMISES.put(sequenceId,promise);
//4. 阻塞当前线程等待promise被设置结果
promise.await();
//5. 对promise中结果的处理
if(promise.isSuccess()){
return promise.getNow();
}else{
throw new RuntimeException(promise.cause());
}
});
return (T) proxyObject;
}
客户端处理响应的自定义入站处理器设计
根据消息id
从concurrentHashMap
中获取代理对象发送消息时存入的对应Promise
对象,当promise
对象不为空时,解析响应数据,响应正常通过promise.setSuccess()
将返回值写入promise
,响应异常通过promise.setFailure()
将异常信息设置到promise
中
服务端对客户端RPC
请求的处理逻辑
根据接口全类名从Spring
容器或者通过Class.forname("全类名")
获取到class
类对象,服务端按照策略选择执行对应方法的同类型对象实例如service
,通过class.getMethod
通过方法名和参数类型列表获取方法对象method
,通过方法对象的invoke
方法指定执行该方法的对象service
和参数值列表调用目标方法获取方法执行结果
将执行结果或者异常信息封装到RPC
响应消息实例中通过channel.writeAndFlush
或者channelHandlerContext.writeAndFlush
写出到客户端
其他问题
服务端通过反射调用方法时出现异常最后会抛出InvocationTargetException
,该异常不会封装最初异常的message
,需要调用InvocationTargetException.getcause().getMessage()
才能获取到最原始的异常message
服务端不要直接将异常对象封装到响应消息中直接返回给客户端,异常信息包含堆栈信息,随便就是上万字节,如果客户端设置了单条消息的最大长度,发生异常很容易因为数据量超出帧解码器设置的最大值直接报错,一般向客户端直接返回exception.getMessage()
或者InvocationTargetException.getcause().getMessage()
即可
select
、poll
和epoll
在Linux
中文件和网络连接在内核中都是以文件描述符的形式存在,实现一个单线程的网络通信服务器的方案被设计为单个线程遍历所有文件描述符,检查和处理每个文件描述符上的数据,如果文件描述符上没有数据就继续遍历下一个文件描述符,当遍历完所有文件描述符后死循环重复上述过程,在此基础上先后发展出了select
、poll
和epoll
三个函数
select
函数逻辑
用户调用select
函数的流程
select(max+1,&rset,NULL,NULL,NULL)
:用户在调用linux
提供的select
函数监听文件描述符上的事件前需要在用户空间准备被监听文件描述符索引集合和最大描述符索引,select
函数的入参依次为最大文件描述符索引+1
,读文件描述符索引集合、写文件描述符索引集合、异常文件描述符索引集合、超时时间;超时时间传NULL
表示使用默认超时时间;读文件描述符索引集合&rset
是表征被监听文件描述符的位图,&rset
在select
函数中的默认大小为1024
,对应linux
的默认文件描述符数量为1024
,位图中需要被监听的文件描述符索引对应位上的字节被置为1
,未被监听的被置为0
;
调用select
函数期间&rset
会由用户态拷贝到内核态,内核根据&rset
中max+1
范围内被监听的文件描述符索引判断对应文件描述符是否有数据读入,如果所有被监听文件描述符上没有数据写入,程序会一直阻塞在select
函数上,如果内核监听到文件描述符上的数据,内核会将有数据的文件描述符在&rset
中对应索引的位标识为1
表示有数据读入,将未准备好的描述符位置为0
,停止select
函数的阻塞返回有数据读入的文件描述符个数并切回用户态继续执行
用户程序根据返回的&rset
遍历所有文件描述符检查对应文件描述符是否有数据并调用read(fds[i],buffer,MAXBUF)
切换到内核态读取相应文件描述符上的数据再切回用户态进行对应数据的处理
遍历完所有有数据的文件描述符,用户程序初始化&rset
并继续调用select
函数让内核继续监听指定文件描述符上的事件
小结:用户将被监听的文件描述符收集整理成一个位图,通过系统提供的select
函数将位图交给内核让内核帮忙阻塞判断其中的一个或者多个文件描述符上有数据,当其中一个或者多个文件描述符上有数据select
函数会被返回并且有数据的文件描述符对应的位图标识位会被置位为1
,没有数据的文件描述符位图标识全部被清0
,用户根据位图遍历文件描述符集合判断和处理有数据的文件描述符
select
函数让用户将感兴趣的多个文件描述符交给内核让内核来监听这些文件描述符上的IO
事件来避免用户线程一个线程阻塞监听一个文件描述符上的IO
事件,通过select
函数提升用户程序的性能
缺陷:
位图的默认大小为1024
,可以调整该大小但是始终有上限
&rset
位图每次调用select
函数都要重新准备,一方面是内核会修改&rest
,另一方面应用期望监听的文件描述符会发生变化
&rest
拷贝到内核态开销较大,虽然比用户每次单独切换内核态判断一个文件描述符的效率高,但仍然有较大开销
用户通过select
函数返回只知道有文件描述符有数据读入,但是不知道是具体哪一个文件描述符,需要再遍历一遍所有文件描述符的状态,判断和处理有数据的文件描述符,时间复杂度为O(n)
select
函数是在1983
年的Unix
中引入的,到现在还有很多系统在使用,linux
在后续推出了poll
函数
poll
函数逻辑
用户调用poll
函数的流程
poll(pollfds,5,50000)
:pollfd
是poll
库中自带的结构体,pollfd
结构中有三个字段,fd
字段为socket
连接对应文件描述符索引,events
字段为short
类型的文件描述符上关注事件标识值,如果要关注多个事件该字段值为对应事件标志值的与运算结果,revents
字段为short
类型的事件回馈,初始值始终为0
,由内核修改;在调用poll
函数前,需要先准备pollfd
数组,设置各个文件描述符上关注的事件;poll
函数需要传参pollfd
数组,数组长度以及超时时间,poll
函数的原理和select
函数一样,将要监听的文件描述符一次性交给内核,只是使用pollfd
结构替换了select
函数中使用的位图;
poll
函数也是一个阻塞函数,pollfd
数组被拷贝到内核中会被组织成链表,当内核发现一个或者多个文件描述符上有事件会将字段revents
置为成对应事件的标识值,结束poll
函数的阻塞;用户程序遍历pollfd
数组,检查revents
的事件类型并进行相应事件的处理,处理后将revents
重置为0
,再次调用poll
函数
小结:
优势:
pollfd
数组的大小没有限制
内核操作pollfd
中的revents
字段,不会像位图一样增删数组元素本身,数组pollfds
可以复用
缺点:
poll
函数原理上和select
函数是一样的,因此也存在需要将被监听的文件描述符信息拷贝传递给内核以及函数返回后需要对每个文件描述符的状态进行遍历
epoll
函数逻辑
用户调用epoll
函数的流程
epoll
函数中有epoll_create
、epoll_ctl
、epoll_wait
三个重要的函数,在调用epoll_wait(epfd,events,5,10000)
前要使用epoll_create
和epoll_ctl
准备一个包含文件描述符和对应关注事件的epfd
结构;epoll_create()
创建一个epfd
白板;epoll_ctl(epfd,EPOLL_CTL_ADD,ev.data,&ev)
函数在白板epfd
上添加被监听的文件描述符信息,第二个参数EPOLL_CTL_ADD
表示正在向epfd
添加一个epoll_event
结构体,该结构体包含字段fd
文件描述符索引和events
文件描述符关注的事件,没有实际就绪事件标识revents
,epoll
函数在每次调用epoll_ctl
函数注册新的事件到epoll
句柄时会将fd
拷贝进内核保证每个文件描述符整个过程只会在内核态和用户态之间拷贝一次,不像select
和poll
一样每次调用select
或者poll
方法都会拷贝一次文件描述符;
调用epoll_wait
时,被监听的文件描述符上没有数据epoll
函数仍然会阻塞,当有一个或者多个文件描述符上有数据会将就绪的描述符加入到一个链表中进行管理,把有事件发生的文件描述符排在epfd
结构的最前面,并返回事件发生的文件描述符数量,返回后用户只需要处理有事件发生的文件描述符
触发模式
LT
水平触发:epoll_await()
检测到描述符事件到达时将事件通知到进程,进程可以不立即处理,下次进程再次调用epoll_await()
时会再次通知进程,默认水平触发,同时支持BIO
和NIO
ET
边缘触发:epoll_await()
检测到描述符事件到达时将事件通知到进程,进程必须立即处理该事件,没处理下次调用epoll_await()
时也不会再得到该事件到达的通知,能减少epoll
事件被重复触发的次数,效率比水平触发模式高,只支持NIO
模式
小结
epoll
使用的红黑树容量没有限制,内核通过重排文件描述符的方式标记发生事件的文件描述符,文件描述符在整个生命周期中只会发生一次从用户态到内核态的拷贝节省性能,epoll_await()
方法返回后用户只需要处理有事件发生的文件描述符,时间复杂度为O(1)
而不需要遍历所有文件描述符
现在epoll
和poll
用的比较多,redis
和nginx
都用的epoll
,Java
的NIO
在Linux
平台下也是使用的epoll