【Network】两种高性能 I/O 设计模式 - Reactor/Proactor

Posted by 西维蜀黍 on 2019-02-26, Last Modified on 2023-02-20

背景

在此之前,开发人员在开始的时候需要在事件多路分离器那里注册感兴趣的事件,并提供相应的事件处理者,或者是回调函数;事件多路分离器在适当的时候,会将请求的事件分发给这些事件处理者或者回调函数。

两个与事件多路分离器有关的模式是 Reactor 和 Proactor

  • Reactor 模式是基于同步非阻塞 I/O 的;
  • Proactor 模式基于异步非阻塞 I/O。

类比

我们以一个餐饮店为例,假设每一个人来就餐就是一个 I/O 操作,他会先看一下菜单,然后点餐。就像一个网站会有很多的请求,要求服务器做一些事情。处理这些就餐事件的就是我们需要解决的问题了。

多线程的并发解决方案

在传统的基于多线程的处理方式会是这样的:

  1. 一个人来就餐,一个服务员去服务,然后客人点菜。 服务员将菜单给后厨。
  2. 十个人来就餐,十个服务员去服务……

这个就是多线程的处理方式,一个请求到来,就会有一个线程服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己是VIP,专人服务的。如果餐厅一直这样同一时间最多来10个客人,这家餐厅是可以很好的服务下去的。

那么问题来了,如果这家餐厅生意非常好,同时就餐人数(网站并发访问量)达到100人呢,给每个客人分配一个服务员餐馆则需要很高的人力成本(需要很高的服务器配置资源)。

尽管可以考虑使用类似线程池的方法,组成一个10个服务员的线程池,一个服务员服务完一个人后继续去服务下一个。 但是这样也有比较严重的缺点,如果某个客人点菜很慢(读写大文件或长网路请求),其他人可能就要等好长时间了。

即使为了减少客人的等待时间,一个服务员在等待一个客人点菜思考的时候,跑到另一个客人那抓紧处理他的点菜,服务员切换到不同的点菜服务(线程的上下文切换)也是需要消耗额外的资源的(比如服务员需要翻出之前的小本本,快速浏览,以回忆起之前已经点了什么菜)。

事件驱动的解决方案

其实当客人在点菜的时候,服务员可以先把菜单给客户(响应新的网络请求),此后,服务员就去做别的事情了(非阻塞地),当客人想好要吃的菜后,大喊一声“服务员”,服务员再去服务。显然,服务员可以同时观察多个客户当前是否有喊“服务员”(I/O多路复用)。

客人将需要的菜告诉服务员,然后服务员就可以立刻去服务下一个人了,这样是不是能够有效利用服务员的资源?

其实这就是基于事件的解决方案,“做好之后放到14桌”其实就是回调函数。

甚至,桌上有一个二维码,当客人想好要吃的菜后,直接选择想吃的菜然后submit,这时整个餐厅的效率会更高,因为服务员只需要在菜ready的时候,把菜从厨房上到特定的桌上。

Reactor 模型

  • I/O 复用机制需要事件多路分离器(event demultiplexor)。 事件多路分离器的作用,即将那些读写事件源分发(dispatch)给dispatcher
  • Dispatcher: Handles registering and unregistering of request handlers. Dispatches resources from the demultiplexer to the associated request handler.
  • Request Handler: An application defined request handler and its associated resource.

经典的 Reactor 单线程模型

经典的 Reactor 单线程模型如下所示:

在Reactor模式中,包含如下角色:

  • Reactor:将I/O事件发派给对应的Handler
  • Acceptor:处理客户端连接请求
  • Handlers:执行非阻塞读/写

Demo

经典的 Reactor 单线程模型实现代码如下所示:

public class NIOServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
          SocketChannel socketChannel = (SocketChannel) key.channel();
          ByteBuffer buffer = ByteBuffer.allocate(1024);
          int count = socketChannel.read(buffer);
          if (count <= 0) {
            socketChannel.close();
            key.cancel();
            LOGGER.info("Received invalide data, close the connection");
            continue;
          }
          LOGGER.info("Received message {}", new String(buffer.array()));
        }
        keys.remove(key);
      }
    }
  }
}

