• Netty是一个异步的、事件驱动的、基于NIO(Non-blocking I/O)的开源框架,用于快速开发高性能、高可靠性的网络服务器和客户端程序。

基本概念

Netty的客户端-服务器结构
  • Channel:是入站或出站数据的载体。它可以被打开或关闭,连接或断开。
  • ChannelHandler:实现了应用程序用于处理状态变化以及数据处理的逻辑。
  • ChannelPipeline:当Channel被创建时,它会被自动地分配到它专属的ChannelPipeline,它持有所有与属于它的Channel有关的ChannelHandler实例。
  • ChannelHandlerContext:代表了ChannelHandler 和ChannelPipeline 之间的绑定。
  • EventLoop:用于处理连接的生命周期中所发生的事件,一个Channel在其生命周期内只注册于一个EventLoop,一个EventLoop在其生命周期内只属于一个Thread。
  • EventLoopGroup:包含一个或者多个EventLoop。
  • 引导:是指对一个应用程序进行配置,并使它运行起来的过程。
  • 服务器:使用Serverstrap类引导,持有两个EventLoopGroup,一个负责接收请求(bossGroup),一个负责处理请求(workerGroup),在指定监听端口号后,可以实例化ServerChannel接收客户端的访问,由ServerChannel创建Channel来处理业务逻辑,handler()方法添加的对bossGroup起效,childHandler()方法添加的对workerGroup起效。
  • 客户端:使用Bootstrap类引导,持有一个EventLoopGroup,在指定远程服务器的IP地址和端口号后,可以实例化Channel与服务器进行通信,通信逻辑编写在ChannelHandler中,通过handler()方法添加到Channel所属的ChannelPipeline中。
  • 事件:Netty使用不同的事件来通知状态或状态的改变,并基于已经发生的事件来触发适当的动作。每个事件都被分发给ChannelHandler类中的某个用户实现的方法。
  • 出站和入站:如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之
    则称为入站的。
    • 入站事件包括:连接激活、连接失活、数据读取、用户事件、错误事件等。
    • 出站事件包括:打开/关闭到远程节点的连接、将数据写入套接字等。
    • 触发动作包括:记录日志、数据转换、流控制、应用程序逻辑等。
  • ChannelFuture:是一个异步操作结果的占位符。在将来的某个时刻,异步操作完成后提供对其结果的访问。

Netty组件

Channel

  • Channel实现了类似Socket的功能。
  • Channel接口:为用户提供当前状态(是否打开、是否连接);提供支持的I/O操作;使用ChannelConfig配置参数;使用ChannelPipeline处理相关的I/O事件和请求。
    • ChannelConfig包含了该Channel的所有配置设置,并且支持热更新,可以通过实现其子类来进行自定义设置。
      Channel核心类图
  • Channel有一些常见方法:
  • Channel是线程安全的。
  • 生命周期:当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。
    • ChannelUnregistered:已经被创建,但还未注册到EventLoop。
    • ChannelRegistered:已经被注册到EventLoop。
    • ChannelActive:处于活动状态(已经连接到远程节点),可以接收和发送数据。
    • ChannelInactive:没有连接到远程节点。

EmbeddedChannel单元测试

ChannalFuture

  • ChannelFuture实现了异步通知的功能。
  • ChannelFuture接口:是将来要执行操作的结果的占位符。其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
  • ChannelPromise:实现了ChannelFuture接口的一个类,其定义了一些可写的方法,如setSuccess()和setFailure()。ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。

ChannelHandler

  • 针对不同类型的事件调用ChannelHandler;应用程序通过实现或者扩展ChannelHandler来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑;在架构上,ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。
    • ChannelHandler 的典型用途包括:
      • 将数据从一种格式转换为另一种格式;
      • 提供异常的通知;
      • 提供Channel变为活动的或者非活动的通知;
      • 提供当Channel注册到EventLoop或者从EventLoop注销时的通知;
      • 提供有关用户自定义事件的通知。
  • ChannelHandler作为最顶层的接口,接口中只包含最基本的方法,并不直接处理入站和出站事件。
