`

Java并发编程之异步Future机制的原理和实现

    博客分类:
  • Java
阅读更多

Java并发编程之异步Future机制的原理和实现

 

        项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AddTask implements Callable<Integer> {

	private int a,b;
	
	public AddTask(int a, int b) {
		this.a = a;
		this.b = b;
	}
	
	@Override
	public Integer call() throws Exception {
		Integer result = a + b;
		return result;
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		//JDK目前为止返回的都是FutureTask的实例
		Future<Integer> future = executor.submit(new AddTask(1, 2));
		Integer result = future.get();// 只有当future的状态是已完成时(future.isDone() = true),get()方法才会返回
	}
}

  虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

 

package future;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * The result of an asynchronous operation.
 * 
 * @author lixiaohui
 * @param <V> 执行结果的类型参数
 */
public interface IFuture<V> extends Future<V> {	
	boolean isSuccess(); // 是否成功	
	V getNow();	//立即返回结果(不管Future是否处于完成状态)
	Throwable cause();	//若执行失败时的原因
        boolean isCancellable(); //是否可以取消
	IFuture<V> await() throws InterruptedException; //等待future的完成
	boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成
	boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
        IFuture<V> awaitUninterruptibly(); //等待future的完成,不响应中断
        boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断
	boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
	IFuture<V> addListener(IFutureListener<V> l); //当future完成时,会通知这些加进来的监听器
	IFuture<V> removeListener(IFutureListener<V> l);
	
}

 

 

接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:

public class Object {
    /**
     * Causes the current thread to wait until another thread invokes the
     * {@link java.lang.Object#notify()} method or the
     * {@link java.lang.Object#notifyAll()} method for this object.
     * In other words, this method behaves exactly as if it simply
     * performs the call {@code wait(0)}.
     * 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify()/notifyAll()
     */
    public final void wait() throws InterruptedException {
        wait(0);
    }

    /**
     * Wakes up all threads that are waiting on this object's monitor. A
     * thread waits on an object's monitor by calling one of the
     * {@code wait} methods.
     * <p>
     * The awakened threads will not be able to proceed until the current
     * thread relinquishes the lock on this object. The awakened threads
     * will compete in the usual manner with any other threads that might
     * be actively competing to synchronize on this object; for example,
     * the awakened threads enjoy no reliable privilege or disadvantage in
     * being the next thread to lock this object.
     */
    public final native void notifyAll();
}

 知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

package future;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * <pre>
 * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL}
 * 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例
 * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll()方法:
 * <ul>
 * <li>异步操作被取消时(cancel方法)</li>
 * <li>异步操作正常结束时(setSuccess方法)</li>
 * <li>异步操作异常结束时(setFailure方法)</li>
 * </ul>
 * </pre>
 * 
 * @author lixiaohui
 *
 * @param <V>
 *            异步执行结果的类型
 */
public class AbstractFuture<V> implements IFuture<V> {

	protected volatile Object result; // 需要保证其可见性
        /**
         * 监听器集
         */
	protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>();

	/**
	 * 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时, 
	 * result引用该对象
	 */
	private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();

	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		if (isDone()) { // 已完成了不能取消
			return false;
		}

		synchronized (this) {
			if (isDone()) { // double check
				return false;
			}
			result = new CauseHolder(new CancellationException());
			notifyAll(); // isDone = true, 通知等待在该对象的wait()的线程
		}
		notifyListeners(); // 通知监听器该异步操作已完成
		return true;
	}
	
	@Override
	public boolean isCancellable() {
		return result == null;
	}
	
	@Override
	public boolean isCancelled() {
		return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
	}

	@Override
	public boolean isDone() {
		return result != null;
	}

	@Override
	public V get() throws InterruptedException, ExecutionException {
		await(); // 等待执行结果

		Throwable cause = cause();
		if (cause == null) { // 没有发生异常,异步操作正常结束
			return getNow();
		}
		if (cause instanceof CancellationException) { // 异步操作被取消了
			throw (CancellationException) cause;
		}
		throw new ExecutionException(cause); // 其他异常
	}

	@Override
	public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
		if (await(timeout, unit)) {// 超时等待执行结果
			Throwable cause = cause();
			if (cause == null) {// 没有发生异常,异步操作正常结束
				return getNow();
			}
			if (cause instanceof CancellationException) {// 异步操作被取消了
				throw (CancellationException) cause;
			}
			throw new ExecutionException(cause);// 其他异常
		}
		// 时间到了异步操作还没有结束, 抛出超时异常
		throw new TimeoutException();
	}

	@Override
	public boolean isSuccess() {
		return result == null ? false : !(result instanceof CauseHolder);
	}

	@SuppressWarnings("unchecked")
	@Override
	public V getNow() {
		return (V) (result == SUCCESS_SIGNAL ? null : result);
	}

	@Override
	public Throwable cause() {
		if (result != null && result instanceof CauseHolder) {
			return ((CauseHolder) result).cause;
		}
		return null;
	}

	@Override
	public IFuture<V> addListener(IFutureListener<V> listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}
		if (isDone()) { // 若已完成直接通知该监听器
			notifyListener(listener);
			return this;
		}
		synchronized (this) {
			if (!isDone()) {
				listeners.add(listener);
				return this;
			}
		}
		notifyListener(listener);
		return this;
	}

	@Override
	public IFuture<V> removeListener(IFutureListener<V> listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}

		if (!isDone()) {
			listeners.remove(listener);
		}

		return this;
	}

	@Override
	public IFuture<V> await() throws InterruptedException {
		return await0(true);
	}

	
	private IFuture<V> await0(boolean interruptable) throws InterruptedException {
		if (!isDone()) { // 若已完成就直接返回了
			// 若允许终端且被中断了则抛出中断异常
			if (interruptable && Thread.interrupted()) {
				throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");
			}

			boolean interrupted = false;
			synchronized (this) {
				while (!isDone()) {
					try {
						wait(); // 释放锁进入waiting状态,等待其它线程调用本对象的notify()/notifyAll()方法
					} catch (InterruptedException e) {
						if (interruptable) {
							throw e;
						} else {
							interrupted = true;
						}
					}
				}
			}
			if (interrupted) {
				// 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, 
				// 这里重新设置以便让其它代码知道这里被中断了。
				Thread.currentThread().interrupt();
			}
		}
		return this;
	}
	
	@Override
	public boolean await(long timeoutMillis) throws InterruptedException {
		return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
	}
	
	@Override
	public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
		return await0(unit.toNanos(timeout), true);
	}

	private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
		if (isDone()) {
			return true;
		}

		if (timeoutNanos <= 0) {
			return isDone();
		}

		if (interruptable && Thread.interrupted()) {
			throw new InterruptedException(toString());
		}

		long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
		long waitTime = timeoutNanos;
		boolean interrupted = false;

		try {
			synchronized (this) {
				if (isDone()) {
					return true;
				}

				if (waitTime <= 0) {
					return isDone();
				}

				for (;;) {
					try {
						wait(waitTime / 1000000, (int) (waitTime % 1000000));
					} catch (InterruptedException e) {
						if (interruptable) {
							throw e;
						} else {
							interrupted = true;
						}
					}

					if (isDone()) {
						return true;
					} else {
						waitTime = timeoutNanos - (System.nanoTime() - startTime);
						if (waitTime <= 0) {
							return isDone();
						}
					}
				}
			}
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	@Override
	public IFuture<V> awaitUninterruptibly() {
		try {
			return await0(false);
		} catch (InterruptedException e) { // 这里若抛异常了就无法处理了
			throw new java.lang.InternalError();
		}
	}
	
	@Override
	public boolean awaitUninterruptibly(long timeoutMillis) {
		try {
			return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
		} catch (InterruptedException e) {
			throw new java.lang.InternalError();
		}
	}

	@Override
	public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
		try {
			return await0(unit.toNanos(timeout), false);
		} catch (InterruptedException e) {
			throw new java.lang.InternalError();
		}
	}

	protected IFuture<V> setFailure(Throwable cause) {
		if (setFailure0(cause)) {
			notifyListeners();
			return this;
		}
		throw new IllegalStateException("complete already: " + this);
	}

	private boolean setFailure0(Throwable cause) {
		if (isDone()) {
			return false;
		}

		synchronized (this) {
			if (isDone()) {
				return false;
			}
			result = new CauseHolder(cause);
			notifyAll();
		}

		return true;
	}

	protected IFuture<V> setSuccess(Object result) {
		if (setSuccess0(result)) { // 设置成功后通知监听器
			notifyListeners();
			return this;
		}
		throw new IllegalStateException("complete already: " + this);
	}

	private boolean setSuccess0(Object result) {
		if (isDone()) {
			return false;
		}

		synchronized (this) {
			if (isDone()) {
				return false;
			}
			if (result == null) { // 异步操作正常执行完毕的结果是null
				this.result = SUCCESS_SIGNAL;
			} else {
				this.result = result;
			}
			notifyAll();
		}
		return true;
	}

	private void notifyListeners() {
		for (IFutureListener<V> l : listeners) {
			notifyListener(l);
		}
	}

	private void notifyListener(IFutureListener<V> l) {
		try {
			l.operationCompleted(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private static class SuccessSignal {

	}

	private static final class CauseHolder {
		final Throwable cause;

		CauseHolder(Throwable cause) {
			this.cause = cause;
		}
	}
}

 

那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

package future.test;

import future.IFuture;
import future.IFutureListener;

/**
 * 延时加法
 * @author lixiaohui
 *
 */
public class DelayAdder {
	
	public static void main(String[] args) {
		new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer>() {
			
			@Override
			public void operationCompleted(IFuture<Integer> future) throws Exception {
				System.out.println(future.getNow());
			}
			
		});
	}
	/**
	 * 延迟加
	 * @param delay 延时时长 milliseconds
	 * @param a 加数
	 * @param b 加数
	 * @return 异步结果
	 */
	public DelayAdditionFuture add(long delay, int a, int b) {
		DelayAdditionFuture future = new DelayAdditionFuture(); 
		new Thread(new DelayAdditionTask(delay, a, b, future)).start();
		return future;
	}
	
	private class DelayAdditionTask implements Runnable {

		private long delay;
		
		private int a, b;
		
		private DelayAdditionFuture future;
		
		public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
			super();
			this.delay = delay;
			this.a = a;
			this.b = b;
			this.future = future;
		}

		@Override
		public void run() {
			try {
				Thread.sleep(delay);
				Integer i = a + b;
				// TODO 这里设置future为完成状态(正常执行完毕)
				future.setSuccess(i);
			} catch (InterruptedException e) {
				// TODO 这里设置future为完成状态(异常执行完毕)
				future.setFailure(e.getCause());
			}
		}
		
	}
}

 

package future.test;

import future.AbstractFuture;
import future.IFuture;
//只是把两个方法对外暴露
public class DelayAdditionFuture extends AbstractFuture<Integer> {
	
	@Override
	public IFuture<Integer> setSuccess(Object result) {
		return super.setSuccess(result);
	}
	
	@Override
	public IFuture<Integer> setFailure(Throwable cause) {
		return super.setFailure(cause);
	}
	
}

  可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。

 

 

 

 

2
2
分享到:
评论
4 楼 Dream_Trc 2017-04-06  
写的不错,感谢分享
3 楼 莫名的拉风 2016-08-24  
tieye 写道
如何在 线程池 中 给线程设置名称 了解吗?

这是我项目中用到的
public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger THREAD_NUMBER = new AtomicInteger(1);
    private final ThreadGroup GROUP;
    private final boolean DAEMON;
    private String namePrefix = "AFA-Thread";

    public NamedThreadFactory() {
        this(null, false);
    }

    public NamedThreadFactory(String namePrefix) {
        this(namePrefix, false);
    }

    public NamedThreadFactory(String namePrefix, boolean daemon) {
        if (namePrefix != null) {
            this.namePrefix = namePrefix;
        }
        this.DAEMON = daemon;
        SecurityManager s = System.getSecurityManager();
        GROUP = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    }

    /*
     * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
     */
    @Override
    public Thread newThread(Runnable r) {
        String name = namePrefix + "-" + THREAD_NUMBER.getAndIncrement();
        Thread t = new Thread(GROUP, r, name, 0);

        if (t.isDaemon() != DAEMON) {
            t.setDaemon(DAEMON);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }

}
2 楼 莫名的拉风 2016-08-24  
tieye 写道
如何在 线程池 中 给线程设置名称 了解吗?

嗯,在创建ThreadPoolExecutor时可以构造函数可以传一个ThreadFactory,这时可以自己实现一个ThreadFactory,然后在newThread时顺便给设置一个名字就可以了
1 楼 tieye 2016-08-24  
如何在 线程池 中 给线程设置名称 了解吗?

相关推荐

    Java并发编程实战

    6.3.3 示例:使用Future实现页面渲染器 6.3.4 在异构任务并行化中存在的局限 6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8...

    Java并发编程原理与实战

    线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    Java 并发编程实战

    6.3.3 示例:使用Future实现页面渲染器 6.3.4 在异构任务并行化中存在的局限 6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8...

    龙果java并发编程完整视频

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    汪文君高并发编程实战视频资源下载.txt

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    java并发编程

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    汪文君高并发编程实战视频资源全集

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    使用Java异步编程实现一个简单的网络请求.txt

    这个代码实现了一个简单的网络请求,使用了Java中的异步编程模型。 首先,我们定义了一个URL地址和一个超时时间(TIMEOUT...这个例子展示了如何使用Java异步编程模型来实现并发的网络请求,可以提高程序的效率和性能。

    Java-Concurrency-Programming-Practice:学习 Java 并发编程

    学习并发编程的一些高级主题,如Java内存模型、JVM IO/NIO机制等。 在实践中学习: 在实践中学习:并发集合 在实践中学习:如何对并发应用程序进行测试。 实践学习:Java异步编程(Future、FutureTask、Guava....

    javafuture源码-demo-java-completablefuture:“带有CompletableFuture的Java异步编程”

    future源码Java中具有CompletableFuture的异步编程 介绍 CompletableFuture API是用于Java异步编程的高级API。 该API支持将多个异步计算流水线化(也称为链接或合并)成单个结果,而不会造成嵌套回调(“ callback ...

    CompletableFuture:Java异步编程利器.pptx.pptx

    CompletableFuture是Java 8中引入的一种新的Future,它是一种异步编程工具,可以用于处理异步任务的结果。 CompletableFuture的特性 CompletableFuture具有非阻塞性,可以在等待结果的同时执行其他任务,而且它支持...

    简单讲解Java的Future编程模式

    主要介绍了Java的Future编程模式,包括对异步和并发的一些设计思维,需要的朋友可以参考下

    精通并发与 netty 视频教程(2018)视频教程

    52_NioEventLoopGroup源码分析与线程数设定 53_Netty对Executor的实现机制源码分析 54_Netty服务端初始化过程与反射在其中的应用分析 55_Netty提供的Future与ChannelFuture优势分析与源码讲解 56_Netty服务器地址...

    精通并发与netty视频教程(2018)视频教程

    81_Netty引用计数的实现机制与自旋锁的使用技巧 82_Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 83_AtomicIntegerFieldUpdater实例演练与volatile关键字分析 84_Netty引用计数注意事项与内存泄露...

    精通并发与netty 无加密视频

    第81讲:Netty引用计数的实现机制与自旋锁的使用技巧 第82讲:Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 第83讲:AtomicIntegerFieldUpdater实例演练与volatile关键字分析 第84讲:Netty...

Global site tag (gtag.js) - Google Analytics