Netty


Author
|
Earl
Describe
|
参考JavaGuide[v5.0]、个人Netty技术文档、满一航老师的Netty课程、黑马的IO通信模式课程
Last Update
|
2025-6-07

 

 

1-I/O模式

1.1-BIO概述

  1. BIO

    • 概念:Blocking IO,基于流的同步阻塞式IO通信模式,一个线程处理一个客户端连接的全部通信事件,用于监听新客户端的接入事件的socket.accept方法以及监听已连接客户端的可读事件的socket.getInputStream().read(byte[]);方法都会阻塞当前线程直到有接入事件或者可读事件发生,因此一个线程不能同时处理新客户端的接入事件和已连接客户端的可读事件,一个线程也不能同时处理多个已连接客户端的可读事件,JDK1.4以前BIOJava网络通信的唯一选择

      • Java中的各种IO流都是BIO通信模式

      • 服务端通过一个线程循环调用Socket socket=serverSocket.accept()阻塞等待新客户端的socket连接接入,新客户端接入时获取到socket对象后,将socket封装成任务对象中交给一个单独的线程进行处理,线程获取封装在socket中的输入流并循环读取处理流中的数据如socket.getInputStream().read(byte[]);,循环调用socket.getInputStream().read(byte[]);直到没有数据可以读取然后阻塞在read方法上,后续只要socket连接没有断开该连接通道的每次可读事件都在该循环中被处理;要向另一端写出数据只需要调用socket.getOutputStream().write(byte[])即可

        • 服务端和客户端都可以使用特定的流对socket中的InputStreamOutputStream进行包装以适应文件上传、聊天消息、远程调用等各种场景

      • 客户端发送完数据需要调用socket.shutdownOutput()通知服务端连接已断开,即使缓冲区byte[]数据没有读满也无需继续阻塞等待在socket.getInputStream().read(byte[]);,如果没有调用该方法通知服务端连接已断开,当连接断开时服务端会直接抛出异常,如果对异常处理的不好可能会导致最后一次缓冲区的数据丢失,像文件上传这种场景就可能导致文件数据不完整

      • 客户端与客户端的通信流程:服务端通过new ServerSocket()指定服务端socket通信端口创建serverSocket实例,通过Socket socket=serverSocket.accept()等待建立客户端连接并获取对应socket对象,将socket对象存入集合容器并将socket封装到任务对象交给单独的线程处理,该线程获取到socket对象中的输入流读取通信数据,从socket集合中获取到目标客户端与服务端连接通道对应的socket对象,通过socket对象的输出流将消息发送给一个或多个目标客户端,客户端下线了,与服务端的socket连接断开,服务端收到客户端下线通知或者没有收到客户端下线通知抛出异常时,服务端从集合中移除对应的socket对象

    • 特点:

      • 程序编写简单

      • 每个socket连接请求都要创建一个线程专门处理,对服务器性能要求高;

      • 客户端的并发访问突增会导致服务端的线程开销同比例突增,而且这些线程全部是同步阻塞式线程,如果客户端没有新的通信事件负责当前客户端通信事件处理的线程会一直阻塞在read方法上等待客户端数据无法执行其他任务,不控制客户端数量很容易导致线程栈溢出,因此BIO只适用于连接数小且固定的场景,早期的服务器使用BIO一般设计为线程池配合短连接的方案

 

1.2-NIO概述

  1. NIO

    • 概念:

      • non-blocking IO,基于通道、缓冲区和选择器的同步非阻塞式IO通信模式,JDK1.4引入;NIO的核心组件包括Seletor选择器、Channel通道、Buffer缓冲区,相关类在java.nio包下;

      • NIO阻塞模式

        • NIO阻塞模式下,serverSocketChannelaccept方法在没有检测到新客户端连接请求时会阻塞当前线程,socketChannelread方法在没有检测客户端的写数据请求时也会阻塞当前线程

        • 阻塞模式下NIO也只能像传统BIO通信模式一样,一个线程负责处理客户端接入请求,一个线程负责一个已连接客户端通道的写数据请求,JVM中一个线程在32位和64位操作系统中默认大小分别为320K1M,连接数过多必然导致虚拟机栈的OOM,线程太多也会因为频繁上下文切换降低系统性能,非阻塞模式配合线程池只适合短连接场景

      • NIO非阻塞模式

        • NIO非阻塞模式下,serverSocketChannelaccept方法在没有检测到新客户端连接请求时不会阻塞当前线程直接返回nullsocketChannelread方法在没有检测客户端的写数据请求时也不会阻塞当前线程直接返回0

        • 选择器必须配合网络通道的非阻塞模式才能实现多路复用让一个线程不停地轮询管理多个通道上的就绪事件

      • 服务端一个线程创建服务端Socket通道ServerSocketChannel以及选择器,将ServerSocketChannel注册到选择器中并指定监听该通道的接入事件,每当有接入事件发生时通过serverSocketChannel.accept()方法获取到已建立连接的客户端通道SocketChannel,将客户端通道注册到同一选择器中并指定监听客户端socket通道的可读事件,从头到尾只有一个线程监听注册在同一个选择器上的单个ServerSocketChannel的接入事件和所有SocketChannel的可读事件,通过选择器的select方法能获取到所有就绪事件的迭代器,遍历所有就绪事件根据事件类型仍使用当前线程轮询处理每一个事件,迭代器的所有事件遍历处理完成后阻塞在selector.select()方法上直到有新的接入或者可读事件发生重复上述对事件的处理过程;

      • 客户端通过SocketChannel.open()方法传参服务端主机地址和端口与服务端建立连接并返回一个非阻塞的SocketChannel通道,通过ByteBuffer可以向该通道写出数据,如果客户端需要监听服务端发送过来的数据需要新起一个线程创建选择器,将当前SocketChannel注册到客户端的选择器中并监听处理该通道上的可读事件即可,必须起新线程的原因是处理用户输入的扫描器和选择器监听可读事件的select方法都会阻塞当前线程

    • 特点:

      • 适合连接数多、流量低的场景,比如聊天服务器、弹幕系统、服务器间的网络通信。流量低是因为自始至终只有一个线程轮询一个选择器上注册的所有通道的就绪事件并依次串行处理,NIO要求每个通信事件的操作时间要尽可能短,如果事件的处理事件太长会长时间占用当前线程,降低本次轮询中后续所有事件的处理效率

      • 生产环境的NIO编程很复杂,非常容易出现BUG,特别是大型系统的研发维护成本非常高,一般都使用对基于NIO的高级网络通信框架Netty来开发网络通信应用

      • NIO的非阻塞体现在,非阻塞模式下线程不会在调用socketChannel.read()方法后socket连接未断开但无事发生期间死等客户端写数据请求

      • 通道和缓冲区可以双向读写数据,区别于IO流要么只能读入数据,要么只能写出数据

      • 提供基于直接内存的直接缓冲区,分配效率低,读写效率高,不受垃圾回收器的管理

 

1.3-NIO核心组件

Buffer

  1. 概念:缓冲区是用于写入和读取数据的数组,提供一组方法让用户方便地访问和操作缓冲区中的数据,缓冲区专门与NIO通道交互,数据可以从通道读取到缓冲区,也可以从缓冲区写出到通道

    • 将缓冲区数据写入通道实际是直接将缓冲区数据写入网络套接字socket,通道不直接与socket交互

    • Buffer的子类:ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer,可以通过XxxxBuffer.allocate(int capacity)来创建一个指定容量的非直接缓冲区对象,可以通过XxxxBuffer.allocateDirect(int capacity)来创建一个指定容量的直接缓冲区对象;最常用的是抽象类ByteBuffer的子实现类MappedByteBufferDirectByteBufferHeapByteBuffer,其他类型的缓冲区很少使用

    • 缓冲区的属性:

      • capacity:缓冲区容量

      • limit:缓冲区可以被操作数据的容量,写入数据时limit等于缓冲区容量,读取数据时limit等于实际写入的字节数

      • position:下一个要读取或者写入数据位置的索引,索引从0开始

      • mark:调用buffer.mark()方法时会将position的值赋值给mark

      • reset:调用buffer.reset()方法时会将mark的值赋值给position

    • 直接缓冲区:直接缓冲区基于直接内存实现,直接内存可以同时被操作系统和JVM读写,能减少一次数据在用户空间和操作系统空间之间的拷贝,IO性能更高;直接内存的申请和销毁的性能开销很高,而且直接内存不受JVM管理,在NIO中使用操作不当容易导致内存泄漏;Netty采用对象池管理直接缓冲区,降低直接缓冲区的申请频率,使用对象池来申请和销毁直接缓冲区降低内存泄漏的机会;直接内存是本地内存,不会占用JVM内存,不受GC的影响,不会因为内存规整和STW暂停IO过程,适合缓存生命周期很长的大量数据;

    • 非直接缓冲区:非直接缓冲区基于堆内存实现,无论是向网卡或者磁盘写出数据还是从磁盘或者网络写入数据,都需要先将数据从用户空间即堆内存拷贝到操作系统空间即本地内存再由操作系统写入网卡或者本地磁盘,或者由操作系统从磁盘或者网卡写入到本地内存再拷贝到堆内存;而且受GC管理,IO过程会受到内存规整和STW的影响

  2. 特点

    • 每个通道都会发生半包问题,因此缓冲区不能同时被多个通道共用

  3. 常用方法

    • buffer.flip():将缓冲区的position赋值给limit,将position赋值为0将刚写完数据的缓冲区切换为读模式

    • buffer.clear():将position赋值为0,将limit赋值为capacity将缓冲区切换为写模式

    • buffer.compact():将缓冲区positionlimit之间的未读取数据按次序移到缓冲区的开头,将position移到未读数据最后一位的下一位,limit重置为capacity,准备从未读数据的最后一位的下一位写入数据

    • buffer.isDirect():判断缓冲区是否为直接缓冲区

    • buffer.get():该方法及其重载方法能读取position处的单个字节、读取指定索引处的单个字节以及读取指定长度的字节数组,配合mark()reset()方法能重复读取缓冲区的部分数据,而且读取指定索引处的单个字节不会改变position的值

    • buffer.put():该方法及其重载方法能写入position处单个字节、将单个字节写入到指定索引处、写入指定长度的字节数组以及将旧缓冲区数据写入到新缓冲区,且将单个字节写入到指定索引不会改变position的值

    • buffer.remaining():返回positionlimit之间的元素个数即未读取字节数

    • buffer.rewind():将position赋值0,将mark赋值为-1,一般用于重复读取缓冲区

    • buffer.array():将缓冲区转换为字节数组

    • 字符串和byteBuffer相互转换的几种方式

      • ByteBuffer.wrap(string.getBytes()):将字符串转换为缓冲区并返回,缓冲区会被切换为读模式

      • byteBuffer.put(string.getBytes()):使用操作系统默认编码将字符串写入指定byteBuffer对象中,缓冲区不会被切换为读模式

      • StandardCharsets.UTF_8.encode("hello"):使用指定编码格式将字符串转换为缓冲区并返回,缓冲区会被切换为读模式,注意Charset.defaultCharset()可以获取当前操作系统的默认字符集

      • StandardCharsets.UTF_8.decode(byteBuffer).toString():使用指定编码格式将缓冲区未读取的所有字节按照指定编码格式转换为字符串,只有处于读模式的byteBuffer才能调用该方法,写模式下的byteBuffer调用该方法会得到空串

 

Channel

  1. 概念:

    • 文件或者socket套接字的数据传输通道,通道是双向的可以同时读写数据,而且通道可以由操作系统异步地从缓冲读取数据或者向缓冲写入数据,执行结束通过调用回调方法通知应用进行下一步处理

    • 通道可以通过输入流、输出流或者随机读写流的getChannel()方法获取;可以通过SocketChannel.open()指定接收端的主机和IP获取SocketChannel;可以通过ServerSocketChannel.open()绑定本地监听端口获取ServerSocketChannel;可以通过serverSocketChannel.accept()获取成功建立连接的SocketChannel

    • Channel是一个接口,常用子实现类有负责文件读写的FileChannel、做UDP网络通信的DatagramChannel、做TCP网络通信的SocketChannelServerSocketChannel

  2. FileChannel

    • 特点:

      • FileChannel只能工作在阻塞模式下,不能配合选择器一起使用,只有网络通信通道才能工作在非阻塞模式下搭配选择器一起使用

    • 常用方法

      • fileChannel.read():该方法及其重载方法可以从文件通道中将数据读取到字节缓冲区或者字节缓冲区数组

      • fileChannel.write():该方法及其重载方法可以将字节缓冲区或者字节缓冲区数组写出到文件通道

        • 文件通道可以写入的数据大小是不受限制的,缓冲区的数据可以一次性全部写入文件通道;写入文件通道不是直接写入磁盘,而是写入操作系统缓冲,当文件通道关闭时操作系统才会将缓冲中的数据同步到磁盘文件;可以调用channel.force(true)强制立刻将文件内容和元数据写入磁盘文件但是会影响应用性能

      • fileChannel.size():返回文件通道对应文件的字节大小

      • fileChannel.position():该方法及其重载方法获取当前文件读写指针位置或者将读写指针设置为新位置

      • isChannel.transferTo()osChannel.transferFrom():将文件输入通道isChannel对应文件中指定起始位置和指定字节长度的数据拷贝写出到文件输出通道

        • 这种文件复制会利用零拷贝进行优化,而且代码非常简洁

        • 该方法一次调用能复制的最大数据量为2G,超出的数据无法写入新文件,需要多次调用该方法

  3. ServerSocketChannel

    • 特点:做TCP网络通信时客户端与服务端网络数据传输通道,该通道用于动态监听与服务端新建立TCP连接的接入事件并创建socket通道对应的SocketChannel实例

      • SocketChannel类似BIO模式下的SocketServerSocketChannel类似BIO模式下的ServerSocket

    • 常用方法:

      • ServerSocketChannel.open():通过指定服务端通信端口创建用于监听socket连接事件的ServerSocketChannel实例,也可以通过serverSocketChannel.bind()方法绑定服务端通信端口

      • serverSocketChannel.accept():当新客户端接入时获取对应的Socket通道SocketChannel

        • 阻塞模式下客户端没有新的接入请求线程会在该方法上阻塞等待,非阻塞模式下客户端没有新的接入请求会返回null且线程会继续向下运行

      • serverSocketChannel.configureBlocking(false):将服务端socket通道切换为非阻塞模式

      • serverSocketChannel.register():传参选择器、事件监听类型和通道附件将通道注册到指定选择器上,该方法返回SelectionKey表示通道的事件监听对象,服务端监听接入事件、可读可写事件的选择器是同一个

  4. SocketChannel

    • 特点:做TCP网络通信时客户端与服务端之间进行网络数据传输的通道,用于监听客户端和服务端上的可读可写事件

    • 常用方法

      • socketChannel.write(ByteBuffer buf):将数据从缓冲区写出到socket套接字

        • 单次网络数据传输量有上限,操作系统分配的socket缓冲区满了会停止socketChannel.write(byteBuffer)方法的执行并返回已写入缓冲区数据的字节数,开始网络传输写入socket缓冲区的数据,socket缓冲区的大小不固定,范围在2-8M之间浮动,只有socket缓冲区清空了才能继续发送下一条消息

        • socket缓冲区正在向网络写出数据时,即使调用socketChannel.write(byteBuffer)也无法将数据成功写入socket缓冲区,一般NIO都是使用一个线程管理若干通道的读写事件,大量数据网络传输过程中,单个线程死循环调用socketChannel.write(byteBuffer),在socket缓冲区未准备就绪的情况下会形成CAS自旋阻塞的效果导致CPU大量空转;socket缓冲区就绪时因为缓冲区还有可写数据会自动触发对应通道的可写事件

        • 程序可以被优化设计为,当缓冲区数据不能一次性完全写入通道时,让选择器监听对应通道的可写事件,调用selectionKey.attach(byteBuffer)将未写完的缓冲区存储为通道的附件,让当前线程继续处理其他可读事件,待socket缓冲区清空后自动触发可写事件再继续向socket缓冲区写出剩余数据;因为数据量较大,当缓冲区数据全部写出时清空附件让释放缓冲区并取消监听通道的可写事件

      • socketChannel.read(ByteBuffer buf):将数据从socket套接字读入缓冲区,返回实际读取的字节数;

        • 阻塞模式下客户端没有写数据请求线程会在该方法上阻塞等待,非阻塞模式下客户端没有写数据请求该方法会返回0且线程会继续向下运行

      • socketChannel.configureBlocking(false):将socket通道切换为非阻塞模式,防止线程在轮询处理可读事件时将通道数据读取完后阻塞在socketChannel.read()方法上无法继续轮询后续就绪事件

      • socketChannel.close():不管是客户端宕机暴力断开连接,还是在任意一端调用socketChannel.close()正常关闭连接,都会触发被关闭通道的可读事件,暴力断开的可读事件在调用read()方法处理可读事件时直接抛异常,正常断开的可读事件调用read()方法时也无法处理该可读事件并直接返回-1

        • 正常断开和异常断开都一定要调用selectionKey.cancel()取消通道在选择器中的注册,如果不取消通道在选择器中的注册,即使捕获了异常,选择器的select方法也会因为持续监听到无法被处理的可读事件导致当前线程无法被阻塞,再次处理该读事件还是继续抛出异常或者返回-1继续重复上述过程导致死循环

      • socketChannel.open():传参服务端主机地址和端口与服务端建立连接并返回对应socket通道

        • 客户端一般不会涉及到多连接业务,不需要ServerSocketChannel来动态监听客户端的socket接入事件,可以直接通过该方法获取socket通道向服务端写出数据或者将该通道注册到选择器中监听通道上的可读事件接收服务端传输来的数据,由于处理用户输入的扫描器和选择器监听可读事件的select方法都会阻塞当前线程,因此监听可读事件和向服务端写出数据不能使用同一个线程

      • socketChannel.register():传参选择器、事件监听类型和通道附件将通道注册到指定选择器上,通道附件指和通道唯一绑定的对象,该对象的生命周期和通道的生命周期一样长,在非定长消息使用分隔符解决黏包半包问题的场景下可以将可扩容的缓冲区作为通道附件来解决单条消息长度超出缓冲区容量的问题,该方法返回SelectionKey表示通道的事件监听对象

        • 该方法在执行时如果另一个线程阻塞在同一个选择器的select方法上,该方法的执行线程也会一直阻塞到select方法执行结束

 