为了方便阅读,上示代码将Reactor模式中的所有角色放在了一个类中。

从上示代码中可以看到,多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel)。同时注册时需要指定它所关注的事件,例如上示代码中socketServerChannel对象只注册了OP_ACCEPT事件,而socketChannel对象只注册了OP_READ事件。

selector.select()是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件。

不足:

在一些小容量应用场景下,可以使用单线程模型。但是这对于高负载、大并发的应用场景却不合适,主要原因如下:

  • 一个NIO线程在同时处理成百上千的链路,性能上会出现瓶颈,即便NIO线程的 CPU 负荷达到100%,也无法满足海量消息的编码、解码、读取和发送
  • 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致 大量消息积压和处理超时,成为系统的性能瓶颈。
  • 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。 为了解决这些问题, 演化出了 Reactor 多线程模型。

Reactor 多线程模型

Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程来处理 I/O 操作。

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞(指的是,比如对读操作来说,将数据从kernel space读到user space的过程)对新连接请求的处理。因此可以引入多线程,并行处理多个读/写操作,如下图所示:

Reactor 多线程模型的特点如下:

  • 有专门一个NIO线程(event loop thread)
    • Acceptor 模块用于监听服务端,接收客户端的TCP 连接请求。
  • 网络I/O操作(读、写等)由一个NIO线程池负责,线程池可以采用标准的 JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。

event loop thread 可以同时处理N条链路,但是一个链路只对应一个NIO线程, 防止发生并发操作问题。 在绝大多数场景下,Reactor 多线程模型可以满足性能需求。

但是,在个别特殊场景中,一个 NIO 线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个 Acceptor 线程可能会存在性能不足的问题,为了解决性能问题,产生了第三种 Reactor 线程模型——主从 Reactor 多线程模型

Demo

多线程Reactor模式示例代码如下所示。

public class NIOServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
      if(selector.selectNow() < 0) {
        continue;
      }
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while(iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
          readKey.attach(new Processor());
        } else if (key.isReadable()) {
          Processor processor = (Processor) key.attachment();
          processor.process(key);
        }
      }
    }
  }
}

从上示代码中可以看到,注册完SocketChannel的OP_READ事件后,可以对相应的SelectionKey attach一个对象(本例中attach了一个Processor对象,该对象处理读请求),并且在获取到可读事件后,可以取出该对象。

注:attach对象及取出该对象是NIO提供的一种操作,但该操作并非Reactor模式的必要操作,本文使用它,只是为了方便演示NIO的接口。

具体的读请求处理在如下所示的Processor类中。该类中设置了一个静态的线程池处理所有请求。而process方法并不直接处理I/O请求,而是把该I/O操作提交给上述线程池去处理,这样就充分利用了多线程的优势,同时将对新连接的处理和读/写操作的处理放在了不同的线程中,读/写操作不再阻塞对新连接请求的处理。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service = Executors.newFixedThreadPool(16);
  public void process(SelectionKey selectionKey) {
    service.submit(() -> {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      int count = socketChannel.read(buffer);
      if (count < 0) {
        socketChannel.close();
        selectionKey.cancel();
        LOGGER.info("{}\t Read ended", socketChannel);
        return null;
      } else if(count == 0) {
        return null;
      }
      LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
      return null;
    });
  }
}

主从 Reactor 多线程模型

主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是一 个单独的 NIO 线程, 而是一个独立的 NIO 线程池。

Acceptor 接收到客户端 TCP连接请求并处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 I/O 线程池(sub reactor 线程池) 的某 I/O 线程上, 由它负责 SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅用于客户端的登录、 握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池 的 I/O 线程上,由 I/O 线程负责后续的 I/O 操作。

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责 accept 新连接,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。 并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

主从Reactor模式示意图如下所示:

Demo

多Reactor示例代码如下所示:

