`

Java NIO 反应堆模式

阅读更多

Java NIO 反应堆模式简单模型

一般NIO里反应堆模式都是这样:一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到Socket CHannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。

 

每个Acceptor和每个Reactor都各自持有一个Selector

 

当然每个Acceptor和Reactor都得是一个线程(起码在逻辑上得是线程)

 

简单实现,三个类NioAcceptor、NioReactor和ReactorPool:

 

package cc.lixiaohui.demo.dp.reator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去
 */
public class NioAcceptor {
	
	private int port;
	
	private String host;
	
	private Selector selector; // Java NIO Selector
	
	private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel
	
	private ReactorPool reactorPool; // NioReactor池
	
	private Thread thread; // 工作线程
	
	private volatile boolean stop = false;
	
	private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class);
	
	public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException {
		this.port = port;
		this.host = Objects.requireNonNull(host);
		this.reactorPool = new ReactorPool(reactorPoolSize);
		
		selector = Selector.open(); // 创建selector
		serverChannel = ServerSocketChannel.open(); // new server socket channel
		serverChannel.configureBlocking(false); // in non-blocking mode
		serverChannel.bind(new InetSocketAddress(host, port)); // bind
		serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 
	}
	
	public void stop() throws InterruptedException {
		stop = true;
		thread.join();
	}
	
	public void start() {
		thread = new Thread(new AcceptTask(this));
		thread.start();
	}
	
	private static class AcceptTask implements Runnable {
		
		NioAcceptor acceptor;
		
		AcceptTask(NioAcceptor acceptor) {
			this.acceptor = acceptor;
		}

		public void run() {
			final Selector selector = acceptor.selector;
			Set<SelectionKey> keys = null;
			while (!acceptor.stop) { // 运行中
				try {
					selector.select(1000L); // select, 最多等1秒
					keys = selector.selectedKeys();
					try {
						for (SelectionKey key : keys) {
							if (key.isValid() && key.isAcceptable()) { // 可accept
								SocketChannel channel = acceptor.serverChannel.accept();
								channel.configureBlocking(false);
								// 取下一个Reactor并把SocketChannel加入到Reactor的注册队列
								acceptor.reactorPool.nextReactor().postRegistry(channel);
							} else {
								key.cancel();
							}
						}
					} finally {
						keys.clear();
					}
				} catch (IOException e) {
					logger.error("", e);
				}
			}
		}
	}
}
/**
 * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件
 */
public class NioReactor {
	
	/** 待注册的{@link SocketChannel} 队列 */
	private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>();
	
	private Selector selector;
	
	private volatile boolean stop = false;
	
	private Thread thread;
	
	private static final Logger logger = LoggerFactory.getLogger(NioReactor.class);
	
	public NioReactor() throws IOException {
		selector = Selector.open();
	}
	
	public void postRegistry(SocketChannel channel) {
		registerQueue.add(channel);
		selector.wakeup(); // 唤醒selector, 以便让其即时处理注册
	}
	
	public NioReactor start() {
		thread = new Thread(new ReactTask(this));
		thread.start();
		return this;
	}
	
	public void stop() throws InterruptedException {
		stop = true;
		thread.join();
	}
	
	/**
	 * 处理队列里面的待注册的SocketChannel
	 */
	private void doRegister(Selector selector) {
		while (!registerQueue.isEmpty()) {
			SocketChannel channel = registerQueue.poll();
			try {
				// 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件
				channel.register(selector, SelectionKey.OP_READ); 
			} catch (ClosedChannelException e) {
				logger.error("", e);
			}
		}
	}
	
	private void handleWrite(SelectionKey key) {
		// TODO 业务写
	}

	private void handleRead(SelectionKey key) {
		// TODO 业务读
	}
	
	private static class ReactTask implements Runnable {
		
		NioReactor reactor;
		
		ReactTask(NioReactor reactor) {
			this.reactor = reactor;
		}
		
		public void run() {
			Set<SelectionKey> keys = null;
			while (!reactor.stop) {
				final Selector selector = reactor.selector;
				try {
					selector.select(500L);
					reactor.doRegister(selector); // 处理注册
					keys = selector.selectedKeys();
					
					for (SelectionKey key : keys) {
						try {
							if (!key.isValid()) { // not valid
								key.cancel();
								continue;
							}
							if (key.isReadable()) { // 可读
								reactor.handleRead(key);
							}
							if (key.isWritable()) { // 可写
								reactor.handleWrite(key);
							}
						} catch (Throwable t) {
							logger.error("", t);
							continue;
						}
					}
				} catch (IOException e) {
					logger.error("", e);
				}
			}
		}
	}
 }

 

ReactorPool用来管理Reactor:

public class ReactorPool extends LinkedList<NioReactor>{
	
	private static final long serialVersionUID = 6525233920805533099L;
	
	private final int capacity;
	
	public ReactorPool(int size) {
		this.capacity = size;
	}
	// 轮询算法取下一个Reactor
	public NioReactor nextReactor() throws IOException {
		// 新建或从头部拿一个Reactor
		NioReactor reactor = size() < capacity ? new NioReactor().start() : poll();
		add(reactor);// 加到尾部
		return reactor;
	}

}

 

参考:

Netty的NIO模型

Mycat的NIO实现

0
1
分享到:
评论

相关推荐

    基于Java NIO反应器模式设计与实现

    Java NIO反应器模式讲解,目前热门的Java网络通信框架中Mina,Netty等都采用NIO

    java NIO和java并发编程的书籍

    java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...

    JavaNIO chm帮助文档

    Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...

    java nio 包读取超大数据文件

    Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...

    Java NIO英文高清原版

    Java NIO英文高清原版

    java NIO 中文版

    讲解了 JavaIO 与 JAVA NIO区别,JAVA NIO设计理念,以及JDK中java NIO中语法的使用

    Java NIO 中文 Java NIO 中文 Java NIO 中文文档

    Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...

    java nio 实现socket

    java nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socket

    java NIO 视频教程

    Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,...

    java nio中文版

    java NIO是 java New IO 的简称,在 jdk1.4 里提供的新 api 。 Sun 官方标榜的特性如下: – 为所有的原始类型提供 (Buffer) 缓存支持。 – 字符集编码解码解决方案。 – Channel :一个新的原始 I/O 抽象。 – 支持...

    Java Nio selector例程

    java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server发数据,server收到后分别打印收到的消息...

    java NIO.zip

    java NIO.zip

    Java NIO.pdf

    java nio编程 非阻塞模式的通信 电子书 带目录标签

    java nio 读文件

    java nio 读文件,java nio 读文件

    java NIO技巧及原理

    java NIO技巧及原理解析,java IO原理,NIO框架分析,性能比较

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    JAVA NIO 学习资料

    JAVA NIO学习资料JAVA NIO学习资料

    基于Java NIO实现五子棋游戏.zip

    基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现...

    Java NIO测试示例

    Java NIO测试示例

    JavaNIO.pdf

    JavaNIO.pdf

Global site tag (gtag.js) - Google Analytics