// 该注解用来标识Handler是否可以在多个channel直接共享使用
@Sharable
// Handler本身被添加到ChannelPipeline时调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// Handler本身被从ChannelPipeline中删除时调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 发生异常时调用
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
  • 其核心类类图如下:
  • ChannelInboundHandler拦截和处理入站事件,ChannelOutboundHandler拦截和处理出站事件。

  • SimpleChannelInboundHandler和ChannelInboundHandler的区别:
    • 通常,在客户端的Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter。
    • SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release()),而服务器端需要把客户端请求的数据发送回客户端,且有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release。
  • 生命周期:在ChannelHandler被添加到ChannelPipeline 中或者被从ChannelPipeline 中移除时会调用这些操作。
    • handlerAdded:当把ChannelHandler 添加到ChannelPipeline 中时被调用
    • handlerRemoved:当从ChannelPipeline 中移除ChannelHandler 时被调用
    • exceptionCaught:当处理过程中在ChannelPipeline 中有错误产生时被调用

ChannelHandlerAdapter

  • 对于大多数的ChannelHandler会选择性地拦截和处理某个或者某些事件,其他的事件会忽略,由下一个ChannelHandler进行拦截和处理。这就会导致一个问题:用户ChannelHandler必须要实现所有接口,为了解决这个问题,Netty提供了ChannelHandlerAdapter基类,它的所有接口实现都是直接向后传。如果用户只关心某个事件,就覆盖对应的方法即可,不关心的就可以继承使用父类方法。

ChannelPipeline

  • ChannelPipeline将多个ChannelHandler链接在一起来让事件在其中传播处理。一个ChannelPipeline中可能不仅有入站处理器,还有出站处理器,入站处理器只会处理入站的事件,而出站处理器只会处理出站的数据。
  • 当Channel被创建时,它会被自动地分配到它专属的ChannelPipeline。
  • 当ChannelHandler 被添加到ChannelPipeline 时,它将会被分配一个ChannelHandlerContext,其代表了ChannelHandler 和ChannelPipeline 之间的绑定。
  • ChannelHandler安装到ChannelPipeline 中的过程如下:
    1. 一个ChannelInitializer的实现被注册到了Bootstrap中。
    2. 当ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在ChannelPipeline 中安装一组自定义的ChannelHandler。
    3. ChannelInitializer 将它自己从ChannelPipeline 中移除。
  • Netty有两种发送消息的方式:
    • 写到Channel中。消息会从ChannelPipeline的尾端开始流动(Netty 总是将ChannelPipeline的入站口作为头部,而将出站口作为尾端)。
    • 写到ChannelHandlerContext对象中。消息会从ChannelPipeline中的下一个ChannelHandler开始流动。
  • ChannelHandler可以通过添加、删除(包括自己)或者替换其他的ChannelHandler来实时地修改ChannelPipeline 的布局。

  • 触发事件:用于通知ChannelHandler在ChannelPipeline中所发生的事件

ChannelHandlerContext

  • ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。
  • 如果调用Channel或者ChannelPipeline上的方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。
  • ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改
    变的,所以缓存对它的引用是安全的
  • 下图是Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之间的关系。
  • 因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。对于这种用法指在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandler必须要使用@Sharable注解标注。
  • 只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。
  • 在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

EventLoop

  • EventLoop实现了控制流、多线程处理、并发等功能。
  • EventLoop接口:定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
    • 一个EventLoopGroup包含一个或者多个EventLoop。
    • 一个EventLoop 在它的生命周期内只和一个Thread绑定。
    • 所有由EventLoop处理的I/O 事件都将在它专有的Thread上被处理。
    • 一个Channel在它的生命周期内只注册于一个EventLoop。(消除了Channel对于同步的需要)
    • 一个EventLoop可能会被分配给一个或多个Channel。

线程模型

  • 线程模型指定了操作系统、编程语言、框架或者应用程序的上下文中的线程管理的关键方面。
  • 下图是Java基本线程池Executor的执行逻辑:
  • 这种方法比起为每个任务都创建和销毁线程更优,但是它并不能消除由上下文切换所带来的开销。