Selector

  1. 概念:

    • 选择器也叫多路复用器,选择器可以监听在其上注册的所有ServerSocketChannel的接入事件和SocketChannel的可读可写事件,获取所有处于就绪状态的事件,轮询处理所有就绪事件配合NIO非阻塞模式能实现一个线程管理多个通道

      • 多路复用:单个线程配合选择器实现对多个通道事件的监听管理,只有网络通道才有多路复用的概念

    • 选择器必须配合NIO非阻塞模式一起使用,避免轮询处理事件期间线程阻塞在serverSocketChannel.accept()socketChannel.read()方法上

    • 事件类型用SelectionKeyint类型的14816四个常量表示,分别表示可读、可写、连接建立、有连接请求事件;通过数字指定要监听的通道事件类型,如果要监听通道上的多种事件使用位或运算符或者无进位求和运算符连接不同的事件类型

      • 可写事件的就绪条件是缓冲区有数据且socket缓冲区就绪

    • 选择器维护一个集合保存注册在其上的所有通道的监听事件SelectionKey,同时还维护一个集合selectedKeys保存当前就绪的事件SelectionKey,就绪事件会被选择器主动存入集合selectedKeys但是无法由选择器主动移除;

      • 因此事件处理完后必须手动调用事件迭代器的remove方法将事件监听对象SelectionKey从集合selectedKeys中移除,否则下次从选择器获取selectedKeys的迭代器时仍会获取到已经被处理但无事发生的SelectionKey,由于选择器必须配合通道的非阻塞模式一起使用,非阻塞模式下服务端socket通道的accept方法和socket通道的read方法会直接返回null或者0,处理不当很容易抛出空指针异常影响其他就绪事件的处理

    • 打断selector.select方法导致的线程阻塞的场景

      • 注册在选择器上的通道有就绪的连接事件、可读事件、可写事件

      • linux系统下NIO可能发生空轮询BUG,该BUG发生时select方法无法阻塞当前线程

      • 调用selector.wakeup()唤醒阻塞在select()方法的线程

      • 调用selector.close()关闭选择器

      • 阻塞在select方法的线程被其他线程调用interrupt()方法打断

  2. 常用方法

    • Selector.open():实例化一个选择器

    • selector.select():阻塞当前线程直到监听到在选择器上注册的通道有就绪事件,存在没有被处理的事件会导致该方法无法被阻塞,无法被处理的事件可以调用selectionKey.cancel()移除事件对应通道在选择器中的注册

    • selector.select(long timeout):阻塞当前线程直到有就绪的通道事件或者超出指定时间,Netty中使用该方法做事件监控

    • selector.selectNow():检查选择器上是否有事件,该方法不会阻塞当前线程,没有事件会返回0

    • selector.wakeup():唤醒当前阻塞在selector.select()方法上的线程,如果目标线程没有处于阻塞状态,该线程下次调用selector.select()方法时不会进入阻塞状态,效果类似于LockSupport.unPark()

    • selector.selectedKeys().iterator():获取就绪事件集合的迭代器

    • selector.keys():获取选择器上就绪事件SelectionKeySet集合

      • 已经被轮询处理的元素需要手动移除,移除集合元素最好使用迭代器的remove方法

        • 多线程情况下,如果迭代器每次调用next方法获取下一个元素时发现在迭代器外部更新了集合会抛出ConcurrentModification并发修改异常,很多集合都不允许使用迭代器时在迭代器器外部向集合删除或者添加元素,使用迭代器移除集合元素能避免出现并发线程安全问题;

        • 单线程情况下,像基于数组实现的集合如ArrayList使用for循环遍历时不能使用集合自身的remove方法移除元素,使用该方法会引起集合的结构变化导致未遍历元素占用已遍历索引位置导致遍历时部分元素被漏掉;

        • 而且迭代器移除元素的效率更高,因为迭代器维护了指向被移除元素的指针,可以不需要查找元素位置直接删除目标元素,使用集合自身的remove方法移除元素需要先从集合中找到该元素的位置

  3. SelectionKey的常用方法

    • selectionKey.isAcceptable()selectionKey.isReadable()selectionKey.isWriteable():判断事件类型是否为接入事件、可读事件和可写事件

    • selectionKey.cancel():取消事件对应通道在选择器中的注册,客户端连接异常断开会触发无法被处理的可读事件,处理该可读事件会直接抛出异常,捕获异常后如果不调用该方法手动取消事件对应通道在选择器中的注册会导致选择器无法被阻塞在select方法处再次处理该事件继续抛出异常引起死循环导致CPU空转

    • selectionKey.channel():获取当前事件所在的通道

    • selectionKey.interestOps():获取通道监听的事件类型

    • selectionKey.interestOps(SelectionKey.OP_ACCEPT):更新事件所在通道监听的事件类型

    • selectionKey.attachment():获取当前事件所在通道的通道附件

    • selectionKey.attach():为当前事件所在通道重新绑定通道附件

 

Path、Paths&Files

  1. 概念:PathPathsFiles都是java.nio.file包下提供的用于文件和目录操作的工具类

    • Paths.get():该方法及其重载方法传参文件或者目录路径可以返回对应Path路径实例,Files下的方法基本都是通过传参path实例来操作指定目录或者文件

    • Files.copy():将指定目录下的文件拷贝到另一个指定目录,该方法的效率很高甚至和FileChanneltransferTotransferFrom差不多,但是底层实现不同;

      • 此外Files工具类还提供了创建、删除、移动、覆盖目录或者文件;检查指定目录和文件是否存在,指定路径是否是一个文件或者目录的方法

    • Files.walkFileTree(Path start,FileVistor vistor):从指定目录自上而下依次遍历每个目录和文件,第二个入参需要一个FileVistor子接口的匿名实现,比如SimpleFileVisitor<T>()提供了四个方法供用户重写,可以按需求在进入目录前、离开目录后、访问文件时、访问文件失败时执行用户针对当前正在访问目录或者文件的自定义处理逻辑

      • walkFileTree方法的设计是设计模式中访问者模式的体现,FileVistor就是访问者,遍历到具体目标时通过访问者的回调方法执行用户的自定义访问逻辑

      • 通过walkFileTree遍历目录删除文件是危险代码,通过程序删除的文件不会进入回收站,删了就永久没了

 

1.4-NIO线程模型

单线程模型

  1. NIO模式下服务端运行流程

    • 1️⃣:服务端通过ServerSocketChannel ssChannel = ServerSocketChannel.open()以及ssChannel.bind(new InetSocketAddress(9999))获取一个绑定指定端口的服务端Socket通道,当有新客户端可连接事件时通过SocketChannel schannel = ssChannel.accept()可以获取相应的客户端Socket通道

      • 注意通道也可以通过IO流的getChannel方法获取

    • 2️⃣:服务端通过Selector selector = Selector.open()可以创建一个选择器,通过serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT)可以将服务端Socket通道注册到选择器上并指定监听通道上的接入事件或者socketChannel.register(selector,SelectionKey.OP_READ)可以将客户端socket通道注册到选择器上并指定监听通道上的可读事件

      • 注意选择器必须配合ServerSocketChannelSocketChannel的非阻塞模式一起使用,通过serverSocketChannel.configureBlocking(false)socketChannel.configureBlocking(false)将通道切换为非阻塞模式

    • 3️⃣:当selector.select()>0说明选择器已经监听到处于就绪状态的通道事件,通过Iterator<SelectionKey> it = selector.selectedKeys().iterator()能获取到选择器上已注册通道上处于就绪状态的事件的迭代器,对迭代器遍历能拿到每个就绪状态事件SelectionKey,每遍历处理完一个SelectionKey要调用it.remove()移除掉已经被处理的事件否则it.next()仍然会取出已经被处理完的事件,通过it.hasNext()判断当迭代器没有事件可以遍历时进入下一轮循环阻塞在selector.select()上直到所有已注册通道上有新的事件

    • 4️⃣:通过selectionKey.isAcceptable()selectionKey.isReadable()能判断事件是连接事件还是可读事件

      • 如果是接入事件调用SocketChannel schannel = serverSocketChannel.accept()获取刚建立连接的客户端通道,调用socketChannel.configureBlocking(false)将通道切换为非阻塞模式,调用socketChannel.register(selector,SelectionKey.OP_READ)将客户端通道注册在选择器上并指定监听客户端通道上的可读事件,结束接入事件处理遍历下一个事件

      • 如果是可读事件调用SocketChannel sChannel = (SocketChannel) SelectionKey.channel()通过事件SelectionKey拿到发生当前事件的通道,通过ByteBuffer buf = ByteBuffer.allocate()或者ByteBuffer buf = ByteBuffer.allocateDirect()创建缓冲区ByteBuffer,通过socketChannel.read(buf)读取通道中的数据,每个循环将数据从通道读取到缓冲区后调用byteBuffer.flip()将缓冲区切换为读模式准备将数据从缓冲区读出进行后续处理,数据从缓冲区读出后调用byteBuffer.clear()将缓冲区切换为写模式准备再次从通道循环读取未读取数据,当通道没有数据可读时结束可读事件的处理轮询处理迭代器中的下一个事件

    • 5️⃣:不管是客户端宕机暴力断开连接,还是在任意一端调用socketChannel.close()正常关闭连接,都会触发被关闭通道的可读事件,暴力断开的可读事件在调用read()方法处理可读事件时直接抛异常,正常断开的可读事件调用read()方法时也无法处理该可读事件并直接返回-1

      • 正常断开和异常断开都一定要调用selectionKey.cancel()取消通道在选择器中的注册,如果不取消通道在选择器中的注册,即使捕获了异常,选择器的select方法也会因为持续监听到无法被处理的可读事件导致当前线程无法被阻塞,再次处理该读事件还是继续抛出异常或者返回-1继续重复上述过程导致死循环

  2. NIO模式下客户端运行流程

    • 1️⃣:客户端一般不会涉及到多连接业务,不需要ServerSocketChannel来动态监听客户端的socket接入事件,可以直接通过SocketChannel sChannel = SocketChannel.open();方法指定入参服务端主机地址和IP与服务端建立连接并返回客户端socket通道,调用socketChannel.configureBlocking(false)socket通道切换为非阻塞模式,调用ByteBuffer buf = ByteBuffer.allocate(1024)或者ByteBuffer buf = ByteBuffer.allocateDirect(1024)创建缓冲区,配合监听用户输入的扫描器向缓冲区写入通信数据,调用byteBuffer.flip()将缓冲区切换为读模式后调用socketChannel.write(buf)将缓冲中的数据写出到socket通道,调用byteBuf.clear()将缓冲区切换为写模式准备下次向缓冲区写入新的通信数据

    • 2️⃣:如果客户端要监听服务端的发送来的数据需要一个新线程创建选择器Selector,将当前SocketChannel注册到选择器上并指定监听处理该通道上的可读事件,由于处理用户输入的扫描器和选择器监听可读事件的select方法都会阻塞当前线程,因此监听可读事件和向服务端写出数据不能使用同一个线程,但是写出消息和读取消息可以使用同一个socket通道

 

多线程模型

  1. 单线程模型的缺点

    • 只能利用单个核,浪费多核CPU的性能

    • 单个线程轮询处理所有通道上的就绪事件,一旦某个事件处理耗时较长,会影响到其他所有事件的处理效率

  2. NIO多线程模型

    • 原理:一个线程和一个选择器组成一个单元,创建多个这样的单元,将这些单元中的一个设置为Boss角色负责监听所有客户端的接入请求,将其他若干单元设置为Worker角色负责监听处理多组已连接客户端的写数据请求,Boss处理接入事件将已接入socket通道按合适的策略派发注册到Worker的选择器中监听通道的可读事件

      • 最简单的负载均衡策略是将所有Worker存入Worker数组,使用原子整数自增并对Worker总数量取模得到Worker数组索引,将socket通道派发注册到对应索引处的Worker选择器上

      • WorkerBoss实现Runnable接口,重写run方法分别实现轮询处理接入事件和可读事件的代码,在构造器中实例化选择器和对应线程并启动线程执行Run方法,必须保证选择器和线程只能被实例化一次,可以通过设计一个私有的标记字段来控制

    • 问题分析:

      • 多线程场景下,如果由Boss线程将新接入的socket通道注册到Worker选择器上会存在两个问题;Boss线程将新接入的socket通道注册到处于阻塞状态的Worker线程对应的选择器上,Boss线程也会被阻塞在socketChannel.register()方法上;Worker线程启动在第一个socket通道注册到Worker选择器之前,很可能导致Worker选择器还没有注册任何socket通道Worker线程就已经阻塞在选择器的select方法上了,此时Worker线程会因为没有事件可以监听一直阻塞,Boss线程向Worker选择器注册新接入的socket通道时也因为Worker线程阻塞也阻塞在socketChannel.register()方法上,此时Boss线程和Worker线程会一直处于阻塞状态导致程序直接卡死,这种卡死现象和服务器启动后没有客户端请求接入的效果是一样的;

    • 解决方案

      • 方案一:Netty的解决方案是为每个Worker设置一个线程安全的任务队列ConcurrentLinkedQueue<Runnable>,在Boss线程获取到新接入的socketChannel后,由Boss线程将注册该socket通道到指定Worker选择器的操作封装成任务对象添加到Worker任务队列,尝试让Worker线程负责执行新接入socket通道在Worker选择器中的注册的同时将Boss线程中的socketChannel共享到Worker线程;然后Boss线程立即调用该Worker选择器的wakeup()方法,因为任务添加到指定Worker任务队列时Worker线程可能正在阻塞或者正在轮询就绪事件,如果Worker线程正在阻塞立即被唤醒Worker线程,如果Worker线程正在轮询就绪事件下次调用selector.select()方法时不再阻塞,保证Worker线程能以最快的速度执行socket通道注册任务;Worker线程只要被唤醒立即从任务队列弹出任务对象调用runnable.run()手动执行socket通道注册任务

      • 方案二:在Boss线程执行新接入socket通道的注册方法前先调用Worker选择器的wakeup()方法避免Worker线程阻塞在Worker选择器的select方法上,Worker线程被唤醒后还要抢夺CPU时间片,极大概率比通道注册方法执行慢;但是现在基本都是多核CPU,仍然有概率唤醒后在Boss线程执行通道注册方法前再次阻塞在Worker选择器的select方法上;一旦遇上,Worker选择器上如果没有注册任何socket通道,Boss线程和Worker线程会一直阻塞导致程序卡死;Worker选择器上即使有注册的socket通道,Boss线程仍然会因为Worker线程阻塞而阻塞直到Worker选择器监听到新的可读事件

 

1.5-AIO概述

  1. 概念

    • Asynchronous IO,异步非阻塞IO通信模式,也叫NIO2.0,在JDK1.7引入,主要在java.nio.channels包下增加了四个异步通道,通道名就是NIO下四个通道名前面加个AsynchronousAIO模式下缓冲区数据写出到通道或者通道数据读入缓冲区由操作系统异步完成,完成后将执行结果传参给回调函数通知线程做进一步处理;

      • NIOsocketChannel.writesocketChannel.read方法全是同步的,NIO的非阻塞模式只是说read()方法或者accept方法不会在客户端通道无事发生时阻塞当前线程,但是AIO这个IO方法执行过程是异步的

      • 因为IO操作完全交给操作系统异步执行,因此即使是单线程模型数据量大事件处理时间长不会影响其他通道的事件处理

      • AIO的文件通道也支持异步,不像多路复用只有网络通道才支持

  2. 特点:

    • 适合连接数多且连接较长的场景,比如相册服务器

    • AIO在实际应用中不成熟,windowsAIO支持比较好实现了真正的异步IOLinux系统下还是使用多路复用模拟异步,性能上相较于NIO没有优势,Java程序大多都运行在Linux环境下,Netty5也想实现异步IO,最后做出来发现性能没优势还更加复杂,直接把Netty5废弃了

 

