`

Java NIO时间服务

阅读更多

Java NIO时间服务

 

这篇文章内容是另一篇文章《Java 实现基于Redis的分布式锁》的分支. 

 

时间服务包括客户端和服务端, 服务端监听请求 ,若是时间请求,则返回当前服务器的时间, 各个客户端(分布式锁) 都从给服务器获取时间,已达到全局时间一致。

 

共三个类 TimeServer、 TimeClient和TimeClientException,下面是源码:

 

TimeServer.java:

package cc.lixiaohui.lock.time.nio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 提供简单时间服务的服务器
 * 
 * @author lixiaohui
 */
public class TimeServer {

	private ServerSocketChannel serverChannel;

	private Selector selector;

	private volatile boolean alive = true;

	private static final String TIME_CMD = "time";
	private static final String HALT_CMD = "halt";

	private static final String ERROR = "error";

	private static final Logger logger = LoggerFactory.getLogger(TimeServer.class);

	public void start(int port) throws IOException {
		selector = Selector.open();

		serverChannel = ServerSocketChannel.open();
		serverChannel.configureBlocking(false); // non-blocking mode
		serverChannel.bind(new InetSocketAddress(port));

		// interested only in accept event
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);

		while (alive) {
			try {
				if (selector.select() < 0) { // no events
					continue;
				}
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					try {
						if (!key.isValid()) 
							continue;
						
						if (key.isAcceptable()) { // new channel incoming
							SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
							// ignore if register failed
							if (!registerChannel(selector, ch, SelectionKey.OP_READ)) {
								continue;
							}
							logger.info("new channel registered {}", ch.getRemoteAddress().toString());
						}
						// client request
						if (key.isReadable()) {
							handleRead(key);
						}
						if (key.isWritable()) {
							handleWrite(key);
						}
					} catch (IOException e) {
						logger.error("{} exception: {}", key.channel(), e);
						if (key != null) {
							key.cancel();
							if (key.channel() != null) {
								key.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				logger.error("{}", e);
			}
		}
		
		if (selector != null) {
			try {
			selector.close();
			} catch (Exception e) {
				logger.error("error occurred when closing selector: e", e);
			}
		}
	}

	private void handleWrite(SelectionKey key) throws IOException {
		SocketChannel ch = (SocketChannel) key.channel();
		try {
			ByteBuffer buf = (ByteBuffer) key.attachment();
			if (buf != null) {
				writeBytesToChannel(ch, buf, key);
			}
		} catch (ClassCastException e) {
			logger.error("{}", e);
		}
	}

	private void handleRead(SelectionKey key) throws IOException {
		SocketChannel ch = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(16);
		int read = ch.read(buffer);
		if (read < 4) { // not a full command, write error back,
						// meaning client will send command
						// again.
			writeBytesToChannel(ch, ERROR.getBytes(), key);
		} else {
			String cmd = extractCommand(buffer);
			logger.info("recieve {} request from {}", cmd, ch.getRemoteAddress().toString());
			if (TIME_CMD.equalsIgnoreCase(cmd)) {
				// 回写时间
				writeBytesToChannel(ch, String.valueOf(time()).getBytes(), key);
				logger.info("write time to {}", ch.getRemoteAddress().toString());
			} else if (HALT_CMD.equalsIgnoreCase(cmd)) {
				// 停止服务
				logger.info("stopping timeserver");
				stop();
				logger.info("timeserver stopped");
			} else {
				writeBytesToChannel(ch, ERROR.getBytes(), key);
				logger.warn("unreconized command {}, will discard it.", cmd);
			}
		}
	}

	private String extractCommand(ByteBuffer buffer) {
		buffer.flip();
		byte[] array = buffer.array();
		byte[] newArray = new byte[buffer.remaining()];
		System.arraycopy(array, buffer.position(), newArray, 0, buffer.remaining());
		return new String(newArray);
	}

	private void writeBytesToChannel(SocketChannel ch, byte[] bs, SelectionKey key) throws IOException {
		ByteBuffer buf = ByteBuffer.wrap(bs);
		int total = buf.remaining();
		int write = ch.write(buf);
		if (write < total) { // didn't wrote all, then write rest when next
								// event triggered
			key.attach(buf);
		}
	}

	private void writeBytesToChannel(SocketChannel ch, ByteBuffer buf, SelectionKey key) throws IOException {
		if (!buf.hasRemaining()) {
			return;
		}
		int total = buf.remaining();
		int write = ch.write(buf);
		if (write < total) { // didn't wrote all, then write rest when next
								// event triggered
			key.attach(buf);
		}
	}

	protected void stop() {
		alive = false;
		try {
			serverChannel.close();
			selector.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	private boolean registerChannel(Selector sel, SocketChannel sc, int ops) {
		try {
			sc.configureBlocking(false);
			sc.register(sel, ops);
		} catch (Exception e) {
			return false;
		}
		return true;
	}

	private long time() {
		return System.currentTimeMillis();
	}

}

 

TimeCient.java:

 

package cc.lixiaohui.lock.time.nio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * 时间获取客户端
 * @author lixiaohui
 *
 */
public class TimeClient {
	
	private static final String TIME_CMD = "time";
	
	private final SocketAddress address;
	
	private SocketChannel channel;
	
	public TimeClient(SocketAddress address) throws IOException {
		this.address = address;
		channel = SocketChannel.open(address);
		channel.configureBlocking(true); // blocking mode
	}
	
	/**
	 * @throws TimeClientException when connection with time server is closed.
	 * @return currentTimeMillis in server
	 */
	public long currentTimeMillis() {
		try {
			channel.write(ByteBuffer.wrap(TIME_CMD.getBytes()));
			
			ByteBuffer buf = ByteBuffer.allocate(64);
			channel.read(buf);
			
			buf.flip(); // flip for use of read
			byte[] bytes = new byte[buf.limit() - buf.position()];
			System.arraycopy(buf.array(), buf.position(), bytes, 0, bytes.length);
			
			return Long.parseLong(new String(bytes));
		} catch(NumberFormatException e) {
			System.err.println(e);
			return System.currentTimeMillis();
		} catch (IOException e) {
			throw new TimeClientException(address);
		}
	}
	
	/**
	 * close the client, along with its connection with server.
	 */
	public void close() {
		try {
			if (channel != null) {
				channel.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	
	public static void main(String[] args) throws IOException {
		TimeClient client = new TimeClient(new InetSocketAddress("localhost", 9999));
		System.out.println(client.currentTimeMillis());
		//client.close();
		System.in.read();
	}

	
}

 

TimeClientException.java:

package cc.lixiaohui.lock.time.nio.client;

import java.net.SocketAddress;

public class TimeClientException extends RuntimeException {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public TimeClientException() {
		super();
		// TODO Auto-generated constructor stub
	}

	public TimeClientException(String message) {
		super(message);
		// TODO Auto-generated constructor stub
	}
	
	public TimeClientException(SocketAddress address) {
		super(address.toString());
	}
	
}

 

0
3
分享到:
评论

相关推荐

    Java NIO原理解析

    Java NIO原理解析jdk供的无阻塞I/O(NIO...在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。

    基于时间的NIO多线程服务器

    如果你是开发服务器端的程序,java nio 是一个很好的选择,流I/O的效率大家是知道的

    java解读NIOSocket非阻塞模式.zip

    在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。 client多线程请求server端,server接收...

    基于javatcpsocket通信的拆包和装包源码-DistributedSystemUsingJavaNIO:手把手教你使用JavaNIO构

    JavaNIO网上已有许多不错的文章和教程供开发者学习,如,等等,详细内容本文不再累述,这里只简要总结一下。 普通IO,也叫BIO、Blocking IO,数据在客户和服务器之间交换是阻塞式的,数据通过TC

    JAVA_API1.6文档(中文)

    java.nio.channels.spi 用于 java.nio.channels 包的服务提供者类。 java.nio.charset 定义用来在字节和 Unicode 字符之间转换的 charset、解码器和编码器。 java.nio.charset.spi java.nio.charset 包的服务提供...

    Java下TCP文件传输功能实现

    前一段时间刚做了个java程序和网络上多台机器的c程序通讯的项目,遵循的是TCP/IP协议,用到了java的Socket编程。网络通讯是java的强项,用TCP/IP协议可以方便的和网络上的其他程序互通消息。

    java常用代码

    6.NioFile.java nio操作文件读写 7.ImageThumbnail.java 生成图片缩略图 8.JsonObjs.java json简单操作 9.HttpProxy.java 代理设置 10.CaptureScreen.java 截取桌面 11.XmlReaer.java 读写xml文件 12.ArrayConvers...

    Java编程中的IO模型详解:BIO,NIO,AIO的区别与实际应用场景分析

    IO模型决定了数据的传输方式,Java支持BIO,NIO,AIO三种IO模型。BIO是同步阻塞模型,特点是一对一的客户端与处理线程关系,适用场景是连接数量较小并且固定的,优点是编程简单,但对服务器资源要求高。NIO是同步非...

    Java 1.6 API 中文 New

    java.nio.channels.spi 用于 java.nio.channels 包的服务提供者类。 java.nio.charset 定义用来在字节和 Unicode 字符之间转换的 charset、解码器和编码器。 java.nio.charset.spi java.nio.charset 包的服务提供者...

    Netty:从原生Java的Io操作(BIO)到NIO编程,约会NIO高效并发框架——Netty,需要用到Java的基础知识(多线程,网络编程,IO,设计模式尤其是代理模式),介绍了Netty的高性能架构设计和核心模块组件,Google的Protobuf作为编码解码的数据存储格式,Netty编码器和处理程序的调用机制,TCP粘包和拆包及其解决方法,Netty核心代码剖析,最后自己手动10天左右的时间学完,确实需要对Java编程有一定基础要求,自己也是所有人匪浅

    净额从原生Java的Io操作(BIO)到NIO编程,约会NIO高效并发框架——Netty,需要用到Java的基础知识(多线程,网络编程,IO,设计模式尤其是代理模式),介绍了Netty的高级架构设计和核心模块组件,Google上的...

    JAVA上百实例源码以及开源项目

    显示出当前时间及年份,还可以选择年份及月份和日期 Java编写的HTML浏览器 一个目标文件 摘要:Java源码,网络相关,浏览器  Java编写的HTML浏览器源代码,一个很简单甚至不算是浏览器的HTML浏览器,使用方法:  可...

    java api最新7.0

    java.nio.channels.spi 用于 java.nio.channels 包的服务提供者类。 java.nio.charset 定义用来在字节和 Unicode 字符之间转换的 charset、解码器和编码器。 java.nio.charset.spi java.nio.charset 包的服务提供者...

    JavaAPI1.6中文chm文档 part1

    java.nio.channels.spi 用于 java.nio.channels 包的服务提供者类。 java.nio.charset 定义用来在字节和 Unicode 字符之间转换的 charset、解码器和编码器。 java.nio.charset.spi java.nio.charset 包的服务提供...

    JAVA上百实例源码以及开源项目源代码

    显示出当前时间及年份,还可以选择年份及月份和日期 Java编写的HTML浏览器 一个目标文件 摘要:Java源码,网络相关,浏览器  Java编写的HTML浏览器源代码,一个很简单甚至不算是浏览器的HTML浏览器,使用方法:  可...

    java开源包4

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

    java开源包101

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

    java开源包11

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

    java开源包6

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

    java开源包9

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

    java开源包5

    PortGroper 是一款java写的开源拒绝服务测试工具,它不是僵尸网络类的ddos,而是使用大量的代理作为bots发起DDOS。Port Groper可以与用测试防火墙,干扰web 统计脚本的跟踪,为网站增加流量..往好了用什么都能干,就是...

Global site tag (gtag.js) - Google Analytics