3.Netty源码分析三(ChannelFuture)

ServerBootstrap b = new ServerBootstrap();  
b.group(bossGroup, workerGroup)  
 .channel(NioServerSocketChannel.class)  
 .option(ChannelOption.SO_BACKLOG, 100)  
 .handler(new LoggingHandler(LogLevel.INFO))  
 .childHandler(new ChannelInitializer<SocketChannel>() {  
     @Override  
     public void initChannel(SocketChannel ch) throws Exception {  
         ChannelPipeline p = ch.pipeline();  
         if (sslCtx != null) {  
             p.addLast(sslCtx.newHandler(ch.alloc()));  
         }  
         //p.addLast(new LoggingHandler(LogLevel.INFO));  
         p.addLast(serverHandler);  
     }  
 });  
  
// Start the server.  
//异步启动服务
ChannelFuture f = b.bind(PORT).sync();  
  
// Wait until the server socket is closed.  
f.channel().closeFuture().sync();

关于Future接口,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。
JDK java.util.concurrent.Future

package java.util.concurrent;

public interface Future<V> {  
	//取消任务
    boolean cancel(boolean mayInterruptIfRunning);  
	//任务是否已取消
    boolean isCancelled();  
	//任务是否已完成
    boolean isDone();  
    //阻塞获取任务执行结果
    V get() throws InterruptedException, ExecutionException;  
    //带超时参数的获取任务执行结果
    V get(long timeout, TimeUnit unit)  
        throws InterruptedException, ExecutionException, TimeoutException;  
}

Netty Future,继承自JDK Future,然后增加了一些方法。

public interface Future<V> extends java.util.concurrent.Future<V> {  
	//是否成功
    boolean isSuccess();  
    //是否可取消
    boolean isCancellable();  
    //任务如果执行失败,该方法返回异常信息
    Throwable cause();  
    //添加Listener进行回调
	Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);  
  
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
  
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);  
  
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
	//阻塞等待任务执行结束,任务执行异常则将导致异常的原因抛出
    Future<V> sync() throws InterruptedException;  
	//不响应中断的sync()方法
    Future<V> syncUninterruptibly();  
   //阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
    Future<V> await() throws InterruptedException;  
  
    Future<V> awaitUninterruptibly();  
  
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;  
  
    boolean await(long timeoutMillis) throws InterruptedException;  
  
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);  
  
   boolean awaitUninterruptibly(long timeoutMillis);  
   
    //获取执行结果,不阻塞。我们都知道 java.util.concurrent.Future 中的 get() 是阻塞的
    V getNow();  
    // 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败
    //      也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
    // mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能停止该任务的执行),
    //       似乎 Netty 中 Future 接口的各个实现类,都没有使用这个参数
   @Override  
    boolean cancel(boolean mayInterruptIfRunning);  
}

看完上面的 Netty 的 Future 接口,我们可以发现,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就可以了,那么我们就不一定要主动调用 isDone() 来获取状态,或通过 get() 阻塞方法来获取值。

所以它其实有两种使用范式

顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来。也就是说,如果使用 await(),任务抛出异常后,await() 方法会返回,但是不会抛出异常,而 sync() 方法返回的同时会抛出异常。

我们也可以看到,Future 接口没有和 IO 操作关联在一起,还是比较_纯净_的接口。

接下来,我们来看 Future 接口的子接口 ChannelFuture,这个接口用得最多,它将和 IO 操作中的 Channel 关联在一起了,用于异步处理 Channel 中的事件。

public interface ChannelFuture extends Future<Void> {

    // ChannelFuture 关联的 Channel
    Channel channel();

    // 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型 
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();

    // 用来标记该 future 是 void 的,
    // 这样就不允许使用 addListener(...), sync(), await() 以及它们的几个重载方法
    boolean isVoid();
}

我们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来,没有增加什么东西。还有个 isVoid() 方法算是不那么重要的存在吧。其他几个都是方法覆写,为了让返回值类型变为 ChannelFuture,而不是原来的 Future。