1.6-辨析BIONIOAIO

  1. 辨析:

    • BIO:一个线程只能始终处理一个通道上的接入事件或者可读可写事件,基于serverSocket动态获取新客户端连接,基于socket做网络数据传输,基于单向的IO流进行数据IO,编程简单、服务器性能要求高,只适合连接数小且固定的场合,早期BIO一般配合线程池和短连接使用;IO过程同步,通道没有接入事件和可读事件会阻塞当前线程;

      • IO流的操作性不如缓冲区,读写指针位置不能随意更改;通道会自动使用系统层面的一些辅助功能如缓冲区提升API性能而流不会;流只支持阻塞式API,通道同时支持阻塞式和非阻塞式API,通道还能配合选择器实现多路复用

    • NIO:一个线程通过多路复用的方式就能轮询处理所有通道上的接入、可读可写、连接建立事件;网络通道既可以工作在阻塞模式下也可以工作在非阻塞模式下,基于ServerSocketChannel动态获取新接入的客户端连接,基于SocketChannel做网络数据传输,基于双向的缓冲区进行数据IO;基于选择器做事件驱动和通道轮询;编程非常复杂,适合连接数多流量轻的场合;IO过程同步,通道没有接入事件和可读事件可以阻塞也可以不阻塞当前线程;提供多种零拷贝实现,提供基于Reactor模式的多种线程模型设计

    • AIONIO2.0NIO的升级版,在NIO的基础上提供异步通道实现把数据IOsocket套接字或者磁盘文件的过程交给操作系统异步完成,IO过程异步,网络通道既可以工作在阻塞模式下也可以工作在非阻塞模式下,适合连接数多且连接较长的场景,linux对异步IO的支持不好性能提升相较于NIO不明显,应用有限

  2. 概念辨析

    • Reactor模式:事件分发器等待某个通信事件的发生,事件分发器将该事件传递给实现注册的时间处理函数或者回调函数,由该函数来执行实际的读写操作

    • 通信方式

      • 全双工通信:任意时刻连接通道上都允许客户端和服务端之间的双向通信,任意一个端读和写数据都可以同时进行,读写线程不会相互阻塞,NIOBIO都是全双工通信

      • 半双工通信:同一时刻连接通道上的数据只能向一个方向传输

    • 用户程序空间:Java程序本身没有从网络读取数据的能力;Java程序只能通过调用操作系统的API切换到Linux内核空间来读取指定端口的数据,用户程序空间指用户程序运行的环境,也称用户态,由用户态切换到内核态指Java程序去调用操作系统API的过程

    • 操作系统内核空间:Java程序的read()方法会调用操作系统从网络端口读取数据的API,但是网络数据此时不一定发过来了,操作系统会等待数据,数据到达以后操作系统会将数据从网卡读取复制到内存中,数据复制结束了程序才会切换回用户程序空间,操作系统API执行的环境称为内核态

    • 阻塞IO:操作系统的阻塞API等待数据期间用户程序也会处于阻塞状态,操作系统阻塞API将数据拷贝到内存切换回用户态才会停止用户程序的阻塞状态

    • 非阻塞IO:用户程序调用操作系统的非阻塞API去网络获取数据,操作系统没有从端口读取到数据不会阻塞等待数据,立刻返回给程序读取到0字节,用户程序按一定策略一定时间后再次去调用操作系统的非阻塞API直到操作系统的非阻塞API检查到网卡中有数据,操作系统会将网卡数据复制到内存再切换到用户态让程序继续向下运行,用户线程在数据复制阶段仍然是阻塞的,只是等待数据阶段不会进入阻塞状态

    • 多路复用:selector.select()方法就是在切换内核态调用操作系统API去查询每个通道是否有事件,直到相关通道有事件发生,操作系统才会切换回用户态并告知用户线程有事件发生;用户程序根据事件类型、通道去调用socketChannel.read()切换到内核态调用操作系统的API将数据读取到内存中再切回用户态执行业务

      • 多路复用比直接调用读写API多出一层事件监听上的用户线程阻塞,但是单个线程一直调用阻塞API,如果数据迟迟不到会一直阻塞等待,即使此时其他客户端通道的数据到了,该线程因为阻塞也无法去处理,只能一个客户端一个线程一对一处理,线程池线程资源迟早被耗尽;多路复用虽然每个事件都会多一次用户态与内核态的切换,但是只有一个线程专门切换到内核态阻塞监听所有通道上的事件,通道有事件才去使用单独的线程或者使用事件监听线程去处理就绪事件

      • 多路复用的核心就是使用一个线程监听所有客户端通道等待数据到达端口的过程取代一个线程只能等待一个客户端通道数据到达的过程,而且会一次性返回阻塞期间监听到的所有事件,用户程序直接调用操作系统API再切换到内核态直接处理客户端数据即可

    • 同步:当前线程获取结果的过程只有当前线程参与,且只有获取到结果才能做别的事情

      • 同步阻塞:阻塞API需要用户线程自己等待操作系统的执行结果

      • 同步非阻塞:非阻塞API也需要用户线程自己等待操作系统的执行结果

      • 多路复用也是用户线程自己等待操作系统的执行结果也是同步阻塞的

    • 异步:当前线程由其他线程将结果交给自己,获取结果的过程至少需要两个线程协作

      • 当前线程创建一个线程去获取目标数据,启动线程后就直接返回继续执行自己的业务,新线程阻塞等待操作系统完成数据获取,新线程得到数据后将数据以回调的方式通知到当前线程,这就叫异步 。

      • 结果和任务对应主要就是通过回调方法来确定的,回调方法就是启动新线程获取目标数据时定义了一个通知方法,回调方法的入参就是目标数据,新线程获取到目标数据后自动调用通知方法将目标数据共享给当前线程,这就是异步非阻塞,根本不存在异步阻塞这种说法,因为异步当前线程启动新线程执行异步任务后就会继续去做自己的事由新线程调用通知方法共享执行结果,根本不会阻塞等待新线程的执行结果

      • 只有同步阻塞、同步非阻塞、异步非阻塞,没有异步阻塞的说法

  3. 零拷贝[以下场景都是Java程序将磁盘文件发送给网络]

    • 操作系统不能直接将文件数据直接读取到JVM内存,只能将数据从磁盘读取到操作系统的内核缓冲区,再将数据从内核缓冲区拷贝到用户缓冲区即JVM内存;Java程序向网络写出数据也不能直接从用户缓冲区直接写入网卡,必须先由用户缓冲区拷贝到socket缓冲区,再从socket缓冲区写入网卡;

      • 使用Java程序读取文件再写出到网卡文件数据要经过四次拷贝,涉及三次用户态与内核态之间的切换,磁盘读取到内核缓冲区,内核缓冲区拷贝到用户缓冲区,用户缓冲区再拷贝socket缓冲区,socket缓冲区再写出到网卡

    • NIO的直接缓冲区基于直接内存,直接内存使用的是操作系统内存,直接内存可以同时被操作系统和Java程序访问,操纵系统可以直接将磁盘数据读取到直接内存供Java程序访问,但是直接内存的数据要写到网络中仍然需要先将数据从直接内存拷贝到socket缓冲区,再由socket缓冲区写入网卡,仍然有三次文件数据拷贝,涉及三次用户态与内核态之间的切换

    • Linux2.1以后提供一个sendFile方法,NIOSocketChannel本身没有提供transferTotransferFrom方法,但是FileChanneltransferTo()可以将FileChannel的数据传输到SocketChanneltransferFrom()方法可以将SocketChannel的数据传输到FileChanneltransferTotransferFrom方法底层就使用sendFile方法,此外Java调用这两个方法从用户态切换成内核态后,不会使用CPU而是使用DMA将磁盘数据读取到内核缓冲区,然后数据通过CPU从内核缓冲区读取到socket缓冲区,再使用DMA将数据从socket缓冲区写入网卡,和直接内存相比仍然是三次文件数据拷贝,但是只有调用transferTo()或者transferFrom()方法时从用户态切换成内核态,而且只有文件数据从内核缓冲区拷贝到socket缓冲区使用的了CPU

      • DMA[DirectMemoryAccess,直接内存访问]是允许某些硬件子系统在不经过CPU干预的情况下直接访问系统内存的硬件,DMA的作用是提高数据传输效率,降低CPU在内存与外设间数据传输的负担

    • Linux2.4以后改变了sendFile方法的实现,Java中的transferTo()或者transferFrom()方法会直接将内核缓冲区的数据拷贝到网卡,文件数据连socket缓冲区都不经过,全程只有两次DMA拷贝,只有文件的偏移量和文件长度length等少量数据会拷贝到socket缓冲区,涉及一次用户态与内核态之间的切换

    • 所谓零拷贝指的是Java中使用transferTo()或者transferFrom()方法,对应调用linux操作系统的sendFile方法的拷贝过程,零拷贝指的是文件数据不会拷贝到用户缓冲区

      • 零拷贝的优点是用户态和内核态之间的切换次数少,文件数据拷贝只使用DMA不使用CPU

 

 

2-Netty

2.1-Netty核心组件

EventLoop

  1. 概念

    • EventLoop是事件循环接口,作为线程池以执行用户提交普通任务或者定时任务,轮询处理注册在其内部选择器上的各个通道的网络事件,默认情况下非IO任务和IO任务的执行时间默认各占一半

      • EventLoop间接继承自事件执行器EventExecutor,为EventLoop提供了inEventLoop方法及其重载方法用于判断指定线程或者当前线程是否属于指定eventLoop,提供parent方法获取指定eventLoop的所属eventLoopGroup

      • EventLoop间接继承自JUC包下的定时单线程执行器ScheduledExecutorService,作为线程池能调用submit或者execute方法执行用户提交的普通任务,调用scheduleAtFixedRate执行用户提交的定时任务,定时任务可以用于实现keepalive的连接保活

    • EventLoop内部维护着选择器,eventLoop实例上绑定的所有客户端通道的网络事件都由该eventLoop单线程执行器负责处理,易于管理且对通道数据的处理不会存在线程安全问题;EventLoop实例的构造器参数配置比较繁琐,一般通过EventLoopGroup中的默认配置实例化eventLoop对象

    • NioEventLoop由一个线程、选择器和两个任务队列构成,在EventExecutor中有一个thread成员变量表示对应线程,taskQueueseheduledTaskQueue分别用于缓存用户向线程池提交的普通任务和定时任务

  2. 常用方法

    • eventLoop.execute(runnable):执行用户提交的普通任务

    • eventLoop.submit(runnable):提交用户提交的普通任务

    • eventLoop.schedule(runnable,60,TimeUnit.SECONDS):在指定延迟后执行用户提交的一个任务,第二个参数是定时任务执行的延迟时间,该定时任务只会被执行一次

  3. Selector的相关问题

    • Selector的创建时机:Netty在实例化NioEventLoop的构造器中也采用NIO中的Selector.open()方法底层采用的SelectorProvider.provider().openSelector()创建的选择器

    • 为什么有selectorunwrappedSelector两个选择器相关的成员变量:Netty创建的原生NIOSelector会被直接赋值给unwrappedSelector,原生SelectorSelectedKeys集合基于哈希表的Set集合实现,遍历性能没有数组好;Netty为了提高SelectedKeys的遍历性能通过暴力反射将原生Selector私有的SelectedKeys换成Netty自定义基于数组实现的SelectedKeys并将替换后的Selector赋值给成员变量selector;因为部分功能必须使用基于Set的实现因此仍保留了原生的Selector

  4. NIO线程相关问题

    • NIO线程在何时被启动:该问题的实现逻辑在向eventLoop提交任务的execute方法中,当eventLoop执行用户提交的任务时,会首先获取当前线程和eventLoop的成员变量thread比较是否是同一个线程并随即将任务添加至任务队列,eventLoop第一次执行提交的任务时该成员变量必为null,判断结果必为false,此时会执行startThread方法通过CAS操作将线程状态位从1改成2保证只能有一个线程能执行doStartThread方法启动NIO线程,向执行器成员变量executor手动提交一个任务来启动线程池,将线程池的线程赋值给eventLoopthread成员变量[即NIO线程就是单线程执行器的线程],然后死循环去查看eventLoop有没有IO事件、用户提交的普通任务和定时任务,获取相应的任务并执行;因此NIO线程在首次调用eventLoop.execute方法时启动,通过CAS更改一个EventExecutor中的state线程状态位控制NIO线程只会被启动一次

    • NIO线程阻塞在select方法上的时机:在执行器的死循环任务中通过switch...case语句匹配枚举值SelectStrategy.SELECT进入对应分支阻塞在select方法上,匹配值的判断条件是没有普通任务才会进入select方法所在的分支,当有任务时还会通过selector.selectNow()获取并返回处于就绪状态的IO事件数量进入相应分支一次性处理完所有的普通任务和就绪的IO事件;当没有普通任务进入select方法,在select方法中死循环调用带超时时间的selector.select(timeoutMillis)方法,在没有定时任务的情况下,select方法的超时时间是1s+0.5ms,因为select方法可能被中途唤醒,NIO线程被唤醒以后会再次进入循环判断当前线程因为什么原因被唤醒决定是否退出死循环;当实际时间达到超时时间、存在普通任务或者选择器上有就绪的IO事件都会退出死循环,否则再次计算超时时间并继续进行阻塞

    • 提交普通任务是否会结束select方法的阻塞:在启动线程池的死循环任务中一旦满足特定条件,死循环就会阻塞在带超时时间的selector.select(timeoutMillis)方法上,因为NettyNIO线程除了处理IO事件还需要处理用户提交的普通任务或者定时任务,因此不能调用selector.select()导致没有IO事件时NIO线程一直阻塞;非当前NIO线程通过eventLoop.execute()向当前NIO线程提交任务时会在execute方法中调用selector.wakeup()手动唤醒已经阻塞或者即将阻塞在select方法上的当前NIO线程,因为可能存在多个线程同时提交任务的情况,而selector.wakeup()又是一个重量级操作,这些线程会去通过CAS操作竞争将原子布尔变量wakeup状态位从false改成true,只有竞争成功的线程才能去调用selector.wakeup()唤醒NIO线程

    • NIO线程处理IO事件的时间比例

      • NioEventLoop中成员变量ioRatio的含义为假设使用100段时间处理所有任务,IO事件的处理时间在其中占多少段;在nioEventLoop.run()方法中定义了在NIO线程结束阻塞后执行任务的逻辑,首先NIO线程会处理选择器上所有处于就绪状态的IO事件,然后统计本轮所有IO事件的处理时间,通过ioTime * (100 - ioRatio) / ioRatio算出普通任务的预期执行时间,执行普通任务时如果发现当前任务执行完成后本轮普通任务的执行时间超出预期执行时间不会再从任务队列中获取下一个任务,NIO线程会结束本轮循环进入下一轮循环继续处理IO事件,这种设计是为了避免普通任务的执行时间太长影响IO事件的处理;默认ioRatio的值为50,表示NIO线程50%的时间用于处理IO事件

      • ioRatio设置为100的含义不是NIO线程只处理IO事件,而是处理完本轮IO事件后完整处理完所有的普通任务和定时任务后才会继续处理下一轮IO事件,一般情况下不要将ioRatio设置为100

      • 通道流水线对IO事件数据的处理时间也算在IO事件的处理时间段内

    • Netty使用数组优化selectedKeys提高就绪事件的遍历效率,处理IO事件时通过selectedKeys.keys[i]获取就绪的IO事件,通过key.attachment获取serverSocketChannel通道对应附件NioServerSocketChannel,在processSelectedKey方法中判断IO事件类型并从通道附件NioServerSocketChannel获取通道流水线,使用流水线中的每个处理器依次对IO事件的数据依次进行处理

 

EventLoopGroup

  1. 概念

    • 事件循环组接口,作为管理一组事件循环EventLoop的容器

      • EventLoopGroup继承自事件执行器组EventExecutorGroup,该接口继承自Iterable接口让EventLoopGroup具备使用增强For循环或者迭代器遍历内部集合或者数组的能力

    • Netty服务端三种线程模型通过serverBootstrap.group(EventLoopGroup bossGroup,EventLoopGroup workerGroup)进行配置;但是注意服务端始终只会创建一个ServerSocketChannel,因此bossGroup即使管理多条线程也只会有一个EventLoop负责处理服务端socket通道上的所有新客户端通道的接入事件

      • 如果boss事件循环组和worker事件循环组共用同一个事件循环组则使用单线程模型

      • 如果boss事件循环组管理的线程数为1worker事件循环组管理默认的线程数则使用多线程模型

      • 如果boss事件循环组和worker事件循环组为两个都管理默认线程数的不同事件循环组实例则使用主从多线程模型

  2. 常用方法

    • eventLoopGroup.next():轮询获取事件循环组管理的下一个EventLoop用于可执行任务提交或者向EventLoop注册新接入的客户端通道的负载均衡

    • eventLoopGroup.shutdownGracefully():优雅地关闭事件循环组,已经提交的任务继续执行,拒绝新提交的任务,所有任务执行结束后释放事件循环组管理的所有线程

      • 关闭所有事件循环组以后服务端或者客户端所在进程才能自动结束

  3. 常用实现类

    • NioEventLoopGroup:既能处理通道上的IO事件也能向事件循环组提交普通任务和定时任务

      • NioEventLoopGroup的无参构造调用的是单参构造this(0),传参为线程数量nThreads;一直调用父类的构造函数最终会调用到MultithreadEventLoopGroup构造方法

      • MultithreadEventLoopGroup中有一个常量DEFAULT_EVENT_LOOP_THREADS,该常量会从1,系统属性io.netty.eventLoopThreads以及本机CPU核心数*2中取一个最大值,该常量就是EventExecutor数组的容量

      • 当构造方法传参的线程数量不为0时表示用户指定了线程数量,NioEventLoopGroup会启动用户指定数量的线程数;

      • 当构造方法传参线程数量为0,说明用户没有指定线程数量选择使用默认的线程数量,默认的线程数量即常量DEFAULT_EVENT_LOOP_THREADS

      • 这个线程数实际是NioEventLoopGroup管理的NioEventLoop的数量,每个NioEventLoop都是一个单线程线程池,因此nThreads指定的是线程数的同时也指定的是管理的NioEventLoop的数量

    • DefaultEventLoopGroup:只能处理用户提交的普通任务和定时任务,除了不能执行IO任务用法和NioEventLoopGroup是一样的

      • 注意DefaultEventLoopGroup中也有事件执行器EventLoop,且在流水线中使用时每个客户端通道也会和其中一个eventLoop实例绑定

      • 一般通过nioSocketChannel.pipeLine().addLast(EventExecutorGroup group,String name,ChannelHandler handler)指定DefaultEventLoopGroup实例替代NioEventLoopGroup管理的EventLoop处理流水线上非常耗时的非IO操作,避免工序处理时间太长影响到NioEventLoop处理和其绑定的其他客户端通道上就绪事件的处理

      • Netty源码切换线程执行同一条流水线的不同工序的实现

        • 流水线在添加处理器的时候会指定处理器绑定的事件执行组,同一条流水线的不同工序可能被不同的eventLoop实例执行,前一道工序由eventLoop1线程执行到AbstractChannelHandlerContext.invokeChannelRead(next,msg)方法时,会通过通道处理器上下文拿到下一个处理器的事件执行器即eventLoop2,通过eventLoop2.inEventLoop()方法判断下一道处理工序的执行线程是否是当前线程;如果是当前线程直接在当前线程调用执行下一道工序,如果不是当前线程则将调用执行下一道工序的代码封装成任务对象提交给下一道工序的事件执行器即eventLoop2执行;

 