EventLoop接口

  • 事件循环机制:Netty在一个循环任务中为一个特定的连接处理生命周期内发生的事件。
    • 每一个循环任务都是一个Runnable的实例。
  • 下图为EventLoop的类层次结构
  • 一个EventLoop将由一个永远都不会改变的Thread驱动,同时任务可以直接提交给EventLoop 实现,以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个EventLoop 实例用以优化资源的使用,并且单个EventLoop 可能会被指派用于服务多个Channel。
  • 事件和任务是以先进先出(FIFO)的顺序执行的。这样可以通过保证字节内容总是按正确的顺序被处理,消除潜在的数据损坏的可能性。
  • 任务调度:
    • Netty的EventLoop扩展了ScheduledExecutorService,所以它提供了使用JDK实现可用的所有方法。
    • 要想取消或者检查(被调度任务的)执行状态,可以使用每个异步操作所返回的ScheduledFuture。
    • 如果当前调用线程正是支撑EventLoop的线程,那么所提交的代码块将会被直接
      执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop
      下次处理它的事件时,它会执行队列中的那些任务/事件。
    • 永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程上执行的任何其他任务。
    • 如果必须要进行阻塞调用或者执行长时间运行的任务,建议使用一个专门的EventExecutor。
  • 线程分配:
    • 异步传输:尽可能使用少量的Thread支撑大量的Channel。(少量的EventLoop被多个Channel共享)对于所有相关联的Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。
    • 阻塞传输:每个Channel都分配给一个Thread。

ByteBuf

  • ByteBuf实现了数据传输缓存。
  • ByteBuf API的优点:
    • 它可以被用户自定义的缓冲区类型扩展;
    • 通过内置的复合缓冲区类型实现了透明的零拷贝;
    • 容量可以按需增长;
    • 在读和写这两种模式之间切换不需要调用ByteBuffer的flip()方法;
    • 读和写使用了不同的索引;
    • 支持方法的链式调用;
    • 支持引用计数;
    • 支持池化(pooled)。(先申请一块连续的空间作为ByteBuf 池,需要用到的时候直接去池里面取,用完之后返还给ByteBuf池,而不需要每次要用ByteBuf的时候都去申请。)
  • Netty的数据处理API主要包括抽象类ByteBuf和接口ByteBufHolder。

抽象类ByteBuf

  • 索引:名称以read或者write开头的ByteBuf方法,将会推进其对应的索引,而名称以set或者get开头的操作则不会。
    • readerIndex:已经读取的字节数。
    • writerIndex:已经写入的字节数。
  • 使用模式:
    1. 堆缓冲区(支撑数组):数据存储在JVM的堆空间中。它能在没有使用池化的情况下提供快速的分配和释放,非常适合于有遗留的数据需要处理的情况,以及数据将按照数组的形式访问的情况。
    2. 直接缓冲区:通过本地调用来分配内存。可以避免在每次调用本地I/O时内容在缓冲区与中间缓冲区之间复制移动。直接缓冲区适合网络数据传输,但分配和释放代价更高。
    3. 复合缓冲区:CompositeByteBuf实现了一个将多个缓冲区(可以是不同模式的缓冲区)表示为单个合并缓冲区的虚拟表示。
  • 字节级操作:
    • 随机访问索引:getByte(i)
    • 可丢弃字节:discardReadBytes()。调用可确保可写分段的最大化,但可能会导致内存复制,因为可读字节必须被移动到缓冲区的开始位置。
    • 可读字节:readBytes(ByteBuf dest)
    • 可写字节:writeBytes(ByteBuf dest)
  • 索引管理:
    • 调用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()可以标记和重置ByteBuf的readerIndex和writerIndex。
    • 调用readerIndex(int)或writerIndex(int)可以将索引移动到指定位置。
    • 调用clear()方法可以将readerIndex和writerIndex都设置为0。
  • 查找操作:
    • indexOf()
    • forEachByte(ByteBufProcessor.FIND_XX)
  • 派生缓冲区:创建一个已经存在的缓冲区的视图。这会产生其自己的读写索引和其他标记索引,但是它们共享内部的数据。
    • duplicate()
    • slice()
    • slice(int, int)
    • Unpooled.unmodifiableBuffer(…)
    • order(ByteOrder)
    • readSlice(int)
  • 读写操作:



  • 辅助类:
    • ByteBufAllocator接口,用于分配ByteBuf,它有两种实现
      • PooledByteBufAllocator:池化,目的是最大限度地减少内存碎片。使用了jemalloc内存分配器分配内存。
      • UnpooledByteBufAllocator:不池化,每次被调用都返回一个新的实例。可以用于非网络项目。
    • ByteBufUtil类,实现了一些通用方法,比如打印内容,转换进制等。
  • 引用计数:
    • ReferenceCounted接口。
    • retain()方法通常对引用计数加1,release()方法通常对引用计数减1。也可以自定义。如果引用计数被减到0,则这个对象就将被显式的回收。
    • 它对于池化实现很重要,降低了其内存分配的开销。
  • 资源泄漏检测:
    • ResourceLeakDetector类,它将对应用程序的缓冲区分配做大约1%的采样来检测内存泄露。
    • 目前有四种检测级别:DISABLED/SIMPLE/ADVANCED/PARANOID
    • 不仅要释放资源,还要通知ChannelFuture。否则可能会出现ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。

