Java NIO 反应堆模式简单模型
一般NIO里反应堆模式都是这样:一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到Socket CHannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。
每个Acceptor和每个Reactor都各自持有一个Selector
当然每个Acceptor和Reactor都得是一个线程(起码在逻辑上得是线程)
简单实现,三个类NioAcceptor、NioReactor和ReactorPool:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }
/** * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件 */ public class NioReactor { /** 待注册的{@link SocketChannel} 队列 */ private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); private Selector selector; private volatile boolean stop = false; private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NioReactor.class); public NioReactor() throws IOException { selector = Selector.open(); } public void postRegistry(SocketChannel channel) { registerQueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public NioReactor start() { thread = new Thread(new ReactTask(this)); thread.start(); return this; } public void stop() throws InterruptedException { stop = true; thread.join(); } /** * 处理队列里面的待注册的SocketChannel */ private void doRegister(Selector selector) { while (!registerQueue.isEmpty()) { SocketChannel channel = registerQueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { logger.error("", e); } } } private void handleWrite(SelectionKey key) { // TODO 业务写 } private void handleRead(SelectionKey key) { // TODO 业务读 } private static class ReactTask implements Runnable { NioReactor reactor; ReactTask(NioReactor reactor) { this.reactor = reactor; } public void run() { Set<SelectionKey> keys = null; while (!reactor.stop) { final Selector selector = reactor.selector; try { selector.select(500L); reactor.doRegister(selector); // 处理注册 keys = selector.selectedKeys(); for (SelectionKey key : keys) { try { if (!key.isValid()) { // not valid key.cancel(); continue; } if (key.isReadable()) { // 可读 reactor.handleRead(key); } if (key.isWritable()) { // 可写 reactor.handleWrite(key); } } catch (Throwable t) { logger.error("", t); continue; } } } catch (IOException e) { logger.error("", e); } } } } }
ReactorPool用来管理Reactor:
public class ReactorPool extends LinkedList<NioReactor>{ private static final long serialVersionUID = 6525233920805533099L; private final int capacity; public ReactorPool(int size) { this.capacity = size; } // 轮询算法取下一个Reactor public NioReactor nextReactor() throws IOException { // 新建或从头部拿一个Reactor NioReactor reactor = size() < capacity ? new NioReactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }
参考:
Netty的NIO模型
Mycat的NIO实现
相关推荐
Java NIO反应器模式讲解,目前热门的Java网络通信框架中Mina,Netty等都采用NIO
java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...
Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...
Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...
Java NIO英文高清原版
讲解了 JavaIO 与 JAVA NIO区别,JAVA NIO设计理念,以及JDK中java NIO中语法的使用
Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...
java nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socket
Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,...
java NIO是 java New IO 的简称,在 jdk1.4 里提供的新 api 。 Sun 官方标榜的特性如下: – 为所有的原始类型提供 (Buffer) 缓存支持。 – 字符集编码解码解决方案。 – Channel :一个新的原始 I/O 抽象。 – 支持...
java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server发数据,server收到后分别打印收到的消息...
java NIO.zip
java nio编程 非阻塞模式的通信 电子书 带目录标签
java nio 读文件,java nio 读文件
java NIO技巧及原理解析,java IO原理,NIO框架分析,性能比较
java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...
JAVA NIO学习资料JAVA NIO学习资料
基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现...
Java NIO测试示例
JavaNIO.pdf