ByteBuf

  1. 概念

    • ByteBufNettyNIOByteBuffer的封装和增强,基于字节数组的数据容器

    • ByteBuf支持动态扩缩容,默认初始容量均为256B,最大容量为Integer.MAX,当容量小于512扩容到最近的16倍整数,容量超过512扩容到最近的二次幂

    • ByteBuf的基本属性

      • ridx为读指针、widx为写指针、capByteBuf的容量;NIOByteBuffer读写共用一个position指针,需要使用flip方法切换读模式,使用clear或者compact切换写模式;NettyByteBuf设置读写两个指针,读和写不需要切换读写模式

      • 写指针到容量之间的部分为可写区域,读指针到写指针之间的部分为可读区域、索引0到读指针之间的部分为废弃区域

    • ByteBuf的池化管理

      • 基于直接内存的ByteBuf分配效率低,基于堆内存的ByteBuf数量太多也会给GC带来压力,高并发场景下,对ByteBuf采用池化技术重用ByteBuf能降低ByteBuf的分配开销,避免因为ByteBuf数量太多引起的内存溢出

      • Netty4.1以后非安卓平台默认都开启了ByteBuf的池化功能

      • 采用池化技术的ByteBuf的实现类的类名上会以Pooled字样打头,非池化技术实现类的类名会以Unpooled字样打头

      • 通过JVM参数-Dio.netty.allocator.type={unpooled|pooled}可以设置关闭或者开启ByteBuf的池化功能

    • ByteBuf的内存回收

      • 基于堆内存的非池化ByteBuf和普通对象一样受GC管理;基于直接内存的非池化ByteBuf通过定时的GC回收,也可以手动回收;采用池化技术的ByteBuf的回收是将ByteBuf归还给ByteBuf

      • ByteBuf实现了ReferenceCounted接口,通过引用计数算法来管理ByteBuf的生命周期和内存回收;每个ByteBuf实例的初始引用计数均为1,调用release方法会让引用计数减1,调用retain方法会让引用计数加1;引用计数没有减为0ByteBuf对象不会被真正回收;引用计数减为0,即使ByteBuf实例还没有被GC回收,ByteBuf的方法也无法正常使用

      • 释放ByteBuf内存的方法是抽象方法deallocate,基于直接内存、堆内存、池化技术的ByteBuf的实现类都对该方法有相应的内存回收实现

      • ByteBuf的释放时机

        • 在最后使用ByteBuf的处理器Handlerfinally块中调用byteBuf.release()释放ByteBuf

        • 通道流水线的headtail两个处理器中也会释放ByteBuf,处理入站数据时执行到tail处理器ByteBuf实例还存在就在tail处理器中释放ByteBuf,处理出站数据时执行到head处理器时ByteBuf实例还存在就在head处理器中释放ByteBuf;但是headtail释放ByteBuf仅对传递给处理器的数据是ByteBuf类型时才生效,如果中间工序将ByteBuf处理成其他数据类型传递给后续处理器,headtail处理器不会主动释放传入通道流水线的初始ByteBuf;在源码中判断Object类型的消息是否为ReferenceCounted类型,如果是则强转为ReferenceCounted类型后调用其release方法让引用计数减1释放ByteBuf,此外如果处理出站数据时存在出站缓冲区,head处理器将不会释放ByteBufByteBuf将会交给出站缓冲区来释放

    • 零拷贝设计

      • 操作系统层面的零拷贝指避免在用户态和内核态之间来回拷贝数据,NIO的零拷贝指磁盘文件数据向网络传输时数据无需拷贝经过用户缓冲区,Netty的零拷贝指ByteBuf之间的切片、复制、CompositeByteBufUnpooled中涉及多个ByteBuf的组合操作过程不涉及任何数据的拷贝,此外Netty中包装了NIOFileChannelFileRegion使用sendFile将磁盘数据向网络传输无需经过用户缓冲区也是Netty零拷贝的体现

      • byteBuf.silce():将原始ByteBuf切片成多个ByteBuf,所有的切片还是使用原始ByteBuf的内存,只是每个切片维护独立的读写指针和容量,切片过程没有发生任何数据内存拷贝

        • 原始ByteBuf调用byteBuf.release导致引用计数减为0会导致所有切片都无法使用,调用切片方法会抛出异常IllegalReferenceCountException,特别是通道流水线这种存在部分处理器自动释放ByteBuf的场景;因此一般获取切片都会手动调用切片的retain方法避免原始ByteBuf被误删,切片使用完以后再手动调用切片的release方法释放byteBuf

      • byteBuf.duplicate():复制原ByteBuf的数据得到一块新的ByteBuf,与原始ByteBuf共用同一块物理内存,读写指针完全独立,再次写入数据时没有最大容量限制

  2. 实例化ByteBuf的方式

    • channelHandlerContext.alloc().buffer()

    • ByteBufAllocator.DEFAULT.buffer()优先创建基于直接内存的ByteBuf

    • ByteBufAllocator.DEFAULT.directBuffer():创建基于直接内存的ByteBuf

    • ByteBufAllocator.DEFAULT.heapBuffer():创建基于堆内存的ByteBuf

  3. ByteBufByteBuffer的增强功能

    • ByteBufAPI相较于ByteBuf更易用,无需切换缓冲区读写模式

    • 采用池化机制让ByteBuf实例得以重用,降低ByteBuf对象创建、销毁或者GC的开销,避免ByteBuf对象过多导致内存溢出,使用二叉树实现的内存池管理缓冲区资源释放更安全,避免直接缓冲区导致的内存泄漏;基于引用计数算法做ByteBuf的内存回收

    • 读写指针分离,读写不需要切换读写模式

    • 支持缓冲区自动扩缩容

    • 方法设计上支持链式调用

    • CompositeByteBuf以及Unpooled类都对ByteBuf的切片、复制和组合多个ByteBuf或者字节数组进行了零拷贝实现,减少数据的拷贝次数提高数据处理性能

  4. 常用方法

    • byteBuf.toString(Charset Charset.defaultCharset()):将字节缓冲区的数据按照指定编码格式转成字符串

      • 实际生产一般都在客户端和服务端指定相同的字符集而不使用默认的字符集,因为客户端和服务端所在机器的默认字符集可能不一致,非常容易乱码

    • byteBuf.writeBytes():该方法及其重载方法将字节数组、byteBufNIOByteBuff写入指定byteBuf实例

    • byteBuf.writeXxx():Xxx可以为八种基本数据类型,分别表示向ByteBuf写入对应的基本类型数据,占用对应字节数的存储空间

      • ByteBuf写入一个int值可以采用大端写入和小端写入两种方式,大端写入指如十进制的592对应16进制的0x00000250,写入ByteBuf对应的存储空间与原存储空间一致也为00 00 02 50;小端写入则为50 02 00 00,即以字节为单位将原存储空间按高位到低位的次序从低位写到高位;网络编程一般都会采用大端写入,默认writeInt方法是大端写入,小端写入为方法writeIntLE

    • byteBuf.writeCharSequence():将字符串按指定字符集写入ByteBuf

    • byteBuf.setByte():将byteBuf指定索引处的字节数据更改成指定值,以set打头的方法写入数据不会改变写指针的位置

    • byteBuf.readByte():从ByteBuf的读指针位置读取一个字节

    • byteBuf.readBytes():从ByteBuf中读取指定长度的数据到指定字节数组

    • byteBuf.readInt():从ByteBuf的读指针位置读取一个int类型数据

    • byteBuf.markReaderIndex():对读指针位置做标记

    • byteBuf.resetReaderIndex():将读指针位置还原回此前对读指针标记的位置

    • byteBuf.silce():从当前ByteBuf的指定索引开始按指定长度进行切片并返回切片

      • 切片可以作为独立的ByteBuf进行操作

      • 切片无法进行扩容,也无法写入超出切片容量的数据,会抛出IndexOutOfBoundsException异常,这是为了避免切片扩容写数据导致原ByteBuf数据错乱

    • byteBuf.readableBytes():获取ByteBuf剩余可读数据的字节长度

    • byteBuf.duplicate():复制原ByteBuf的数据得到一块新的ByteBuf,与原始ByteBuf共用同一块物理内存,读写指针完全独立,再次写入数据时没有最大容量限制

    • byteBuf.copy():将原始ByteBuf的数据拷贝到新ByteBuf对象中,无论如何读写新ByteBuf对象都与原始ByteBuf无关

 

CompositeByteBuf
  1. 概念

    • CompositeByteBuf提供了将多个ByteBuf的数据组合到指定compositByteBuf的方法,组合过程不会涉及到ByteBuf中字节数据的拷贝,像byteBuf.writeBytes()将一个byteBuf的数据追加指定byteBuf的方式是通过数据拷贝实现的

    • 通过ByteBufAllocator.DEFAULT.compositeBuffer()实例化compositeByteBuf对象

  2. 常用方法

    • compositByteBuf.addComponent(ByteBuf buffer):向compositByteBuf中添加一个ByteBuf

    • compositByteBuf.addComponents(ByteBuf... buffers):一次性向compositByteBuf中添加多个ByteBuf

      • addComponentaddComponents方法可以在入参中将参数increaseWriterIndex设置为true,在组合期间自动将写指针位置调整到最后一个字节,否则读写指针会保持组合前的位置

 

Unpooled
  1. 概念

    • Unpooled是一个提供非池化ByteBuf的创建、组合和复制操作的工具类

  2. 常用方法

    • Unpooled.wrappedBuffer(ByteBuf... buffers):将多个ByteBuf对象按顺序依次组合成一个ByteBuf

      • 该方法组合的ByteBuf数量超过一个时会使用CompositeByteBuf的零拷贝API来组合多个ByteBuf,因此该方法组合多个ByteBuf底层不会有拷贝操作

    • Unpooled.wrappedBuffer(byte[]... byteArrs):将多个字节数组按顺序依次组合成一个ByteBuf

      • 底层也不会有字节数据拷贝操作

    • Unpooled.copiedBuffer(byte[] bytes):将字节数组作为数据内容创建ByteBuf对象

 

Channel

  1. 概念

    • Netty中的一个接口,代表一个网络连接或者本地I/O操作的入口

    • Channel提供网络连接通道的状态信息、接收缓冲区大小等网络连接配置信息、提供如建立连接、数据IO、绑定端口等异步网络操作,并提供回调方法供操作完成时执行用户自定义的处理逻辑

    • 常用Channel实现

      • NioSocketChannel:基于TCP的客户端Socket连接

      • NioServerSocketChannel:基于TCP的服务器端Socket连接

      • NioDatagramChannel:基于UDP的连接

  2. 常用方法

    • channel.close():异步关闭当前通道,连接通道关闭会自动解除被关闭通道和EventLoop的绑定关系

    • channel.closeFuture():异步关闭当前通道,该方法返回ChannelFuture实例,当前通道彻底关闭时ChannelFuture实例会被标记为完成状态

      • 可以通过该ChannelFuture实例调用sync或者await方法让当前线程同步等待当前通道的关闭,还可以调用addListener()ChannelFuture注册一个监听器以便在当前socket彻底关闭时执行用户自定义的业务操作

    • channel.pipeline():获取当前通道对应的流水线以便向流水线中添加Handler处理器用于处理网络事件

    • channel.write():将可序列化对象、字节数组、字节缓冲写出网络通道

      • 由于Netty的缓冲机制,每次写出的数据不一定会立即写出到网络,数据会先被缓冲区缓冲,当缓冲区到了一定大小会自动发送到网络,用户也可以调用channel.flush()立即将缓冲区数据发送到网络,接收端接收完整的缓冲区数据,即几次发送的消息很可能会拼接在一起

    • channel.flush():将通道发送缓冲区中的数据立刻刷新到网络

    • channel.writeAndFlush():将可序列化对象、字节数组、字节缓冲写出到缓冲区并立即将缓冲区数据刷出到网络通道

      • 该方法写出的出站数据会经历完成流水线从tail处理器开始依次向前遍历执行每个出站处理器,区别于ctx.writeAndFlush()从当前处理器开始依次向前遍历每个出站处理器

    • channel().eventLoop():获取通道注册的事件循环

 

EmbeddedChannel
  1. 概念

    • EmbeddedChannelNetty提供的用于测试的一种通道,可以通过有参构造实例化通道的同时给通道流水线绑定一系列处理器,用户无需启动客户端或者服务端就可以模拟网络数据读入通道时数据经过各个入站处理器或者网络数据写出通道时数据经过各个出站处理器的过程

      • 构造方法:new EmbeddedChannel(@NotNull ChannelHandler... handlers)

  2. 常用方法

    • embeddedChannel.writeInbound(ByteBuf byteBuf):向通道流水线写入站数据,从head处理器开始沿着处理器链依次向后经过预设入站处理器处理ByteBuf

      • 调用该方法时如果传参是ByteBuf的一个切片,一定要在调用前调用其中一个切片或者整个ByteBufretain()方法让切片或者整个ByteBuf的引用计数加1,否则其他切片还没有发送整个ByteBuf就会被释放掉

    • embeddedChannel.writeOutbound(ByteBuf byteBuf):向通道流水线写出站数据,从tail处理器开始沿着处理器链依次向前经过预设出站处理器处理ByteBuf

 

Future & Promise

Future
  1. 概念

    • NettyFuture接口继承自JDKFuture接口

      • JDKFuture通过get()方法同步阻塞当前线程等待异步任务执行结束并返回异步任务的执行结果,NettyFuture既可以同步等待异步任务的执行结果,也可以异步通过回调通知的方式获取异步任务的执行结果

      • JDKFuture实例通过向线程池提交Callable任务对象的方法的返回值获取

  2. JDKFuture常用方法

    • future.cancel():取消已经提交但还未执行的任务

    • future.isCanceled():判断任务是否已经取消

    • future.isDone():判断任务是否已经完成,注意该方法不能判断任务是被成功执行了还是任务失败了

    • future.isSuccess():判断任务被成功执行了还是任务执行失败了

    • future.get():同步阻塞当前线程直到获取到异步任务的结果

  3. NettyFuture的扩展方法

    • future.sync():阻塞当前线程等待异步任务执行结束,如果异步任务执行失败会抛出异常

    • future.await():阻塞当前线程等待异步任务执行结束,如果异步任务执行失败不会抛出异常,需要用户通过手动调用future.isSuccess()方法来进行判断

    • future.addLinstener():为异步任务注册监听器,异步任务执行结束在回调方法的入参传递执行结果并由异步线程执行一段用户自定义的业务操作

      • Nettyfuture.addListener()注册的异步回调无法在Junit的测试方法中使用异步线程执行

    • future.cause():获取异步任务执行失败的错误信息,如果没有发生异常返回null,等价于future.getCause()方法

    • future.getNow():获取异步任务的执行结果,如果结果还未产生返回null

 

Promise
  1. 概念

    • NettyPromise接口继承自NettyFuture

      • 常用的实现有DefaultPromise,用户可以直接在当前线程通过DefaultPromise的单参构造传参事件执行器即EventLoop主动创建Promise实例,而不是像Future一样只能通过执行异步任务的线程创建并返回给当前线程

    • Promise除了同步等待和异步回调获得异步任务的结果,还可以脱离任务独立存在,仅作为两个线程传递结果的容器

    • PromiseRPC框架中非常有用,能实现Future无法达到的效果

      • Promise相较于Future就是能在当前线程手动创建并在异步任务的执行线程手动调用setSuccess或者setFailure设置异步调用的结果并且不用等异步任务执行结束就能放行阻塞在Promiseawait或者sync的线程;而Future只能通过异步任务的执行线程创建并返回给异步任务的调用线程,也不提供手动设置异步任务执行结果的方法

  2. Promise的扩展方法

    • promise.setSuccess(T result):向promise对象设置成功执行的结果,甚至不需要等到异步任务执行结束就能设置并获取结果

    • promise.setFailure(Throwable e):向promise对象设置执行失败的异常

      • 不管调用promise.setSuccess(T result)还是promise.setFailure(Throwable e)都会让promise.await()或者promise.sync()结束阻塞继续向下运行,但是promise.get()仍然会继续阻塞直到异步任务执行结束

  3. PromiseRPC框架中的用法

    • RPCClientManager中为用户提供接口动态代理对象的getProxyService()方法将用户调用远程方法的行为转换成向远程服务发送远程调用消息的行为[这部分细节内容看搭建RPC框架部分内容]

      • 这里演示使用的是JDK动态代理

      • 远程方法调用完响应消息被客户端接收后会被流水线处理成自定义的RPCResponseMessage,流水线的处理得到RPCResponseMessage是在线程eventLoop中完成的,动态代理对象一般在用户线程中完成,这里涉及到两个线程间共享同一个数据的问题,使用Promise容器可以实现多个线程间交换同一个数据

        • 准备一个ConcurrentHashMap以消息序号作为key,以Promise作为值缓存远程调用响应消息经流水线处理后的结果,为了保证多线程并发共享数据的线程安全性使用了concurrentHashMap<Integer,Promise<?>>[?是通配符,表示适配任意类型,这是因为不知道响应的结果是什么类型,注意这里用通配符不行,后续向Promise对象中设置值会出现问题,将通配符改成Object类型]Promise对象由代理对象通过DefaultPromise<?> promise = new DefaultPromise<>(channel.eventLoop())创建后存入ConcurrentHashMap[入参channel.eventLoop()是创建Promise对象需要指定将结果传入Promise对象的线程EventExecutor对象,需要流水线的执行线程即channel.eventLoop()对象,注意该concurrentHashMap老师设置为RPCResponseMessageHandler的一个公有静态变量],代理对象创建Promise对象并将其存入concurrentHashMap后调用promise.await()或者promise.sync()等待eventLoop接收到响应将结果存入promise对象,使用promise.await()使用promise.isSuccess()来判断是否正常成功获取消息,成功获取响应结果直接获取结果设置为代理对象对应方法的返回结果,如果没有成功获取响应结果,包装异常对象promise.cause()直接通过代理对象抛出

    • RPCResponseMessageHandler中增加将流水线处理结果存入concurrentHashMap中的Promise对象的逻辑

      • 通过消息的序列号从concurrentHashMap中获取消息对应的Promise对象,检查远程调用的响应结果是否正常,如果正常调用promise.setSuccess(returnValue)设置远程调用执行结果,如果远程调用有异常就调用promise.setFailure(exceptionValue)将异常信息设置到Promise[注意Gson对Throwable对象向json字符串的转换不需要自定义转换适配器],为了避免序列号错误或者其他错误需要对promise对象判空,只有非空才能进行设置值操作,否则会出现空指针异常,因为各种原因导致集合中没有对应promise对象是可能出现这种问题的

     

ChannelFuture
  1. 概念

    • ChannelFuture继承自NettyFuture,是用于表示Channel各种异步操作结果的接口,Netty中各种IO操作都是异步的,ChannelFuture能正确获取异步操作的结果并在异步操作成功或者失败时由异步线程执行一段开发者自定义的业务操作

      • 异步任务调用正确获取执行结果要么同步阻塞当前当前线程等待异步任务执行完毕返回执行结果,要么在异步任务发起时为异步任务注册监听器待异步任务执行结果时自动调用回调方法并传参异步任务的执行结果

    • NettybootstrapServerBootstrapbindwriteconnect方法都会返回一个ChannelFuture对象

  2. 常用方法

    • channelFuture.sync():该方法会阻塞当前线程直到通道连接成功建立后才会继续执行后续代码

      • 如果连接还没有成功建立就通过channelFuture.channel()获取到Channel并使用该channel发送消息,消息会直接丢失;但是连接建立好以后,连接信息会自动封装到channel中此时仍能正常收发消息;

    • channelFuture.channel():连接成功建立后获取连接通道channel实例

    • channelFuture.addListener(ChannelFutureListener channelFutureListener):用户通过匿名实现函数式接口ChannelFutureListeneroperationComplete(ChannelFuture future)方法指定连接完成建立时要通过异步线程执行的操作

      • 回调方法中入参ChannelFuture对象就是channelFuture.addListener()注册监听器的那个channelFuture

       

CloseFuture
  1. 概念

    • CloseFuture实现了ChannelFuture,用于正确获取Channel关闭操作结果的接口,CloseFuture实例通过channel.closeFuture()返回

  2. 常用方法

    • closeFuture.sync():阻塞当前线程等待指定连接通道完全异步关闭

    • closeFuture.addListener(ChannelFutureListener channelFutureListener):用户通过匿名实现函数式接口ChannelFutureListeneroperationComplete(ChannelFuture future)方法指定连接完成关闭时要通过异步线程执行的操作

      • 回调方法的入参future就是添加监听器的closeFuture

 

ChannelPipeline

  1. 概念

    • Pipeline流水线底层是一个双向链表,首尾分别是headtail处理器,中间是由用户使用Netty提供或者由用户自定义的处理器组合而成的处理器链,流水线负责将网络事件传播给流水线上的每个HandlerHandler通过相应事件的处理方法对自身感兴趣的事件进行处理

    • EventLoop是通道流水线处理器的实际执行者,在为流水线初始化处理器时可以指定执行当前处理器的EventLoopGroup,同一条流水线的不同工序可能被不同的eventLoop实例执行,前一道工序由eventLoop1线程执行到AbstractChannelHandlerContext.invokeChannelRead(next,msg)方法时,会通过通道处理器上下文拿到下一个处理器的事件执行器即eventLoop2,通过eventLoop2.inEventLoop()方法判断下一道处理工序的执行线程是否是当前线程;如果是当前线程直接在当前线程调用执行下一道工序,如果不是当前线程则将调用执行下一道工序的代码封装成任务对象提交给下一道工序的事件执行器即eventLoop2执行;通过这种方式来实现不同处理器的执行线程切换

  2. 常用方法

    • channelPipeline.addLast():将指定处理器添加到tail处理器前面

 