public class NIOServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    int coreNum = Runtime.getRuntime().availableProcessors();
    Processor[] processors = new Processor[coreNum];
    for (int i = 0; i < processors.length; i++) {
      processors[i] = new Processor();
    }
    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Processor processor = processors[(int) ((index++) % coreNum)];
          processor.addChannel(socketChannel);
          processor.wakeup();
        }
      }
    }
  }
}

如上代码所示,本文设置的子Reactor个数是当前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。

子Reactor对SocketChannel的处理如下所示。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Processor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

在Processor中,同样创建了一个静态的线程池,且线程池的大小为机器核数的两倍。每个Processor实例均包含一个Selector实例。同时每次获取Processor实例时均提交一个任务到该线程池,并且该任务正常情况下一直循环处理,不会停止。而提交给该Processor的SocketChannel通过在其Selector注册事件,加入到相应的任务中。由此实现了每个子Reactor包含一个Selector对象,并由一个独立的线程处理。

模拟Proactor

Proactor 模式

在 Reactor 模式中,Reactor 等待某个事件或者可应用或者操作的状态发生(比如文件描述符可读写,或者是 Socket 可读写)。

然后把这个事件传给事先注册的 Handler(事件处理函数或者回调函数),由后者来做实际的读写操作。

其中的读写操作都需要应用程序同步操作,所以 Reactor 是非阻塞同步网络模型。

如果把 I/O 操作改为异步,即交给操作系统来完成就能进一步提升性能,这就是异步网络模型 Proactor。

Proactor 是和异步 I/O 相关的,详细方案如下:

  • Proactor Initiator 创建 Proactor 和 Handler 对象,并将 Proactor 和 Handler 都通过 AsyOptProcessor(Asynchronous Operation Processor)注册到内核;
  • AsyOptProcessor 处理注册请求,并处理 I/O 操作;
  • AsyOptProcessor 完成 I/O 操作后通知 Proactor;
  • Proactor 根据不同的事件类型回调不同的 Handler 进行业务处理;
  • Handler 完成业务处理。

通过对比 Proactor 和 Reactor ,可以看出 Proactor有如下缺点:

  • 编程复杂性,由于异步操作流程的事件的初始化和事件完成在时间和空间上都是相互分离的,因此开发异步应用程序更加复杂。应用程序还可能因为反向的流控而变得更加难以 Debug;
  • 内存使用,缓冲区在读或写操作的时间段内必须保持住,可能造成持续的不确定性,并且每个并发操作都要求有独立的缓存,相比 Reactor 模式,在 Socket 已经准备好读或写前,是不要求开辟缓存的;
  • 操作系统支持,Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下,Linux 2.6 才引入,目前异步 I/O 还不完善。

总结

总结来说,Reactor 和 Proactor 模式的主要区别就是真正的读取和写入操作是由谁来完成的Reactor 中需要工作线程自己来完成数据读取或者写入,而Proactor模式中,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到 I/O 设备

所以说,同步(非阻塞) I/O 模型通常用于实现 Reactor 模式,异步 I/O 模型则用于实现 Proactor 模式。同步情况下(Reactor),调用事件处理器时,表示 I/O 设备可以进行可读或可写操作(can read or can write);异步情况下(Proactor),当调用事件处理器时,表示 I/O 操作已经完成。

在Proactor中实现读:

  • 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。
  • 事件分离器等待操作完成事件
  • 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成。
  • 事件分离器呼唤处理器。
  • 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分离器。

可以看出,两个模式的相同点,都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。在结构上,两者也有相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler;不同点在于,异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示IO设备可以进行某个操作(can read or can write)。

因此,Proactor 模式需要操作系统提供异步 I/O API。

模拟异步

我们将尝试提供一种融合了Proactor和Reactor两种模式的解决方案。 为了演示这个方案,我们将Reactor稍做调整,模拟成异步的Proactor模型(主要是在事件分离器里完成本该事件处理者做的实际读写工作,我们称这种方法为”模拟异步“)。

//TO DO

例子

Reactor:

  • libevent/libev/libuv/ZeroMQ/Event Library in Redis
  • Nginx 采用 master-slave 模型
  • Node.js

Reference