利用 CompletionService 实现 Dubbo 中的 Forking Cluster
- Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。
- 例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果 r,那么地址转坐标这个服务就可以直接返回 r 了。这种集群模式可以容忍 2 个地图服务商服务异常,但缺点是消耗的资源偏多。(原本只需调用一个,现在并行调用三个,并发一下就翻成了三倍)
-
利用 CompletionService 可以快速实现 Forking 这种集群模式
-
首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>类型的列表 futures,
- 每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。
-
通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。
-
CompletionService的实现原理和构造函数
- 由 Java SDK 并发包提供
- 实现原理:内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,CompletionService 会把任务执行结果的 Future 对象加入到阻塞队列中;上面的实例代码是把任务最终的执行结果放入阻塞队列中
- 如何创建:CompletionService 接口的实现类是 ExecutorCompletionService,共有两个构造方法
- ExecutorCompletionService(Executor executor);
- ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue);
- 这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
CompletionService 接口的五个方法
- CompletionService 接口提供的方法有 5 个
- 其中,submit() 相关的方法有两个。一个方法参数是Callable<V> task,前面利用 CompletionService 实现询价系统的示例代码中,我们提交任务就是用的它。
- 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于 ThreadPoolExecutor 的 <T> Future<T> submit(Runnable task, T result)
- 其余的 3 个方法,都是和阻塞队列相关的
- take()、poll() 都是从阻塞队列中获取并移除一个元素;
- 它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。
-
poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
总结
- 当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。
- 除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
- CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。