接口ByteBufHolder

  • ByteBufHolder是ByteBuf的容器,它用于解决需要在储存数据负载外同时储存属性值的情况。
  • 此外还有继承自ReferenceCounted的方法。

编码器与解码器

  • 网络数据总是一系列的字节:
    • 入站消息会被解码,即从字节转换为一个Java对象。
    • 出站消息会被编码,即从一个Java对象转换为字节。
  • 一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放。如果需要保留引用以便稍后使用,可以调用 ReferenceCountUtil.retain(message)方法。
  • 解码器:都实现了ChannelInboundHandler接口,用于转换入站数据的格式。可分为两类:
    • 解码字节到消息:ByteToMessageDecoder 和 ReplayingDecoder
    • 解码消息到消息:MessageToMessageDecoder
  • 例如:Integer到String的解码器
  • 编码器:都实现了ChannelOutboundHandler接口,用于转换出站数据的格式。可分为两类:
    • 编码从消息到字节:MessageToByteEncoder
    • 编码从消息到消息:MessageToMessageEncoder
  • 例如:Short到Byte的编码器

引导、服务器、客户端

Bootstrap类

  • 服务器致力于使用一个父Channel 来接受来自客户端的连接,并创建子Channel 以用于它们之间的通信;而客户端将最可能只需要一个单独的、没有父Channel 的Channel 来用于所有的网络交互。
  • 两种应用程序类型之间通用的引导步骤由AbstractBootstrap 处理,而特定于客户端或者服务器的引导步骤则分别由Bootstrap 或ServerBootstrap 处理。
  • AbstractBootstrap 被标记为了Cloneable。在一个已经配置完成的引导类实例上调用clone()方法将返回另一个可以立即使用的引导类实例。这种方式只会创建引导类实例的EventLoopGroup的一个浅拷贝,所以其将在所有克隆的Channel实例之间共享。
  • Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中。

服务器与客户端

  • 所有的Netty程序都至少需要以下两部分:
    1. ChannelHandler:实现业务逻辑。
    2. 引导:配置启动代码。

引导服务器

  • 概念:将一个进程绑定到某个指定的端口。
  • 引导服务器的流程如下:
    1. 创建EventLoopGroup。
    2. 创建ServerBootstrap。
    3. 为ServerBootstrap指定EventLoopGroup(2个)。
    4. 指定Channel类型。
    5. 指定监听端口套接字地址。
    6. 添加一个Handler到Channel的ChannelPipeline。(将Handler标注为@Sharable后所有的客户端连接都可使用同一Handler实例)
    7. 异步地绑定服务器,并阻塞直到绑定完成。
    8. 获取Channel的CloseFuture,并阻塞直到获取完成。
    9. 关闭EventLoopGroup,释放所有的资源。
  • 服务器需要两组不同的Channel:
    • 第一组将只包含一个ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。
    • 第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的Channel。
    • 与ServerChannel相关联的EventLoopGroup 将分配一个负责为传入连接请求创建Channel的EventLoop。一旦连接被接受,第二个EventLoopGroup就会给它的Channel分配一个EventLoop。