ChannelHandler
  1. 概念

    • 通道处理器用于处理通道上的网络事件,通道处理器分为出站入站处理器和双向处理器,多个通道处理器组成的双向链表被称为流水线;

      • 入站处理器用于处理如连接建立、数据读取、连接关闭等入站事件从网络接收的数据,将网络数据解析为应用可以理解的数据类型、对解析后的数据执行业务处理逻辑、根据需要向客户端发送响应并将解析后的数据传递给下一个入站处理器

      • 出站处理器用于处理可写事件的出站数据

      • 双向处理器可以同时作为出站处理器和入站处理器

  2. 入站处理器

    • 入站处理器一般是实现了ChannelInboundHandler接口的抽象类ChannelInboundHandlerAdapter的子类,为所有方法提供了空实现,开发者可以通过继承ChannelInboundHandlerAdapter实现其中的特定方法自定义处理器对特定网络事件的处理逻辑,Netty也提供了部分入站处理器供开发者使用

    • channelRead(ctx,msg):处理可读事件的入站数据,入参ctxChannelHandlerContext实例,提供对通道和流水线上下文的访问;msg的类型为Object,流水线写出到通道或者通道读入流水线的数据类型一般为ByteBuf,在处理器处理过程中ByteBuf可能被处理成其他任意类型的数据并传递给后续流水线;

      • channelRead方法中调用ctx.fireChannelRead(msg)将当前处理器处理后的消息传递给下一个处理器,如果入站处理器没有调用super.channelRead(ctx,msg)或者ctx.fireChannelRead(msg)方法,入站处理器流水线会直接在当前处理器直接断掉,后续入站处理器不会再继续执行;最后一个入站处理器调用该方法不会有任何效果

      • 入站处理器在处理入站数据期间可能有写出数据的需求,比如需要向客户端发送响应,可以直接调用ctx.writeAndFlush(Object msg)向客户端写出数据,某个入站处理器处理数据期间通过该方法写出数据,写出的数据会从当前处理器开始直接沿着处理器链向前依次执行每个出站处理器,该处理器后面的出站处理器都不会被执行;此外向客户端写出数据还可以使用初始化通道的initChannel(NioSocketChannel ch)方法的入参ch调用ch.writeAndFlush(ByteBuf)在任意一个处理器中向客户端写出数据,此时出站数据都会从tail处理器沿着完整的处理器链向前依次执行每个出站处理器

    • channelActive(ctx):连接通道成功建立时会触发active通道激活事件,此时Netty会执行通道流水线上每个入站处理器的channelActive()方法,通常用于设置连接状态信息、向客户端发送通知消息、记录连接日志等等

      • 通过ctx.fireChannelActive()将通道激活事件传递给下一个处理器

    • channelInactive(ctx):连接通道失活时,Netty会调用各个入站处理器的channelInactive()方法,常用于释放连接相关资源,记录连接关闭日志

      • 通过ctx.fireChannelInactive()将通道失活事件传递给下一个处理器

    • userEventTriggered(ctx,evt):该方法用于处理用户的自定义事件或者IdleState下的各种事件,其中入参evt就是事件本身,通过ctx.fireUserEventTriggered(evt)将事件传递给下一个处理器

    • exceptionCaught(ctx,cause):用于处理通道生命周期中捕获任何未被处理的异常,常用于记录异常发生时记录异常日志,释放资源,通知客户端

      • 通过ctx.fireExceptionCaught(cause)将异常传递给下一个处理器

  3. 出站处理器

    • 入站处理器一般是实现了ChannelOutboundHandler接口的抽象类ChannelOutboundHandlerAdapter的子类

    • write(ctx,msg,promise):处理可写事件的出站数据

      • 通过调用super.write(ctx,msg,promise)将当前处理器的处理结果传递给下一个出站处理器

  4. 双向处理器

    • 双向处理器一般是同时实现了出站处理器和入站处理器的抽象类ChannelDuplexHandler的子类

  5. Netty提供的常用ChannelHandler

    • StringDecoder:将ByteBuf转成字符串

    • StringEncoder:将字符串转成ByteBuf

    • LoggingHandler:日志处理器,打印通道的状态打印客户端传输过来的ByteBuf中的内容

      • LoggingHandler的构造方法中指定LogLevel.DEBUG可以将日志级别调整为DEBUG

      • 无法识别的字节会打印成.,无法识别的字节一般是占用多个字节的类型数据或者特殊字符,比如int类型数据、换行符\n

      • 该日志处理器的底层使用的是logback,需要配置logback.xml日志配置文件

    • FixedLengthFrameDeacoder:定长解码器,从ByteBuf中拆分每条固定长度的消息

      • 拆分每条消息的处理器必须添加到LoggingHandler之前,否则ByteBuf中的数据还没有处理粘包半包现象就会被打印

    • LineBasedFrameDecoder:行解码器,从ByteBuf中拆分以换行符作为消息分隔符的每条消息

    • DelimiterBasedFrameDecoder:分隔符解码器,从ByteBuf中拆分以自定义符号作为消息分隔符的每条消息

    • LengthFieldBasedFrameDecoder:帧解码器,从ByteBuf中拆分基于长度字段确定消息长度的每条消息

    • CombinedChannelDuplexHandlerHTTP协议解码器,能将ByteBuf数据解码成消息头HttpRequest和消息体HttpContent

    • SimpleChannelInboundHandler<T>:开发者可以通过匿名实现入站处理器SimpleChannelInboundHandler<T>指定泛型类型重写ChannelRead0(ctx,msg)实现只对特定类型消息的入站处理,如果是其他类型的消息会跳过该处理器不执行

      • socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){})即处理器SimpleChannelInboundHandler只会对HttpRequest类型的msg生效,如果是HttpContent类型的消息该处理器就会跳过不执行

    • IdleStateHandler:空闲状态检测器,检测通道是否存在连接假死,判断连接可能存在问题的原理是读入或者写出数据之后的空闲时间太长

      • new IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds),入参readerIdleTimeSeconds设置读空闲超时时间,writerIdleTimeSeconds设置写空闲超时时间,allIdleTimeSeconds是设置读写的空闲时间都超过指定时间

       

ChannelHandlerContext
  1. 概念

    • ChannelHandlerContext实例提供对通道和流水线上下文的访问,流水线通过ChannelHandlerContext管理流水线上的处理器

  2. 常用方法

    • fireChannelXxx():将当前处理器Xxx事件对应处理方法的处理结果以及相应事件传递给下一个处理器

    • writeAndFlush(Object msg):将消息从当前处理器开始向前依次经过各个出站处理器处理并写出到网络通道

    • channel():获取当前流水线正在处理的通道,可以拿到通道在处理器中写出数据或者获取通道注册的事件循环提交普通或者定时任务

    • fireExceptionCaught(throwable):向流水线抛出指定异常

 

ChannelOption

  1. 概念

    • ChannelOption是枚举类,提供一系列枚举值作为Channel的配置参数如缓冲区大小、超时时间等控制Channel的行为;通过serverBootstrap或者bootstrapoption(ChannelOption枚举值,值)方法对通道参数进行配置

  2. 常用枚举值

    • ChannelOption.SO_RCVBUF:通道接收缓冲区滑动窗口的大小,SO_表示TCP套接字的参数

    • ChannelOption.SO_SNDBUF:通道发送缓冲区滑动窗口大小

      • 接收缓冲区和发送缓冲区滑动窗口大小早期一般由用户调整,现代操作系统TCP流量控制和拥塞控制机制能高效自适应调整该参数

    • ChannelOption.SO_TIMEOUT:设置阻塞式IOaccept或者read方法的最大阻塞时间,默认为无时限等待

    • ChannelOption.CONNECT_TIMEOUT_MILLIS:客户端建立连接超时时间,如果指定毫秒时间内无法建立连接会抛出ConnectTimeoutException,默认值为30s

    • ChannelOption.SO_BACKLOG:设置TCP连接队列的最大长度,即服务端允许同时建立连接的客户端数量

    • ChannelOption.RCVBUF_ALLOCATOR:调整Netty的接收缓冲区容量

      • 参数值为AdaptiveRecvByteBufAllocator对象,通过构造方法new AdaptiveRecvByteBufAllocator(16,16,16)来实例化,三个入参依次为接收缓冲区容量的最小值、初始值和最大值,指定的最小值比16小也只会最小应用16

    • ChannelOption.TCP_NODELAY:配置是否启用Nagle算法优化,默认为false表示启用,实际生产中希望降低延迟建议设置为true关闭Nagle算法

      • Nagle算法优化指一些数据包的数据太少可能会在发送缓冲区等待积攒消息,这可能给消息通信带来延迟

    • ChannelOption.ALLOCATORAllocatorByteBuf的分配器,用户调用channelHandlerContext.alloc()时拿到的就是分配器对象,分配器默认分配的ByteBuf是一个池化的直接内存PooledUnsafeDirectByteBuf,分配器的设置逻辑为

      • allocTypeJVM系统参数io.netty.allocator.type的值,如果用户启动JVM时没有配置该参数,Netty会通过PlatformDependent.isAndroid() ? "unpooled" : "pooled"判断当前操作系统平台是不是安卓,是则将allocType设置为unpooled,不是安卓设置为pooled,该参数只是设置默认分配的ByteBuf是否池化,不会影响使用堆内存还是直接内存

        • allocTypeunpooled默认分配器实例为UnpooledByteBufAllocator,是pooled则为PooledByteBufAllocator

        • JVM系统参数可以在运行时通过System.getProperty(String key)访问,用户通过-D-Dio.netty.allocator.type=unpooled进行配置

      • 构造方法new UnpooledByteBufAllocator(boolean preferDirect)方法的入参preferDirect表示是否首选直接内存,该参数由JVM系统参数io.netty.noPreferDirect决定,默认值是false表示首选直接内存;通过该构造方法实例化的UnpooledByteBufAllocator对象将被赋值给UnpooledByteBufAllocator.DEFAULT,即通过UnpooledByteBufAllocator.DEFAULT.alloc()获取的ByteBuf默认使用直接内存

        • 非安卓平台的非池化堆内存ByteBuf可以在启动JVM时通过配置系统参数-Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true实现

      • 网络IO读写数据时Netty会强制使用直接内存,不会根据系统参数配置妥协使用堆内存以保证网络IO的效率

    • ChannelOption.RCVBUF_ALLOCATOR:该参数配置Netty接收最原始网络数据的ByteBuf容量

      • AbstractNioByteChannelread()方法中定义了接收最原始网络数据的逻辑,接收数据的ByteBuf通过byteBuf=allocHandle.allocate(allocator)创建,AllocHandleRecvByteBufAllocator的内部类,入参allocator通过config.getAllocator()获取的就是上述ChannelOption.ALLOCATOR创建的allocatorallocator控制ByteBuf是池化还是非池化

      • allocHandle.allocate(allocator)方法中调用allocator.ioBuffer(guess())强制创建基于直接内存的ByteBufguess()方法会根据Netty过去几次网络通信数据量动态决定ByteBuf的容量

      • allocHandlerecvBuffAllocHandle()方法中调用rcvBufAllocator.newHandle()获取,rcvBufAllocator通过DefaultChannelConfig的构造方法中调用setRecvByteBufAllocator(allocator)赋值得到,入参allocatorDefaultChannelConfig的构造方法中通过实例化AdaptiveRecvByteBufAllocator获得,即通过AdaptiveRecvByteBufAllocator的参数配置创建allocHandle实例从而决定Netty接收缓冲区ByteBuf的容量参数,AdaptiveRecvByteBufAllocator的构造器调用this(DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM)来进行实例化,其中入参DEFAULT_INITIAL表示接收缓冲区的默认初始容量为1024DEFAULT_MAXIMUM表示接收缓冲区允许的最大容量为65536DEFAULT_MINIMUM表示接收缓冲区允许的最小容量为64

      • 默认情况下接收最原始网络数据的接收缓冲区的内存类型调用的是allocatorioBuffer(guess())方法定死了是直接内存,是否池化或者非池化根据JVM的系统参数io.netty.allocator.type决定,初始容量1024,根据前几次的通信数据量在64-65536范围内自适应调整接收缓冲区的容量

     

ChannelInitializer

  1. 概念

    • ChannelInitializer是一个抽象类,通过serverBootstrap.childHandler(new ChannelInitializer(){})能自定义ChannelInitializer实现,通过重写匿名实现的initChannel方法能为已接入通道自定义组合多种通道处理器形成通道流水线,initChannel方法在通道被注册到事件循环后被调用用于将通道处理器添加到通道流水线

    • channelInitializerinitChannel(SocketChannel ch)方法中通过多次调用ch.pipeLine().addLast()将处理器依次添加到tail处理器前面,addLast方法的重载方法可以在添加处理器的同时指定用户自定义的事件循环组替换掉轮询通道的事件循环组来执行处理器的对应时间处理方法,防止处理时间较长的非IO事件降低其他通道的轮询效率

 

ServerBootstrap

  1. 概念

    • 服务端启动引导器,负责组装服务端Netty组件,做服务端配置,引导服务端启动

    • ServerBootstrap需要配置两个事件循环组,一个用于监听新客户端的接入事件,一个用于处理客户端的可读可写事件

  2. 常用方法

    • group(eventLoopGroup):设置一个EventLoopGroup同时作为BossEventLoopWorkerEventLoop的容器,这种方式会自动在事件循环组中分配处理accept事件的EventLoop和处理读写事件的EventLoop

    • group(bossGroup,workerGroup):设置两个EventLoopGroupbossGroup作为BossEventLoop的容器,只处理所有通道的accept事件;workerGroup作为workerEventLoop的容器,只处理所有通道上的读写事件;通过设置两个事件循环组管理的线程数量可以切换配置Netty的三种线程模式

    • channel(serverSocketChannel):指定服务端的ServerSocketChannel实现,这个ServerSocketChannel就是NIO中的ServerSocketChannelNettyServerSocketChannel实现包括

      • NioServerSocketChannel:这是对原生NIOServerSocketChannel做的进一步封装

      • OioServerSocketChannel:基于BIO同步阻塞式IOServerSocketChannel实现

      • EpollServerSocketChannel:基于LinuxEpollServerSocketChannel实现

      • KQueueServerSocketChannel:对MacServerSocketChannel实现

    • childHandler(channelInitializer):自定义抽象类ChannelInitializer的实现重写initChannel方法为通道流水线组合多种Handler来自定义对入站出站数据的处理过程

    • bind(port):指定服务端Socket通道监听客户端连接的端口开始监听端口上的新客户端接入事件,该方法返回ChannelFuture实例

    • option(channelOption,value):给服务端通道ServerSocketChannel做一些全局配置,比如配置通道接收发送缓冲区的滑动窗口大小,接收原始网络数据的ByteBuf的容量,建立连接的超时时间,是否启用Nagle算法,TCP连接队列的最大长度

    • childOption(channelOption,value):给服务端通道SocketChannel做一些全局配置

     

Bootstrap

  1. 概念

    • 客户端启动引导器,负责组装客户端Netty组件,做客户端配置,引导客户端启动;

  2. 常用方法

    • group(nioEventLoopGroup):设置EventLoopGroup作为EventLoop的容器,客户端只需要配置一个事件循环组收发网络消息即可

    • channel(socketChannel):指定客户端的SocketChannel实现

      • NioSocketChannelNettyNIOSocketChannel的封装

    • handler(channelInitializer):自定义抽象类ChannelInitializer的实现重写initChannel方法为通道流水线组合多种Handler来自定义对入站出站数据的处理过程

    • connect(inetSocketAddress):通过指定服务端的主机和IP尝试与服务端建立连接,返回ChannelFuture对象

      • 该方法是一个异步非阻塞方法,真正发起连接的线程不是创建启动器Bootstrap的线程,而是NioEventLoopGroup中的EventLoop,建立连接的时间以秒作为单位

    • bind(port):作为UDP协议的一端绑定本地一个端口

 

2.2-Reactor模式

  1. 概念

    • Reactor用于解决高并发场景下的IO问题,Reactor模式是一种使用非常广泛的高性能网络编程设计,NginxRedisNetty都是基于Reactor模式实现的,Javaprojectreactor框架JVM层面上实现了基于Reactor反应式模式也叫响应式编程的设计

    • Reactor模式的核心是ReactorHandler两个角色

      • Reactor负责监听查询IO事件,当监听到IO事件时将事件分发给相应处理器处理,这里的事件在Java中就是指NIO选择器监听的通道IO事件

      • Handler负责执行连接建立、通道数据读取、数据编解码、业务逻辑处理以及向通道写出数据等IO事件的处理

    • ReactorHandler有单线程模型、多线程模型和主从多线程模型三种实现方式

      • 所谓的单线程模型就是NIO的单线程模型,创建负责监听和分发事件的模块或者组件作为ReactorReactor通常运行在一个单独的线程中,由Reactor创建选择器通过操作系统的epoll函数监听通道上的I/O事件,服务端启动时绑定监听指定端口上的Accept客户端连接接入事件,一旦监听到端口上的客户端连接读写事件,Reactor线程会从选择器中获取事件根据事件类型调用一系列处理器对事件进行处理;单线程模型要求所有的Handler都必须在一个线程内同步执行,待所有处理器处理完后当前事件将处理结果响应给客户端后再去处理下一个事件;单线程模型不适合高并发、事件处理耗时很长的场景,而且无法发挥多核CPU的优势

      • 所谓的多线程Reactor模型就是NIO的多线程模型,将原来的一个Reactor组件拆分成两部分,第一个部分由一个Reactor组件只负责监听和分派Accept事件,第二个部分由若干Reactor组件作为SubReactor专门处理已连接客户端通道除Accept外的其他IO事件,每个SubReactor都运行在独立的一个线程中,由专门处理Accept事件的Reactor将非Accept外的IO事件分配给每个SubReactor所在工作线程进行处理,每个工作线程处理完当前分配的IO事件再处理下一个事件;多线程模型适合绝大多数场景,只有少数极端场景如百万并发场景下单个线程处理客户端接入事件可能存在性能问题

      • 主从多线程Reactor模型指和subReactor一样由一组reactor负责处理新客户端的接入请求,另一组subReactor维持多线程模型的状态专门处理已连接客户端通道除Accept外的其他IO事件

  2. 特点

    • Reactor模式是基于事件驱动的设计模式,可以通过单个或者少量线程就能管理所有客户端连接通道上的所有IO事件

    • Reactor模式可以通过简单增加处理事件的线程数来快速扩展系统的负载上限

    • Reactor模式解耦事件接收、事件分派和事件处理,让系统更容易维护和扩展

    • 多线程模型充分发挥多核CPU的优势,提高系统吞吐量,降低处理长耗时事件对其他通道就绪事件的影响从而提交系统的响应速度

 

2.3-Netty线程模型

  1. 概述

    • 大部分网络框架都是基于Reactor模式设计开发的,Reactor模式是基于事件驱动,采用多路复用将时间分发给相应处理器进行处理,适合海量IO的场景

    • Netty依靠NioEventLoopGroup线程池实现具体的线程模型,使用Netty创建服务端时一般会初始化bossGroupworkerGroup两个线程组,前者负责处理客户端连接建立请求,后者负责处理客户端除连接事件外的其他网络事件

  2. 单线程模型

    • 一个线程处理客户端的所有事件,包括acceptreaddecodeprocessencodesend等事件

    • 不适用于高并发、IO时间很长的场景

    • 对应使用Netty创建服务端创建一个单线程的NioEventLoopGroup,服务端启动引导类group方法的两个入参都为该NioEventLoopGroup

  3. 多线程模型

    • 一个只管理单个EventLoopNioEventLoopGroup专门处理客户端连接请求,一个管理默认数量EventLoopNioEventLoopGroup专门处理客户端的其它网络事件

    • 多线程模型适合大部分应用场景,只有少数百万并发场景下,单个线程处理客户端连接事件可能存在性能问题

    • 对应使用Netty创建服务端创建一个单线程的NioEventLoopGroup作为bossGroup,创建一个默认线程数的NioEventLoopGroup作为workGroup;服务端启动引导类的group方法两个入参一个为bossGroup,一个为workerGroup

  4. 主从多线程模型

    • 创建两个默认线程数量的NioEventLoopGroup,一个作为bossGroup,一个作为workerGroupbossGroup中选择一个线程绑定监听端口专门处理客户端接入请求,其他线程负责接入后的认证、登录等工作,连接建立后由workerGroup专门处理客户端的其它通信事件

    • 适合超高并发场景

       

