3.Netty源码分析三(ChannelFuture)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
//异步启动服务
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
关于Future接口,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()
和 get()
方法。
JDK java.util.concurrent.Future
package java.util.concurrent;
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
//任务是否已取消
boolean isCancelled();
//任务是否已完成
boolean isDone();
//阻塞获取任务执行结果
V get() throws InterruptedException, ExecutionException;
//带超时参数的获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty Future,继承自JDK Future,然后增加了一些方法。
public interface Future<V> extends java.util.concurrent.Future<V> {
//是否成功
boolean isSuccess();
//是否可取消
boolean isCancellable();
//任务如果执行失败,该方法返回异常信息
Throwable cause();
//添加Listener进行回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//阻塞等待任务执行结束,任务执行异常则将导致异常的原因抛出
Future<V> sync() throws InterruptedException;
//不响应中断的sync()方法
Future<V> syncUninterruptibly();
//阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
//获取执行结果,不阻塞。我们都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();
// 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败
// 也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
// mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能停止该任务的执行),
// 似乎 Netty 中 Future 接口的各个实现类,都没有使用这个参数
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
看完上面的 Netty 的 Future 接口,我们可以发现,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就可以了,那么我们就不一定要主动调用 isDone() 来获取状态,或通过 get() 阻塞方法来获取值。
所以它其实有两种使用范式
顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来。也就是说,如果使用 await(),任务抛出异常后,await() 方法会返回,但是不会抛出异常,而 sync() 方法返回的同时会抛出异常。
我们也可以看到,Future 接口没有和 IO 操作关联在一起,还是比较_纯净_的接口。
接下来,我们来看 Future 接口的子接口 ChannelFuture,这个接口用得最多,它将和 IO 操作中的 Channel 关联在一起了,用于异步处理 Channel 中的事件。
public interface ChannelFuture extends Future<Void> {
// ChannelFuture 关联的 Channel
Channel channel();
// 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
// 用来标记该 future 是 void 的,
// 这样就不允许使用 addListener(...), sync(), await() 以及它们的几个重载方法
boolean isVoid();
}
我们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来,没有增加什么东西。还有个 isVoid() 方法算是不那么重要的存在吧。其他几个都是方法覆写,为了让返回值类型变为 ChannelFuture,而不是原来的 Future。