引导客户端

  • 概念:将一个进程连接到另一个运行在某个指定主机的指定端口上的进程。
  • 引导客户端的流程如下:
    1. 创建EventLoopGroup。
    2. 创建Bootstrap。
    3. 为Bootstrap指定EventLoopGroup(1个)。
    4. 指定Channel类型。
    5. 指定远程服务器的套接字地址。
    6. 添加一个Handler到Channel的ChannelPipeline。
    7. 连接远程服务器,并阻塞直到连接完成。
    8. 获取Channel的CloseFuture,并阻塞直到Channel关闭。
    9. 关闭EventLoopGroup,释放所有的资源。

从Channel引导客户端

  • 服务器处理某个客户端的请求时,该请求可能需要同时成为第三方系统的客户端(代理、需要Web服务、需要连接其他数据库检索数据等情况),此时需要从已经被接受的子Channel中引导一个客户端Channel。

在引导过程中添加多个ChannelHandler

  • initChannel(C ch)提供了将多个ChannelHandler添加到一个ChannelPipeline中的简便方法。之后只需要向Bootstrap或ServerBootstrap的实例提供ChannelInitializer实现即可。并且一旦Channel 被注册到了它的EventLoop 之后,就会调用你的initChannel()版本。在该方法返回之后,ChannelInitializer 的实例将会从ChannelPipeline 中移除它自己。

ChannelOption属性

  • 使用option()方法来将ChannelOption 应用到引导。你所提供的值将会被自动应用到引导所创建的所有Channel。可用的ChannelOption 包括了底层连接的详细信息,如keep-alive 或者超时属性以及缓冲区设置。
  • 在某些常用的属性和数据不可用时,Netty 提供了AttributeMap 抽象(一个由Channel 和引导类提供的集合)以及AttributeKey<T>(一个用于插入和获取属性值的泛型类)。使用这些工具,便可以安全地将任何类型的数据项与客户端和服务器Channel(包含ServerChannel 的子Channel)相关联了。
  • 关闭:EventLoopGroup.shutdownGracefully()方法是一个异步方法,将处理任何挂起的事件和任务,并且随后
    释放所有活动的线程。这个方法调用将会返回一个Future,这个Future 将在关闭完成时接收到通知。

内置的传输

  • NIO:基于选择器。
    • 选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的改变做出响应之后,选择器将会被重置,并将重复这个过程。
    • 可能的状态变化:新的Channel已被接受并且就绪;Channel 连接完成;Channel 有已经就绪的可读;Channel可写。
  • Epoll:由JNI驱动,支持只有在Linux上可用的多种特性,比NIO传输更快,完全非阻塞。
  • OIO:使用阻塞流,是同步的。
  • Local:在同一个JVM中运行的客户端和服务器程序之间使用管道进行异步通信可以在VM内部通过管道进行通信。
  • Embedded:允许使用ChannelHandler而又不需要一个真正的基于网络的传输,常用于测试。
    • 它是Netty 专门为改进针对ChannelHandler 的单元测试而提供的。
  • 相互兼容的Channel和EventLoopGroup

异常处理

  • 入站异常:
    • ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler;
    • 如果异常到达了ChannelPipeline 的尾端,它将会被记录为未被处理;
    • 要想定义自定义的处理逻辑,需要重写exceptionCaught()方法。然后需要决定是否需要将该异常传播出去。
  • 出站异常:
    • 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture 的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。
    • 几乎所有的ChannelOutboundHandler 上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:
      • ChannelPromise setSuccess();
      • ChannelPromise setFailure(Throwable cause);
      • 通过调用ChannelPromise 上的setSuccess()和setFailure()方法,可以使一个操作的状态在ChannelHandler 的方法返回给其调用者时便即刻被感知到。