2.4-服务端启动过程

  1. 服务端通信流程

    • 1️⃣:创建启动器类ServerBootstrap

    • 2️⃣:根据需要的Reactor线程模型添加EventLoopGroup组件作为bossEventLoopworkerEventLoop的容器,bossGroup监听新客户端的接入事件、workGroup处理已接入通道的可读可写事件;根据要使用的Netty线程模型控制两个事件循环组的线程数量

    • 3️⃣:根据场景选择ServerSocketChannel实现

    • 4️⃣:调用childHandler方法为通道添加通道初始化器并重写initChannel为通道流水线组合处理器

    • 5️⃣:服务端绑定网络通信端口开始监听客户端通道的接入事件

    • 6️⃣:服务端监听到客户端接入事件将客户端注册到事件循环中,并调用initChannel方法初始化通道流水线

    • 7️⃣:通道可读事件触发流水线的入站处理器对入站数据的处理,有需要可以在处理器中通过ctx.writeAndFlush或者channel.writeAndFlush向客户端写出数据

  2. 原生NIO的服务端启动流程

    • Selector.open()创建选择器

    • ServerSocketChannel.open()创建ServerSocketSocket

    • serverSocketSocket.register(selector,0,attachment)向选择器注册通道和相应附件

    • serverSocketChannel.bind(new InetSocketAddress(8080))为通道绑定服务端通信监听端口

    • selectionKey.interestOps(SelectionKey,OP_ACCEPT)通过通道对应事件监听对象设置监听serverSocketChannel上的接入事件

  3. Netty的启动流程[Netty中初始化选择器和NIO线程在new NioEventLoopGroup中完成,其余创建serverSocketChannel,初始化通道流水线,将serverSocketChannel注册到选择器中,绑定端口以及向选择器中注册监听接入事件都在serverBootstrap.bind()方法中进行]

    • 创建NioEventLoopGroup的同时初始化封装了NIO线程和选择器的NioEventLoop管理客户端接入事件

    • 创建NettyNioServerSocketChannel,初始化nioServerSocketChannel的流水线,将初始化处理器添加到流水线中等待被调用

    • 启动NIO线程负责后续步骤的执行,ServerSocketChannel.open()创建NIO的原生serverSocketChannel,并通过serverSocketChannel.configureBlocking(false)将通道设置为非阻塞模式

    • serverSocketChannel.register(selector,0,attachment)serverSocketChannel注册到选择器中,同时将NettyNioServerSocketChannel作为附件和原生serversocketChannel绑定

    • serverSocketChannel.bind(new InetSocketAddress(8080))为通道绑定服务端通信端口

    • selectionKey.interestOps(SelectionKey,OP_ACCEPT)设置通道的事件监听类型为接入事件

  4. serverBootstrap.open()方法详解

    • serverBootstrap.open()方法主要被拆解为三个部分,initAndRegister中的initregister部分,doBind0部分;init部分由主线程执行,初始化NettyNioServerSocketChannel、原生NIOServerSocketChannel和通道对应的流水线,将初始化处理器添加到流水线中;register部分启动NIO线程,由NIO线程将ServerSocketChannel注册到选择器并以附件的形式将NioServerSocketChannel绑定到ServerSocketChannel上,由NIO线程执行流水线上的初始化处理器,initAndRegister方法的返回值是一个ChannelFuture,用于协调NIO异步线程执行Register部分的结果;doBind0部分负责为serverSocketChannel绑定监听端口,并且触发NioServerSocketChannel上的active事件,当即将执行doBind0方法时会判断NIO线程是否已经执行完register部分,如果已经执行完会由主线程执行doBind0方法,但是大部分情况下将serverSocketChannel注册到选择器都是一个耗时操作,进行判断时一般都是NIO线程仍然没有执行结束,此时doBind0方法的执行会被封装到ChannelFuture的回调监听器中,等NIO线程执行完register部分再由NIO线程在监听回调中执行doBind0方法;

    • init部分:constructor.newInstance()通过反射调用NioServerSocketChannel的无参构造器实例化NioServerSocketChannel,并在执行<init>方法期间和原生NIOserverSocketChannel.open()调用SelectorProvider.provider().openServerSocketChannel()一样的方式创建原生NIOserverSocketChannel,初始化NioServerSocketChannel的流水线ChannelPipeline并向其中添加通道初始化处理器ChannelInitializer等待调用其中的initChannel方法,该处理器中的initChannel方法只会被执行一次,

    • register部分:将register0方法封装成任务对象提交给eventLoopNIO线程执行,这是eventLoop首次执行任务,也是实例化EventLoop线程的时机,调用serverSocketChannel.register()EventLoop中的选择器注册通道,并将Netty实例化的NioServerSocketChannel作为附件和通道进行绑定,通过channelPipeLine.invokeHandlerAddedIfNeeded()NIO线程执行通道流水线中的初始化处理器ChannelInitializerinitChannel方法,该方法的作用是向通道流水线中加入处理器ServerBootstrapAcceptor,该处理器用于在Accept事件发生后建立连接;然后手动向ChanenlFuture实例中设置成功的异步执行结果,触发主线程或者回调监听器中的doBind0方法执行doBind0逻辑

    • doBind0部分:doBind0中的所有代码也被封装成任务对象被提交给NIO线程执行,通过serverSocketChannel.bind()方法指定监听端口和全连接队列容量来为原生通道绑定监听端口;随即通过channelPipeLine.fireChannelActive()手动触发NioServerSocketChannel通道上的Active事件触发执行head处理器上的channelActive方法设置选择器监听serverSocketChannel上的接入事件

  5. NIO中服务端处理Accept事件流程

    • selector.select阻塞NIO线程直到有事件发生

    • 轮询遍历selectedKeys中的每个就绪事件判断事件类型是否为接入事件

    • 如果是接入事件创建客户端通道对应的SocketChannel,将通道设置为非阻塞模式

    • SocketChannel注册到选择器中返回selectionKey

    • 通过selectionKey设置选择器关注通道上的可读事件

  6. Netty中服务端处理Accept事件流程

    • Netty首次调用eventLoop.execute()方法时启动单线程执行器中的线程,给单线程执行器提交一个死循环任务,死循环不断轮询检查是否有IO事件或者用户提交的普通任务,没有NIO线程则阻塞在带超时时间的selector.select方法上,如果NIO线程被唤醒会优先处理当前所有就绪的IO事件,然后根据IO事件处理时间在预设时间内处理用户提交的任务。在IO事件处理期间,通过基于数组实现的selectedKeys获取就绪事件判断事件类型

    • 如果是接入事件类型,通过serverSocketChannel.accept()创建NIO原生的SocketChannel,随即以原生NIOSocketChannel作为构造器入参创建Netty相应的NioSocketChannel,然后将nioSocketChannel存入ArrayList作为消息通过调用pipeline.fireChannelRead(nioSocketChannel)触发nioServerSocketChannel流水线上的读事件来处理nioSocketChannel,该流水线在serverBootstrap.bind方法中的register部分添加了一个ServerBootstrapAcceptor处理器,读事件会自动触发执行该处理器的ChannelRead方法处理消息nioSocketChannelChannelRead方法会调用childGroup.register()serverBootStrap.group方法指定的另一个eventLoopGroup中获取一个eventLoop,把socketChannel注册到该eventLoop的选择器封装成一个任务提交给对应eventLoopNIO线程执行,注册逻辑和serverSocketChannel是类似的,也将NioSocketChannel作为通道附件和原生NIOSocketChannel绑定,然后执行用户在serverBootStrap.childHandler()中自定义的ChannelInitializerinitChannel方法为nioSocketChannel的流水线添加用户自定义的各个流水线处理器,然后调用pipeline.fireChannelActive()触发nioSocketChannel对应流水线的Active事件触发head处理器的channelActive方法,通过selectionKey.interestOps()关注通道socketChannel上的可读事件

  7. NIO中服务端处理Read事件流程

    • selector.select阻塞NIO线程直到有事件发生

    • 轮询遍历selectedKeys中的每个就绪事件判断事件类型是否为可读事件

    • 如果是可读事件则循环调用socketChannel.read(byteBuf)直到读取到的字节数为0

  8. Netty中服务端处理Read事件流程

    • NIO线程对接入或者可读事件的处理代码都是复用的NioEventLoop.run方法中对IO事件的处理代码processSelectedKey,而且接入事件和可读事件都执行的同一行unsafe.read()方法,只是负责处理NioServerSocketChannel的接入事件和NioSocketChannel的可读事件对应的unsafe实例的类型不同,对应的read方法的具体实现也不同,因此NioSocketChannel的可读事件的阻塞逻辑、IO任务和普通任务的处理逻辑以及事件类型的判断都和服务端NioServerSocketChannel处理Accept流程是相同的,只是unsafe.read()方法中对事件的处理逻辑不同

    • 可读事件的read处理方法首先通过recvBufAllocHandle().allocate(config.getAllocator())创建池化可动态扩缩容基于直接内存的byteBuf,使用该byteBuf读取socketChannel中的数据,然后手动调用channelPipeline.fireChannelRead(byteBuf)触发NioSocketChannel对应流水线上的可读事件依次执行各个处理器的channelRead方法对byteBuf中的数据进行用户的自定义处理

 

2.5-客户端启动过程

  1. 客户端通信流程

    • 1️⃣:创建启动器类Bootstrap

    • 2️⃣:添加EventLoopGroup组件作为事件循环的容器

    • 3️⃣:根据场景选择SocketChannel实现

    • 4️⃣:调用handler方法为通道创建一个匿名通道初始化器并重写initChannel为通道流水线组合处理器

    • 5️⃣:使用connect方法连接服务端,服务端和客户端成功建立连接后分别调用对应的initChannel方法对通道进行初始化,返回ChannelFuture实例,可以调用channelFuture.addListener()注册一个监听器待连接成功建立时由异步线程执行回调方法

    • 6️⃣:建立连接期间客户端使用channelFuture.sync方法阻塞当前线程等待连接成功建立

    • 7️⃣:与服务端建立连接后客户端使用channelFuture.channel方法获取连接对象channel,获取到channel对象后通过channel.writeAndFlush方法向服务端发送数据,使用channel发送数据时会自动经过流水线的所有出站处理器

    • 8️⃣::通道可读事件触发流水线的入站处理器对入站数据的处理,有需要可以在处理器中通过ctx.writeAndFlush或者channel.writeAndFlush向客户端写出数据

    [回调示例]

     

2.6-空轮询BUG

  1. NIO的空轮询Bug

    • JDKlinux平台下不论是否带超时时间的selector.select()都有极小的概率发生空轮询Bug,一旦发生该Bug,没有IO事件发生的情况下select方法无法阻塞NIO线程导致NIO线程一直在死循环上空转,如果多个NIO线程都发生空轮询Bug,就会导致CPU资源被耗尽

  2. Netty对空轮询Bug的解决方法

    • NettyeventLoop.select()方法中使用一个selectCnt初始值为0的局部变量来统计在一次超时时间为1005mseventLoop.select()期间死循环调用selector.select()方法的次数,一旦发生空轮询Bug,该计数会因为selector.select方法无法阻塞NIO线程且无法满足达到预设超时时间、有用户提交的普通任务或者有就绪的IO事件无法跳出死循环导致selectCnt的值暴增

    • Netty为用户提供了一个系统变量io.netty.selectorAutoRebuildThreshold来指定selectCnt阈值,默认值为512,当selectCnt超过该阈值Netty就认为发生了空轮询BugNetty会重新创建一个selector替换旧的selector并将旧selector上的状态信息拷贝到新selector上,将selecCnt重置为0并让NIO线程继续死循环重新阻塞在selector.select()方法上

    • 此外Netty还提供对Selector的全新实现来作为解决空轮询Bug的其他办法

     

3-通信方案

3.1-黏包半包

问题分析

  1. 概念

    • 只要使用TCP/IP协议进行网络通信就会出现黏包半包现象,UDP没有黏包半包现象;网络编程中的黏包和半包现象指多条消息由于某些原因在数据发送接收时被重新组合,多条消息被组合在一起就是黏包,一条消息被截断成两个或者多个部分就是半包,示例如下;服务端接收到消息后需要处理黏包半包现象将消息还原成用户实际发送的多条消息

      [用户实际发送的消息]

      [服务端接收到的消息]

    • 使用Netty时,NioSocketChannelSocketChannelchannel.writeAndFlushchannelHandlerContext.writeAndFlush以及使用NIOSocketChannel都可能随机发生黏包半包现象;一般当接收缓冲区相较于消息本身较小时才会比较容易发生半包现象

    • 半包现象还会导致消息边界问题:socketChannel.write(Charset.defaultCharset().encode("中国"))写入数据时使用UTF-8来对字符进行编码,一个汉字会被编码成三个字节,一个英文或者数字编码成一个字节;如果不能完整读取一条消息就将全部字节数据转换成字符,最后一个字符会因为字节不完整转换为乱码,第二次读取到缓冲区的数据因为第一个字符的字节就不对导致后续消息全部乱码

  2. 原因分析:本质原因是TCP协议是流式协议,消息之间无边界,解决黏包半包问题的本质是用户必须自己去设置找出消息的边界

    • TCP的滑动窗口引起黏包半包现象

      • TCP协议网络数据传输以段segment为单位,一条数据可能被分成多个段来发送,每个段都需要接收方进行一次ack应答确认,如果接收方没有应答还会再次重试发送保证消息的可靠抵达,等到一个段的发送和应答抵达完成后才发送下一个段,这种方式会降低系统的吞吐量,包的往返时间越长性能越差

      • 为了解决这个问题,TCP中引入了滑动窗口,TCP的接收缓冲区和发送缓冲区就是TCP滑动窗口的实现,就是所谓的Socket缓冲区;滑动窗口大小决定了无需等待应答就可以继续发送的段数量的最大值,滑动窗口内的段数据才允许被发送,在接收方应答未到达前窗口停止滑动,只有滑动窗口中的数据的接收方应答回来了窗口才能向后滑动包含等待发送的段并发送段数据,而且接收方也相应地维护着一个滑动窗口,超出滑动窗口的待接收数据只有等滑动窗口内的数据接收完毕后才能执行接收操作

        • 滑动窗口起到了缓冲区的作用,避免有多少数据发送多少数据,也避免发送完一条数据必须等到接收方相应以后再发送下一条数据导致发送太慢,同时也起到了流量控制作用

      • 因为TCP数据可能被分割成多个段,但是滑动窗口的大小有限;如果接收方网络数据量太大,数据接收到一半滑动窗口缓冲区不够用,接收方此时必须去缓冲区读取数据,就会发生半包现象;如果接收方的滑动窗口比较空闲,客户端发送了多条数据,接收方没有及时去滑动窗口读取数据,接收方的滑动窗口把多条数据都缓冲在滑动窗口中,此时接收方程序去缓冲区读取数据,就会发生黏包现象

        • 表现在编码层面socket缓冲区满了会停止socketChannel.write(byteBuffer)方法的执行,socket缓冲区的大小不固定,范围在2-8M之间浮动,只有socket缓冲区清空了才能继续发送byteBuffer中没有写完的数据;byteBuf比较大消息比较短时总会发生黏包现象

    • TCPNagle算法引起的粘包现象

      • 概念:TCP网络数据传输在传输层和IP层为数据添加报头,IP层的报头为20个字节,TCP层的报头也为20个字节,即使只传输一个字节数据,最后添加了报头的数据长度至少为41个字节,在某些情况下报头的长度远远大于数据内容的长度,这样非常不划算;因此出现了Nagle算法对这种情况进行优化,Nagle算法的原理是攒够了一定量的数据再发送,避免因为数据报头远远多于数据内容导致网络传输效率太低;而且多条消息堆积以后再传输就像快递配送一样效率更高

        • 攒数据就带来了粘包问题

    • NettyByteBuf容量太大或者太小导致粘包半包现象

      • 如果网络传输的数据相较于NettyByteBuf容量都是很短,很容易发生粘包现象;但是ByteBuf容量小于消息长度就会发生半包现象

    • 链路层的MSS限制导致半包现象

      • 链路层是比TCP更底层的协议,链路层有一个MSS限制,不同的网卡对数据包的大小有限制,笔记本的网卡一般限制数据包大小包括TCPIP报头为1500字节,只要发送的数据量大于1460字节,数据就会被切分发送造成半包现象

        • 注意回环地址对MSS几乎没有限制,允许的数据包大小为65535字节,如果向局域网中的另外一台电脑数据网络传输就会应用MSS限制

        • 网卡对数据包的大小限制值叫数据链路层最大载荷长度MTUMSS是传输层的报文载荷长度即不包含报头的40字节,MTU包含了MSSMTU减去IP头和TCP头就是MSS,当数据包大小超过了MTU时,路由器可能会把数据包分成更小的部分发送,如果不能分片,路由器会丢掉这个数据包,发送ICMP报文,告诉发送方数据包太大

 

解决方案

短连接
  1. 概念:

    • 客户端建立连接发送一条消息后立即断开连接,客户端单次链接只会发送一条消息,NIO客户端不管正常断开还是暴力宕机断开连接都能被服务端监听到,服务端能根据客户端连接的断开时机确定单条消息的边界,本质上是用网络连接状态标记消息的开始和结束

  2. 局限

    • 短链接方案无法解决半包问题,TCP的接收缓冲区滑动窗口容量太小、ByteBuf的容量太小或者MSS限制仍然会产生半包现象

    • 每发一条数据都要建立一次连接,性能和效率比较低

 

定长解码器
  1. 概念

    • FixedLengthFrameDeacoder是专门做固定长度消息解码的处理器,Frame表示帧指一条完整的消息;固定长度指网络传输的每条消息的长度都相同,定长解码器会按照固定字节数从ByteBuf取出每条消息,出现半包现象等待后续数据写入ByteBuf再重复上述过程取出每条消息;消息内容不可能保证每次长度都相同,因此定长解码器需要长度最长的消息作为消息的固定长度防止有消息长度大于预设消息长度,没有数据的位置需要使用空格补齐,消息固有长度通过定长解码器的单参构造指定;

    • 定长解码器就是Netty提供的一个处理器,作为通道流水线处理通信消息的一道工序,类似定长解码器这种分割单条消息的处理器需要添加到日志处理器如LoggingHandler前面,否则没有处理黏包半包现象的数据会直接被打印,半包现象引起的消息边界问题会导致后续该通道下一个ByteBuf数据全部乱码

    • 定长解码器这种分割单条消息的处理器都是拆分出单条消息后每条消息都单独传递给后续流水线的每道工序处理,如果ByteBuf中的数据无法拼接出单条消息会阻塞当前消息的处理直到向ByteBuf写入后续数据;

    • 像定长解码器、行解码器和帧解码器这种处理半包现象会阻塞等待后续网络消息的处理器必须设置为每个通道一个,不要出现多个eventLoop共用一个解码器的情况,因为这些解码器不是线程安全的,出现半包情况这些解码器会记录当前消息的状态,如果此时其他通道的消息拼接到帧解码器上一条还未处理的半包消息上会导致两条消息完全错乱,Netty为所有支持多线程共享的处理器都添加了@Sharable注解,Netty充分考虑了各种场景下这些处理器的线程安全性,可以被不同流水线的多个事件循环共享

  2. 优势

    • 后续的各种解码器方案,客户端都可以一次性发送任意条消息,将一条消息分多次发送也行,连接开销比短链接低的多

  3. 局限

    • 消息长度方差稍微一大,定长解码器解决方案就会因为大量的空白数据补齐导致严重的内存和网络带宽浪费

 

行解码器
  1. 概念

    • 行解码器的思想是使用特定分隔符确定单条消息的边界,Netty提供LineBasedFrameDecoderDelimiterBasedFrameDecoder两种行解码器实现

    • LineBasedFrameDecoder

      • 以换行符作为消息的分隔符,Linux平台换行符为\nwindows平台换行符为\r\nLineBasedFrameDecoder实例化时必须通过单参构造指定消息的最大长度,如果行解码器发现消息超过最大长度还没有发现换行符会抛出TooLongFrameException消息太长异常,避免客户端消息格式错误导致服务端行解码器一直阻塞等待拼接客户端消息

    • DelimiterBasedFrameDecoder

      • 开发者自定义特殊字符作为消息的分隔符,DelimiterBasedFrameDecoder实例化时也必须通过双参构造指定ByteBuf类型的分隔符和消息的最大长度

  2. 局限:

    • 行解码器因为多一次遍历查找分隔符确定消息长度创建对应ByteBuf的过程效率比较低

 

LTC解码器
  1. 概念

    • LengthFieldBasedFrameDecoder也叫帧解码器,是基于长度字段的消息解码器;帧解码器要求网络消息具备魔数、消息长度、消息附加内容和消息本身四部分,其中魔数和消息附加内容长度均可以为0;这种结构设计的目的是在读取消息内容前就能通过消息长度字段确定消息长度避免通过遍历整条消息来确定消息长度;

    • LengthFieldBasedFrameDecoder对象通过五个参数的构造方法来创建,maxFrameLength指定消息最大长度,消息超出最大长度抛出TooLongFrameException异常;lengthFieldOffset指定消息长度字段偏移量,指定长度字段从消息的那一个索引开始;lengthFieldLength指定长度字段的占用字节数;lengthAdjustment指定附加消息的长度;initialBytesToStrip解码出每条消息后要去掉消息头部的多少个字节作为传递给后续流水线的消息,一般用于保留消息内容本身

    • 消息构建时可以通过byteBuf.writeInt(length)等方式写入指定字节的长度字段,通过byteBuf.writeBytes(bytes)写入消息内容本身

    • HTTP协议就采用类似的LTV消息格式[L表示长度、T表示消息类型、V表示消息实际内容]

      • HTTP2.0协议就是LTV格式,HTTP1.1TLV格式[HTTP请求头先传输消息类型即content-type,再传输消息长度content-length,请求体就是消息本身]

  2. 局限

    • 如果有一条消息格式错误,后续的消息会因为读取错误的长度字段导致所有消息读取错误直接抛出异常

  3. 其他衍生问题

    • 行解码器和帧解码器都不可避免地会遇到ByteBuf最后一条消息产生半包现象的问题

      • 此时底层可以调用NIObyteBuffer.compact()方法将缓冲区中未读的数据移到缓冲区的开头,将当前位置指针position移到缓冲区未读出数据最后一位的下一位,缓冲区下次再从通道读数据时只会覆盖已读出数据来解决半包问题

    • 行解码器和帧解码器还可能会遇到单条消息长度比缓冲区还长导致缓冲区始终写不完一条完整消息

      • NIO的解决办法是将缓冲区绑定为通道附件,当调用byteBuffer.compact()方法后如果缓冲区的当前位置指针和最大容量相等说明缓冲区覆盖已写出数据后仍然无法写入数据,此时创建一个原容量两倍的新缓冲区,将原数据拷贝到新缓冲区并重新绑定为通道附件再使用该缓冲区从通道写入当前消息

      • NIO还可以使用多个数组组成缓冲区,空间不够就将剩余数据写入新数组,相应的消息解析也更复杂

      • Netty对两种解决方案都进行了实现,ByteBuf被设计为可以根据历史消息长度自动扩缩容,CompositeByteBuffer也实现了通过零拷贝的方式实现多个ByteBuf的组合

 

3.2-通信协议

Redis通信协议

  1. 协议简介

    • Redis服务端与客户端之间通过Redis自定义的协议进行网络通信,Redis的远程命令调用对应的通信消息格式为[元素个数][元素1字节数][元素1实际内容][元素2字节数][元素2实际内容][元素3字节数][元素3实际内容]...,任意两个部分之间使用[回车键换行键]作为分隔符,消息的结尾再加一个[回车键换行键]

      • *#:元素个数通过*#表示,其中#是命令的元素个数即命令的单词个数

      • $#:当前元素的字节个数,#就是当前单词的字母或者数字个数,因为一个字母或者数字都占一个字节

      • 元素实际内容:当前元素字符对应的字节数据

      • 回车键换行键:元素个数、各个元素字节数和元素实际内容之间通过回车键换行键两个字节两两分隔,回车键对应字节码为十进制的13,换行键对应10

    • 比如客户端发送命令set name zhangsan要求Redis服务端执行,对应的通信消息为*3回车符换行符$3回车符换行符set回车符换行符$4回车符换行符name回车符换行符$8回车符换行符zhangsan回车符换行符,除去回车符换行符为*3$3set$4name$8zhangsan

      • 上述消息一个字节不改发给Redis所在主机的对应端口如6379端口就能在Redis服务端成功执行命令set name zhangsan,执行成功后服务端响应+OK回车符换行符给客户端

  2. 协议的意义

    • 客户端和服务端按照规定的通信协议发送网络消息,服务端和客户端就能解析对方的意图并执行对方希望自己执行的操作并由对方通知执行结果

  3. Netty提供了常用的如RedisHTTPHTTPSWebSocket等等常用的网络通信协议,用户只需要在流水线中配置相应的协议编解码器,向流水线写入对应类型的消息实例如httpRequestNetty就能自动将消息实例编码成HTTP报文对应的字节数据发送给网络;对方也能通过相应协议编解码器将报文数据解析成消息实例按消息要求执行相应业务

 

HttpServerCodec

  1. 概念

    • HttpServerCodecNetty提供的HTTP协议服务端编解码器,HttpServerCodec继承自CombinedChannelDuplexHandlerCombinedChannelDuplexHandler组合了其泛型中的请求解码器HttpRequestDecoderHttpResponseEncoder响应编码器,Netty中类似以Codec结尾的类就说明同时包含解码和编码功能

    • HttpServerCodec是双向处理器

      • 作为入站处理器时请求解码器HttpRequestDecoder生效将通信请求数据解码成HttpRequestHttpContent两种消息类型,分别表示请求头、请求行和请求体;在服务端编解码器后面的流水线需要区分消息类型选择性地进行处理;例如对于GET请求只需要关心请求头无需关注请求头,可以自定义只处理HttpRequest类型消息的入站处理器new SimpleChannelInboundHandler<HttpRequest>(){}重写channelRead0(ctx,msg)方法,遇到HttpContent类型消息该处理器自动跳过不执行

      • 作为出站处理器时响应编码器HttpResponseEncoder生效将HttpResponse的默认实现DefaultFullHttpResponse响应对象编码成符合HTTP协议格式的字节数据响应给客户端,DefaultFullHttpResponse实例通过构造方法new DefaultFullHttpResponse(HttpVersion version,HttpResponseStatus status,ByteBuf content,boolean validateHeaders)传参HTTP协议版本、响应状态码、响应体内容对应ByteBuf创建,通过调用ctx.writeAndFlush(defaultFullHttpResponse)就能将响应写回给客户端

        • 此外在写回defaultFullHttpResponse还需要做一些必要配置,比如通过defaultFullHttpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,响应体的字节长度)设置响应头中的content-length属性即设置响应体的内容长度,没有这项配置浏览器不知道何时接收完所有响应数据浏览器的加载图标会一直转圈等待接收更多的响应数据

  2. 使用Netty启动一个服务端在通道流水线添加一个Http服务端编解码器HttpServerCodec就能直接实现了一个能被浏览器访问的极简服务器,使用浏览器给相应主机和端口发送请求给服务端就能直接获取服务端的响应并渲染响应体

    • 浏览器在发起请求获取服务端响应后还会发起一个GET类型的/favicon.ico请求获取站点的图标

 

自定义协议

  1. 协议结构

    • 魔数:用消息的头几个字节表明协议类型,比如Java字节码文件头四个字节为cafebabe

    • 版本号:有版本号能支持协议升级

    • 序列化算法:对象和二进制流互转转换的方法

      • 常用序列化算法有jsonJDK原生序列化算法、谷歌的protobufhessian

    • 正文长度:界定消息的边界,方便解决TCP网络通信中的黏包半包问题

    • 消息正文:消息内容本身,消息对象序列化为二进制流的实际内容

    • 消息类型:用来标识消息属于哪一项具体业务

    • 请求序号:消息的唯一标识,请求序号能给全双工通信提供异步能力,没有请求序号一条消息收发完成后才能发送下一条

  2. 使用Netty自定义协议的流程

    • 定义一个可以序列化的消息抽象父类,提供所有消息共有的域信息如协议版本号、序列化算法、正文长度、消息类型,请求序号等,消息正文;根据具体消息类型定义各种消息子实现继承自该抽象父类

    • 通过继承Netty提供的ByteToMessageCodec<T>消息编解码器自定义一个消息编解码器

      • 通过实现decode方法自定义ByteBuf按照自定义通信协议字节数据解析为消息对象,专业的通信协议会让消息正文前的固定字节数为2次幂,使用特定字符用于填充不足部分,消息正文前的部分消息私有的数据已经被封装在消息正文中用于记录消息状态,在消息正文前额外放一份是因为这部分数据解析方便,在不处理消息正文的情况下就能确定消息结构、消息正文的序列化算法、长度等信息;从ByteBuf中读取消息正文部分,根据序列化算法类型调用指定序列化算法将字节数组序列化成消息对象,将消息对象添加到decode方法的入参list集合中

      • 实现其中的encode方法自定义消息正文对象转换为最终ByteBuf的逻辑,通过消息对象获取消息私有的信息确定协议版本、采用的序列化算法、消息正文长度写入encode方法的入参ByteBuf;按照通信协议组织消息正文前面的数据,根据序列化算法类型调用指定序列化算法将消息对象序列化为字节数组作为消息正文追加到ByteBuf

      • ByteToMessageCodec<T>在设计的时候被认为用户自定义消息编解码器需要处理黏包半包现象涉及到阻塞保存消息状态,即使我们可以在消息编解码器前先添加一个解码器如帧解码器拆分出单条消息避免在消息解码器中处理黏包半包现象,ByteToMessageCodec在构造方法中会仍会调用ensureNotSharable()方法确保子实现类上不能添加@Sharable注解,如果添加了@Sharable注解子类调用构造方法实例化时就会报错

      • 如果一定要在自定义消息编解码器上添加@Sharable注解,可以让编解码器继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>Netty定义MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>是为了让用户明确这是消息到消息的转换,使用自定义编解码器处理消息前就已经解决了消息的粘包半包问题不会涉及到编解码器保持消息状态的情况,其中INBOUND_IN是传入当前处理器的消息类型,OUTBOUND_IN是处理后要传递给后续流水线的消息,该抽象父类的子实现类对应的也需要实现encode方法和decode方法,通过继承MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>实现的自定义编解码器上可以添加@Sharable注解,这只是一种技法

    • 将消息编解码器添加到通道流水线即可自动按照自定义通信协议在出站时将消息类型实例转换成ByteBuf传递给下一个出站处理器,入站时将ByteBuf转换成消息类型实例传递给下一个入站处理器

  3. 优势

    • 通用协议一般为了跨平台设计的比较复杂,对于简单场景通过自定义协议能让消息更紧凑,节省带宽提高传输解析效率

 

序列化算法

  1. 概念:将对象序列化为字节数组用于网络传输和数据持久化;反序列化指将从网络、磁盘中读取的字节数组还原成对象

    • Java中自带的通过实现Serializable接口通过对象输入输出流实现序列化,性能和安全性不好一般不会被使用;比较知名的有专门针对Java语言的KryoFST,跨语言的ProtostuffProtoBufThriftAvroMsgPack等序列化方式性能都非常好,一般使用ProtostuffHessian2json序列方式比较多

  2. 开发者可以自定义一个序列化工具类,提供多种序列化算法,用户通过工具类获取基于不同序列化算法的序列化器提供对应序列化和反序列化方法

    • 定义一个接口提供将指定类型对象转换为字节数组的序列化方法serialize和将字节数组转换为指定类型对象的反序列方法deserialize

    • 创建一个实现该接口的枚举类,枚举类中声明每个枚举值都要重写接口提供的序列化方法和反序列方法,可以使用序列化算法的名字作为枚举值,重写方法时调用对应API将字节数组转换成对象或者将对象转换成字节数组并返回

    • 可以通过枚举值实例调用相应的序列化和反序列化方法返回对象实例或者字节数组;

      • 枚举值有一个ordinal()方法,可以根据枚举值的次序依次返回0,1,2...,可以通过用户设置的枚举值的ordinal()方法获取枚举值对应次序数字作为序列化算法标识写入通信消息;

      • 枚举值还提供一个values()方法,以数组的形式返回所有枚举值实例,可以调用该方法获取到枚举值数组通过解析网络消息读出序列化算法标识从数组对应索引处取出相应枚举值实例,通过枚举值实例调用序列化方法将字节数组序列化为对象实例;

      • 枚举值还提供一个ValueOf(String name)方法,可以通过传参枚举值名称获取对应枚举实例,可以通过读取用户在配置文件配置的序列化方式获取相应枚举值,再通过枚举值调用相应的序列化和反序列化方法

 

3.3-聊天室设计

  1. 业务设计

    • 用户使用用户名、密码和短信验证码登录,登录成功保存将用户信息保存到session

    • 绑定用户ID和客户端通道,服务端能根据用户ID获取客户端通道、根据客户端通道获取用户ID进而获取用户信息

    • 聊天业务:创建聊天组、添加用户到聊天组、将用户移出聊天组、删除聊天组、根据聊天组名称获取聊天组所有成员、根据聊天组名获取所有在线成员的客户端通道

    • 流水线设计:服务端和客户端流水线上依次添加帧处理器处理可能存在的粘包半包消息,记录日志的处理器LoggingHandler,消息编解码器将ByteBuf按照自定义协议转换成Message对象,处理用户登录状态的处理器,

      • 这里用日志处理器作为客户端的消息处理器,即直接打印客户端接收到的消息,没有设置客户端对接收消息的额外处理

      • 服务端根据业务类型额外添加单聊消息处理器、群聊创建消息处理器

  2. 客户端发送不同类型消息实现

    • 创建聊天消息接口封装消息发送者、消息正文;创建一系列子实现如封装用户名、密码的登录请求消息;封装聊天组ID的群发信息、获取聊天组成员、加入聊天组、退出聊天组的消息类;封装聊天组名称和初始用户列表的创建聊天组消息;

    • 客户端消息发送将对应消息类型的出站数据直接写出到流水线经过自定义消息编解码器根据自定义协议处理成ByteBuf,序列化使用的是JDK自带的序列化,能够自动判断消息类型

      • 这种序列化方式在服务端某些simpleChannelInboundHandler处理器只处理特定类型消息的时候很好用,消息编解码器将ByteBuf解码成父接口类型的消息,simpleChannelInboundHandler能自动判断消息的实际类型从而进行消息过滤

  3. 登录业务实现

    • 给客户端流水线添加一个自定义处理器channelInboundHandlerAdapter重写其channelActive方法在连接建立成功时客户端的事件循环使用异步线程监听用户输入扫描器输入用户名和密码,由异步线程向服务端写出登录类型消息,异步线程调用countDownLatch.await阻塞等待服务端响应

      • 使用新线程的原因是避免监听用户输入阻塞客户端的事件循环线程与服务端的通信

    • 同一个channelInboundHandlerAdapter重写其channelRead方法处理服务端的登录响应,服务端的响应数据作为入站数据由事件循环线程处理,解析服务端响应数据,判断用户登录状态,创建一个线程共享的原子布尔类型变量用于多线程共享用户登录状态,如果用户登录成功将登录标识改为true,使用CountDownLatch.countDown唤醒阻塞等待服务端处理登录数据的异步线程,异步线程通过登录标识判断登录状态;登录失败调用ctx.channel().close()然后结束异步线程的执行;登录成功异步线程进入死循环,为用户提供服务菜单阻塞监听用户的输入扫描器,封装用户输入信息向服务端写出各种类型的消息

      • 客户端连接关闭后在主线程调用的channel.closeFuture().sync()会结束阻塞,主线程调用finally块中的eventLoopGroup.shutdownGracefully()结束所有事件循环组的执行,主线程结束执行,所有用户线程结束执行JVM进程结束运行

    • 服务端流水线添加一个simpleChannelInboundHandler的子实现重写channelRead0方法用于只处理客户端通道上的登录消息,登录成功或者失败都封装登录响应类型的消息向客户端写出,登录成功将用户ID和相应客户端通道保存到本地缓存

  4. 单聊业务实现

    • 客户端监听输入扫描器,监听到用户发送单聊消息直接向流水线写出封装单聊信息的对象实例经过编解码器写出到服务端

    • 服务端添加一个simpleChannelInboundHandler入站单聊消息处理器根据目标用户ID的客户端通道将消息写出,如果用户没上线将消息持久化到数据库

  5. 创建聊天组业务实现

    • 添加simpleChannelInboundHandler处理群聊创建消息,根据群聊名称和初始成员创建群聊,向消息客户端发送群聊创建状态消息,群聊创建成功向所有初始群聊用户发送拉群成功消息

  6. 群聊业务实现

    • 客户端监听输入扫描器,监听到用户发送群聊消息直接向流水线写出封装群聊消息的对象实例

    • 服务端添加一个simpleChannelInboundHandler入站群聊消息处理器,根据群聊ID获取所有群聊用户,向所有在线用户的通道发送消息,未上线用户将消息持久化到数据库等待用户上线

  7. 客户端退出业务处理

    • 客户端主动关闭连接正常退出会触发服务端对应客户端通道的inactive事件、异常退出对应事件循环线程会抛出异常

    • 服务端添加一个ChannelInboundHandlerAdapter入站处理器,重写channelInactive()方法在客户端正常退出的情况下执行该方法从会话管理器中移除用户和相应客户端通道,重写exceptionCaught方法在客户端异常退出的情况下捕获异常从会话管理器中移除用户和相应客户端通道

 

3.4-连接通道假死

  1. 连接通道假死

    • 连接假死指服务端应用程序无法得知哪些TCP连接因为网卡、网线故障、机房断电、防火墙或者路由器因为连接长时间空闲自动断开连接等原因断开;服务端仍然会保持连接的相关资源,服务端的最大连接数一般是限制死的,假死连接不释放会降低系统处理客户端消息的性能

  2. 假死状态检测

    • Netty提供检测假死状态的处理器IdleStateHandler,该处理器通过判断读或者写之后的空闲时间太长怀疑连接可能假死,IdleStateHandler构造时可以指定三个入参,读入数据的空闲超时时间、写出数据的空闲超时时间,读写数据的空闲超时时间;超过指定超时时间没有读写事件会触发相应的读写超时事件,READER_IDLEWRITER_IDLE通常由入站处理器和出站处理器处理,ALL_IDLE既可以由入站处理器处理也可以由出站处理器处理;空闲超时时间指定为0表示不再检测对应的读写空闲时间

    • IdleStateHandler只是提供检测空闲超时时间触发相应读写空闲事件,对超时事件的处理需要用户自定义入站和出站处理器,各种处理器的userEventTriggered方法用于自动处理用户自定义的事件或者IdleState下的各种事件,入参object evt就是事件本身,空闲超时事件的实际类型是IdleStateEvent,通过idleStateEvent.state()能获取空闲超时事件的类型;使用方法是实例化一个IdleStateHandler并将其添加到通道流水线

  3. 处理连接通道假死

    • 触发空闲超时事件服务端不会直接调用channel.close()关闭客户端连接,这样很容易导致客户端异常闪退误伤正常客户端

    • 常用的做法是客户端设置IdleStateHandler的写空闲超时时间,服务端设置一个读空闲超时时间,客户端添加一个出站处理器重写userEventTriggered方法处理该写空闲超时事件向服务端发送心跳数据包,客户端的写空闲超时时间一般设置成服务端读空闲超时时间的一半,服务端一旦触发读空闲超时事件就说明客户端连接肯定假死

      • 心跳机制:连接通道一段时间内连接处于空闲状态,客户端或服务端会发送一个特定的数据包给对方,接收方接收到数据报文后立即发送一个特定的数据报文回应发送方形成一个PING-PONG交互,通过心跳机制让通信双方都知道对方仍然在线

      • 操作系统实现了TCP协议中的keepalive方法,但是读写空闲时间长达两小时,默认该功能是关闭的,配置TCPSO_KEEPALIVE选项能使用TCP自带的心跳机制,想要对心跳机制进行自定义还要修改操作系统层面的代码非常不灵活,开发者一般会在Netty层面通过编码实现自定义心跳机制

      • 最好在客户端也设置一个读空闲超时时间监测服务端写回的心跳数据,因为一般断线重连都由客户端在监测到连接中断的情况下发起重连尝试,如果客户端在读空闲超时时间内没有收到服务端的心跳数据包说明连接已经断开

 

断线重连

  1. 概念

    • 网络通信由于网络不稳定、服务器宕机重启、客户端网络环境变化、系统允许的最大文件描述符数达到上限、客户端或者服务端内存不足等等原因随时可能导致服务端和客户端的网路连接断开,需要设计一种在监测到网络连接断开后在客户端和服务端重新建立连接的重连机制

    • Netty中客户端和服务端调用channel.close()方法正常关闭连接时会触发双方的channelInactve事件,能被Netty检测到的网络异常导致的连接中断也会触发channelInactive事件;但是在部分情况下比如客户端、服务端进程被强制终止,网络分区故障,系统资源如文件描述符耗尽等情况下Netty可能无法检测到连接的中断不会触发channelInactive事件;因此仅在channelInactive编写重连逻辑很容易出现无法触发重连的情况,并且一般都会使用心跳检测机制保证切实监测到连接断开状态;一般有两种触发重连逻辑的方式,第一种是当心跳检测机制发现连接断开的情况下触发客户端的读空闲超时事件,客户端对该事件进行处理时通过channelHandlerContext.fireExceptionCaught(throwable)抛出连接中断异常,在exceptionCaught()方法中对异常进行处理时调用channel.close()触发通道的channelInactive事件执行channelInactive方法中的重连逻辑,第二种是客户端处理读空闲超时事件时抛出连接中断异常,在异常处理方法中调用重连方法

    • 定义重连策略封装最大重试次数、重连间隔、连接超时时间等重连参数

  2. 重连逻辑

    • 调用channel.close()关闭重连失败的通道

    • 调用reconnect()方法,在重连方法中判断本次重连是否被重连策略允许,如果不允许抛异常关闭连接打印日志,如果允许重连调用bootstrap.connect()方法发起重连,为异步连接任务返回的ChannelFuture添加监听器channelFutureListener,实现监听器的operationCompelete()方法在连接建立任务完成时通过channelFuture.isSuccess()方法判断连接是否成功建立,如果连接成功建立打印日志结束方法执行,如果连接失败通过eventloop.schedule()让事件循环在指定时间延迟后再次调用reconnect()方法

 

3.5-TCP三次握手

  1. 概念介绍

    • 半连接队列:服务端将尚未完成三次握手的连接信息存入半连接队列

    • 全连接队列:服务端将已经完成三次握手的连接信息存入全连接队列,全连接队列的容量决定了服务端的最大连接数,一旦客户端连接数超过全连接队列容量,服务端会向客户端发送一个拒绝连接的错误信息,客户端随后会抛出ConnectException

  2. 三次握手流程

    • 服务端调用bind()方法绑定通信端口

    • 服务端调用listen()方法监听连接请求

    • 客户端调用connect()方法发起一个连接请求数据包SYN,客户端的状态变成SYN_SEND状态,

      • 这是第一次握手

    • 服务端接收到SYN数据包表明客户端发送消息的能力正常,服务端将数据包封装成一个连接信息对象存入半连接队列,服务端状态变成SYN_RCVD状态

    • 服务端向客户端响应SYN数据包和ACK应答数据包

      • 这是第二次握手

    • 客户端接收到SYN+ACK数据包表明服务端收发消息的能力正常,客户端状态变成ESTABLISHED状态

    • 客户端向服务端响应ACK数据包表明客户端接收服务端消息的能力正常,服务端状态变成ESTABLISHED状态

      • 这是第三次握手

    • 服务端将连接信息转移到全连接队列

    • 服务端调用accept()方法从全连接队列中获取连接对象进行数据操作,连接信息被accept处理以后就会从全连接队列中移除,accept()方法处理能力有限时连接信息会在全连接队列中进行堆积

  3. backlog参数

    • linux2.2前通过backlog参数同时控制半连接和全连接队列的容量

    • linux2.2通过Linux中的两个系统配置文件来分别配置两个连接队列的容量,全连接队列的配置文件中就只有一个backlog参数值

      • NIO可以通过serverSocketChannel.bind的重载方法在入参配置backlog参数,Netty需要使用serverBootstrap.option(ChannelOption.SO_BACKLOG,1024)进行配置,生产环境backlog参数至少都要1024,同时在系统和NIObind方法都配置了backlog会自动使用较小的值;如果使用NettyNettybacklog默认参数值在windows下全连接队列容量为200linux或者mac下为128Netty会去读取系统配置文件中的backlog参数,如果该文件存在会直接使用该文件配置

      • Nettyaccept方法处理能力是很强的,全连接队列设置的大一些可以避免高峰期队列被快速堆满导致客户端连接被频繁拒绝

  4. linux中不论是文件还是socket都是用文件描述符来表示的,默认文件描述符数量是1024,当进程打开的文件或者通道数量超过该值会报错TooManyOpenFile,这是为了避免文件或者socket打开太多伤害系统,做高并发的服务端一定要使用命令ulimit -n 数值调整该参数,该命令是一个临时生效的命令,一般搭配进程启动命令一起组成启动脚本

 

3.6-RPC框架搭建

  1. 概念:

    • RPC框架是远程调用框架,功能是能让本地服务程序像调用本地程序一样调用网络上不同计算机上的远程程序

    • 常用的RPC框架和消息队列底层都使用Netty作为网络通信框架

  2. 方案设计

    • 消息类型设计

      • 远程请求消息类型RPCRequestMessage:封装远程调用WEB接口路径、目标方法所在类的全类名、目标方法名、返回值类型、入参类型列表、入参值列表

      • 远程响应消息类型RPCResponseMessage:封装目标方法的实际返回值和可能发生的异常

    • 流水线组成

      • 帧解码器处理黏包半包

      • 日志处理器loggingHandler打印消息内容

      • 自定义协议消息编解码器

      • 服务端在此基础上添加一个处理RPC请求消息的自定义入站处理器,客户端在此基础上添加一个处理RPC响应消息的自定义入站处理器以及一个代理对象管理器

    • 客户端对RPC请求和响应的处理逻辑

      • 要实现用户像调用本地方法一样发起远程调用,就要实现将用户调用远程实例方法的行为转换成向远程服务发送远程调用消息的行为,OpenFeign的解决方案是使用动态代理提供一个代理对象,用户通过代理对象调用远程方法并将对代理方法的调用转换成向远程服务发去远程调用网络通信消息;动态代理对象的获取和调用代理对象方法的线程和Netty发送网络通信消息的线程不是同一个线程,当用户线程调用channel.writeAndFlush时使用事件循环线程向服务端写出网络通信消息是由Netty自动控制的,不是由用户线程显示发起的异步任务,这种情况可以使用Netty提供的Promise在两个线程间共享远程调用方法的执行结果

      • 当获取动态代理对象时与服务端建立连接,将连接通道封装到动态代理对象管理器中方便后续通过动态代理对象调用目标方法时使用通道向服务端写出消息;要确保一个客户端和一个服务端只能有一个channel通道,可以使用双重检查锁来与服务端建立连接,创建和重建channel

      • 准备一个ConcurrentHashMap以消息唯一标识ID作为key,以代理对象处理远程方法调用时通过new DefaultPromise<Object>(channel.eventLoop())创建的Promise实例作为值,当事件循环线程获取到执行结果后根据消息ID获取到Promise实例并手动设置方法执行的返回值,将该ConcurrentHashMap存储为客户端处理RPC响应的自定义入站处理器的公有静态属性,让其同时对用户线程和事件循环线程可见

      • JDK动态代理有一个Proxy.newProxyInstance(classLoader,interfaces,invocationHandler)方法实例化代理对象,参数classLoader指代理类的类加载器,通常使用目标类或者接口的类加载器,参数interfaces指代理类需要实现的接口列表,参数invocationHandler是一个函数式接口,其中的invoke(proxy,method,args)方法包含三个参数,proxy是当前代理对象本身,一般该参数很少用到,method是被代理对象调用的方法对象,包含方法名、参数类型列表、返回值类型信息,args是代理对象调用方法时传递的实际参数值列表;当代理对象调用目标方法时,代理对象实际执行的是用户通过invocationHandler自定义的invoke方法,并将用户的实际传参以及被调用方法的方法信息作为参数传递给invoke方法,将invoke方法的返回值作为代理对象调用目标方法的返回值;我们可以在创建代理对象时获取被代理接口进而获取接口全类名,在代理对象的invoke方法中将被调用方法的信息、代理对象所属接口以及用户的实际传参封装成远程调用请求消息对象,创建一个空Promise对象,将消息IDPromise对象存入自定义响应处理入站处理器的ConcurrentHashMap中,调用channel.writeAndFlush()将消息对象写出到服务端,调用promise.await()阻塞执行invoke方法的线程直到响应数据被事件循环线程写入到Promise对象,如果Promise.isSuccess()true通过promise.getNow()获取远程方法的执行返回值并返回给用户,否则通过throw new RuntimeException(promise.cause())抛出异常

      • 客户端处理响应的自定义入站处理器设计

        • 根据消息idconcurrentHashMap中获取代理对象发送消息时存入的对应Promise对象,当promise对象不为空时,解析响应数据,响应正常通过promise.setSuccess()将返回值写入promise,响应异常通过promise.setFailure()将异常信息设置到promise

    • 服务端对客户端RPC请求的处理逻辑

      • 根据接口全类名从Spring容器或者通过Class.forname("全类名")获取到class类对象,服务端按照策略选择执行对应方法的同类型对象实例如service,通过class.getMethod通过方法名和参数类型列表获取方法对象method,通过方法对象的invoke方法指定执行该方法的对象service和参数值列表调用目标方法获取方法执行结果

      • 将执行结果或者异常信息封装到RPC响应消息实例中通过channel.writeAndFlush或者channelHandlerContext.writeAndFlush写出到客户端

  3. 其他问题

    • 服务端通过反射调用方法时出现异常最后会抛出InvocationTargetException,该异常不会封装最初异常的message,需要调用InvocationTargetException.getcause().getMessage()才能获取到最原始的异常message

    • 服务端不要直接将异常对象封装到响应消息中直接返回给客户端,异常信息包含堆栈信息,随便就是上万字节,如果客户端设置了单条消息的最大长度,发生异常很容易因为数据量超出帧解码器设置的最大值直接报错,一般向客户端直接返回exception.getMessage()或者InvocationTargetException.getcause().getMessage()即可

 

3.7-selectpollepoll

  1. Linux中文件和网络连接在内核中都是以文件描述符的形式存在,实现一个单线程的网络通信服务器的方案被设计为单个线程遍历所有文件描述符,检查和处理每个文件描述符上的数据,如果文件描述符上没有数据就继续遍历下一个文件描述符,当遍历完所有文件描述符后死循环重复上述过程,在此基础上先后发展出了selectpollepoll三个函数

  2. select函数逻辑

    • 用户调用select函数的流程

      • select(max+1,&rset,NULL,NULL,NULL):用户在调用linux提供的select函数监听文件描述符上的事件前需要在用户空间准备被监听文件描述符索引集合和最大描述符索引,select函数的入参依次为最大文件描述符索引+1,读文件描述符索引集合、写文件描述符索引集合、异常文件描述符索引集合、超时时间;超时时间传NULL表示使用默认超时时间;读文件描述符索引集合&rset是表征被监听文件描述符的位图,&rsetselect函数中的默认大小为1024,对应linux的默认文件描述符数量为1024,位图中需要被监听的文件描述符索引对应位上的字节被置为1,未被监听的被置为0

      • 调用select函数期间&rset会由用户态拷贝到内核态,内核根据&rsetmax+1范围内被监听的文件描述符索引判断对应文件描述符是否有数据读入,如果所有被监听文件描述符上没有数据写入,程序会一直阻塞在select函数上,如果内核监听到文件描述符上的数据,内核会将有数据的文件描述符在&rset中对应索引的位标识为1表示有数据读入,将未准备好的描述符位置为0,停止select函数的阻塞返回有数据读入的文件描述符个数并切回用户态继续执行

      • 用户程序根据返回的&rset遍历所有文件描述符检查对应文件描述符是否有数据并调用read(fds[i],buffer,MAXBUF)切换到内核态读取相应文件描述符上的数据再切回用户态进行对应数据的处理

      • 遍历完所有有数据的文件描述符,用户程序初始化&rset并继续调用select函数让内核继续监听指定文件描述符上的事件

    • 小结:用户将被监听的文件描述符收集整理成一个位图,通过系统提供的select函数将位图交给内核让内核帮忙阻塞判断其中的一个或者多个文件描述符上有数据,当其中一个或者多个文件描述符上有数据select函数会被返回并且有数据的文件描述符对应的位图标识位会被置位为1,没有数据的文件描述符位图标识全部被清0,用户根据位图遍历文件描述符集合判断和处理有数据的文件描述符

      • select函数让用户将感兴趣的多个文件描述符交给内核让内核来监听这些文件描述符上的IO事件来避免用户线程一个线程阻塞监听一个文件描述符上的IO事件,通过select函数提升用户程序的性能

      • 缺陷:

        • 位图的默认大小为1024,可以调整该大小但是始终有上限

        • &rset位图每次调用select函数都要重新准备,一方面是内核会修改&rest,另一方面应用期望监听的文件描述符会发生变化

        • &rest拷贝到内核态开销较大,虽然比用户每次单独切换内核态判断一个文件描述符的效率高,但仍然有较大开销

        • 用户通过select函数返回只知道有文件描述符有数据读入,但是不知道是具体哪一个文件描述符,需要再遍历一遍所有文件描述符的状态,判断和处理有数据的文件描述符,时间复杂度为O(n)

      • select函数是在1983年的Unix中引入的,到现在还有很多系统在使用,linux在后续推出了poll函数

  3. poll函数逻辑

    • 用户调用poll函数的流程

      • poll(pollfds,5,50000)pollfdpoll库中自带的结构体,pollfd结构中有三个字段,fd字段为socket连接对应文件描述符索引,events字段为short类型的文件描述符上关注事件标识值,如果要关注多个事件该字段值为对应事件标志值的与运算结果,revents字段为short类型的事件回馈,初始值始终为0,由内核修改;在调用poll函数前,需要先准备pollfd数组,设置各个文件描述符上关注的事件;poll函数需要传参pollfd数组,数组长度以及超时时间,poll函数的原理和select函数一样,将要监听的文件描述符一次性交给内核,只是使用pollfd结构替换了select函数中使用的位图;

      • poll函数也是一个阻塞函数,pollfd数组被拷贝到内核中会被组织成链表,当内核发现一个或者多个文件描述符上有事件会将字段revents置为成对应事件的标识值,结束poll函数的阻塞;用户程序遍历pollfd数组,检查revents的事件类型并进行相应事件的处理,处理后将revents重置为0,再次调用poll函数

    • 小结:

      • 优势:

        • pollfd数组的大小没有限制

        • 内核操作pollfd中的revents字段,不会像位图一样增删数组元素本身,数组pollfds可以复用

      • 缺点:

        • poll函数原理上和select函数是一样的,因此也存在需要将被监听的文件描述符信息拷贝传递给内核以及函数返回后需要对每个文件描述符的状态进行遍历

  4. epoll函数逻辑

    • 用户调用epoll函数的流程

      • epoll函数中有epoll_createepoll_ctlepoll_wait三个重要的函数,在调用epoll_wait(epfd,events,5,10000)前要使用epoll_createepoll_ctl准备一个包含文件描述符和对应关注事件的epfd结构;epoll_create()创建一个epfd白板;epoll_ctl(epfd,EPOLL_CTL_ADD,ev.data,&ev)函数在白板epfd上添加被监听的文件描述符信息,第二个参数EPOLL_CTL_ADD表示正在向epfd添加一个epoll_event结构体,该结构体包含字段fd文件描述符索引和events文件描述符关注的事件,没有实际就绪事件标识reventsepoll函数在每次调用epoll_ctl函数注册新的事件到epoll句柄时会将fd拷贝进内核保证每个文件描述符整个过程只会在内核态和用户态之间拷贝一次,不像selectpoll一样每次调用select或者poll方法都会拷贝一次文件描述符;

      • 调用epoll_wait时,被监听的文件描述符上没有数据epoll函数仍然会阻塞,当有一个或者多个文件描述符上有数据会将就绪的描述符加入到一个链表中进行管理,把有事件发生的文件描述符排在epfd结构的最前面,并返回事件发生的文件描述符数量,返回后用户只需要处理有事件发生的文件描述符

    • 触发模式

      • LT水平触发:epoll_await()检测到描述符事件到达时将事件通知到进程,进程可以不立即处理,下次进程再次调用epoll_await()时会再次通知进程,默认水平触发,同时支持BIONIO

      • ET边缘触发:epoll_await()检测到描述符事件到达时将事件通知到进程,进程必须立即处理该事件,没处理下次调用epoll_await()时也不会再得到该事件到达的通知,能减少epoll事件被重复触发的次数,效率比水平触发模式高,只支持NIO模式

    • 小结

      • epoll使用的红黑树容量没有限制,内核通过重排文件描述符的方式标记发生事件的文件描述符,文件描述符在整个生命周期中只会发生一次从用户态到内核态的拷贝节省性能,epoll_await()方法返回后用户只需要处理有事件发生的文件描述符,时间复杂度为O(1)而不需要遍历所有文件描述符

      • 现在epollpoll用的比较多,redisnginx都用的epollJavaNIOLinux平台下也是使用的epoll