【Java】 异步调用实践

news2025/1/23 3:54:48

本文要点:

  • 为什么需要异步调用
  • CompletableFuture 基本使用
  • RPC 异步调用
  • HTTP 异步调用
  • 编排 CompletableFuture 提高吞吐量

 

BIO 模型

当用户进程调用了recvfrom 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据。对于 network io 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候 kernel 就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当 kernel 一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。所以,Blocking IO 的特点就是在 IO 执行的两个阶段都被 block 了。

同步调用

 

 

在同步调用的场景下,依次请求多个接口,耗时长、性能差,接口响应时长 T > T1+T2+T3+……+Tn。

减少同步等待

一般这个时候为了减少同步等待时间,会使用线程池来同时处理多个任务,接口的响应时间就是 MAX(T1,T2,T3):

 

线程池异步

大概代码如下

Future<String> future = executorService.submit(() -> {
  Thread.sleep(2000);
  return "hello world";
});
while (true) {
  if (future.isDone()) {
    System.out.println(future.get());
    break;
  }
}

同步模型中使用线程池确实能实现异步调用的效果,也能压缩同步等待的时间,但是也有一些缺陷:

  • CPU 资源大量浪费在阻塞等待上,导致 CPU 资源利用率低。
  • 为了增加并发度,会引入更多额外的线程池,随着 CPU 调度线程数的增加,会导致更严重的资源争用,上下文切换占用 CPU 资源。
  • 线程池中的线程都是阻塞的,硬件资源无法充分利用,系统吞吐量容易达到瓶颈。

NIO 模型

为了解决 BIO 中的缺陷,引入 NIO 模型:

 

NIO 模型

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问 kernel 数据好了没有。

异步优化思路

我们知道了 NIO 的调用方式比 BIO 好,那我们怎么能在业务编码中使用到 NIO 呢?自己动手将 BIO 替换成 NIO 肯定不现实,已有组件支持 NIO 的可以直接使用,不支持的继续使用自定义线程池。

  • 通过 RPC NIO 异步调用、 HTTP 异步调用的方式降低线程数,从而降低调度(上下文切换)开销。
  • 没有原生支持 NIO 异步调用的继续使用线程池。
  • 引入 CompletableFuture 对业务流程进行编排,降低依赖之间的阻塞。

简述CompletableFuture

CompletableFuture 是 java.util.concurrent 库在 java 8 中新增的主要工具,同传统的 Future 相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性

常用 API 举例

supplyAsync

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{    
  try{
    Thread.sleep(1000L);
    return "hello world";
  } catch (Exception e){
    return "failed";
  }
});
System.out.println(future.join());
// output
hello world

开启异步任务,到另一个线程执行。

complete

CompletableFuture<String> future1 = new CompletableFuture<>();
future.complete("hello world");     //异步线程执行
future.whenComplete((res, throwable) -> {
  System.out.println(res);
});
System.out.println(future1.join());
CompletableFuture<String> future2 = new CompletableFuture<>();
future.completeExceptionally(new Throwable("failed")); //异步线程执行
System.out.println(future2.join());
// output
hello world
hello world
  
Exception in thread "main" 
java.util.concurrent.CompletionException: 
java.lang.Throwable: failed

complete 正常完成该 CompletableFuture。

completeExceptionally 异常完成该 CompletableFuture。

thenApply

String original = "Message";
CompletableFuture<String> cf = 
 CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
System.out.println(cf.join());
// output
MESSAGE

任务后置处理。

图示:

 

thenApply 图示

thenCombine

CompletableFuture<String> cf = 
 CompletableFuture.completedFuture("Message").thenApply(String::toUpperCase);
CompletableFuture<String> cf1 = 
 CompletableFuture.completedFuture("Message").thenApply(String::toLowerCase);
CompletableFuture<String> allCf = cf.thenCombine(cf1, (s1, s2) -> s1 + s2);
System.out.println(allCf.join());
// output
MSGmsg

合并任务,两个任务同时执行,结果由合并函数 BiFunction 返回。

图示:

 

thenCombine 图示

allOf

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Message1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Message2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Message3");
CompletableFuture<String> future = 
 CompletableFuture.allOf(future1, future2, future3).thenApply(v -> {
  String join1 = future1.join();
  String join2 = future2.join();
  String join3 = future3.join();
  return join1 + join2 + join3;});
System.out.println(future.join());
// output
Msg1Msg2Msg3

allOf 会阻塞等待所有异步线程任务结束。

allOf 里的 join 并不会阻塞,传给 thenApply 的函数是在 future1, future2, future3 全部完成时,才会执行 。

图示:

allOf 图示
allOf 图示

 

CF 执行线程

下面有两个小demo,可以先试着想想输出的结果:

String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
  return original;
}).thenApply(r -> {
  System.out.println("thenApply thread: " + Thread.currentThread().getName());
  return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: main
Message
String original = "Message";
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
  try {
    Thread.sleep(100);
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  return original;
}).thenApply(r -> {
  System.out.println("thenApply thread: " + Thread.currentThread().getName());
  return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: ForkJoinPool.commonPool-worker-1
Message

先看结论:

  • 执行 complete 的线程会执行当前调用链上的所有CF。
  • 如果 CF 提前 complete,后续 CF 由初始线程执行。

异步任务里没有 sleep 的时候,异步任务很快就会完成,意味着 JVM 执行到 thenApply 的时候,前置 CF 已经提前完成所以后续的 CF 会被初始线程 main 线程执行。

异步任务里有 sleep 的时候, JVM 执行到 thenApply 时,前置 CF 还没有完成,前置 CF complete 的线程会执行所有后续的 CF。

CF 嵌套join

ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  Thread.sleep(3000);
  return 1;
}, executorService);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
  Thread.sleep(3000);
  return 2;
}, executorService);
Integer join1 = cf1.thenApply((cf1Val) -> {
  System.out.println("cf1 start value:" + cf1Val);
  Integer cf2Val = cf2.join();
  System.out.println("cf2 end value:" + cf2Val);
  return 3;
}).join();
//output
cf1 start value:1
cf2 end value:2

代码很简单,有一个线程数为 2 的线程池,cf1、cf2 都使用这个线程执行异步任务,特别的是在 cf1.thenApply 中会调用 cf2.join(),当线程数是2的时候可以顺利输出

ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  Thread.sleep(3000);
  return 1;
}, executorService);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
  Thread.sleep(3000);
  return 2;
}, executorService);
 Integer join1 = cf1.thenApply((cf1Val) -> {
  System.out.println("cf1 start value:" + cf1Val);
  Integer cf2Val = cf2.join();
  System.out.println("cf2 end value:" + cf2Val);
  return 3;
}).join();
//output
cf1 start value:1

这时候我们将线程池的线程数调整为 1,这时只会输出 cf1 start value:1,然后就一直阻塞。

使用 jstack -l pid 查看线程状态,发现是 WAITING,等待的地方正是我们在代码里调用的cf2.join()

"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00000001429f5000 nid=0xa903 waiting on condition
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x000000076ba5f7d0> (a java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
 at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
 at com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)

原因是我们在唯一一个线程中调用 cf2.join(),阻塞等待 cf2 完成,但是 cf2 需要等待 cf1 完成之后才有空闲线程去执行。这就类似于你右手正拿着一个水杯,然后等待右手拿水壶倒满水,这是不可能完成的。所以尽量不要嵌套join,不注意隔离线程池的话很容易造成’死锁‘(线程阻塞)。

CF 常用 API

API描述
supplyAsync开启异步任务,到另一个线程执行,异步任务有返回值。
complete完成任务。
completeExceptionally异常结束任务。
thenCombine合并任务,两个任务同时执行,结果由合并函数 BiFunction 返回。
thenApply任务后置处理。
applyToEither会取两个任务最先完成的任务,上个任务和这个任务同时进行,哪个先结束,先用哪个结果。
handle后续处理。
whenComplete完成后的处理。
allOf等待所有异步线程任务结束。
join获取返回值,没有complete的 CF 对象调用join时,会等待complete再返回,已经 complete的 CF 对象调用join时,会立刻返回结果。

优化过程

异步 RPC 客户端

我们手写的这个 RPC 框架支持异步调用,如果你想看具体的实现,可以在文末找到源码链接。异步调用之前会设置一个 CallBack 方法,异步调用时会直接返回 null,不会等待服务端返回接果,服务端返回结果之后会通过 RPC 客户端自带的线程池执行设置的 CallBack 方法。

RPC 异步调用图示:

 

 

RPC 异步调用

包装异步RPC Client

通过 AsyncExecutor 包装 RPC的客户端,AsyncExecutor 类中的 client 属性值为创建的某个 RPC 服务的异步客户端代理类,这个代理类在构造方法中创建并赋值给 client 属性。

类中的 async 方法接受 Function 类型的参数 function,可以通过 function.apply(client) 来通过 client 执行真正的 RPC 调用。

在 async 方法中实例化一个 CompletableFuture, 并将 CompletableFuture 作为异步回调的上下文设置到 RPC 的异步回调中,之后将该 CompletableFuture 返回给调用者。

public class AsyncExecutor<C> {

    private C client;

    public AsyncExecutor(ClientProxyFactory clientProxyFactory, Class<C> clazz, String group, String version) {
        this.client = clientProxyFactory.getProxy(clazz, group, version, true);
    }

    public <R> CompletableFuture<R> async(Function<C, R> function) {
        CompletableFuture<R> future = new CompletableFuture<>();
        ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future, CompletableFutureAsyncCallBack.instance());
        try {
            function.apply(client);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }
}

 异步回调类

public class CompletableFutureAsyncCallBack extends AsyncReceiveHandler {
    private static volatile CompletableFutureAsyncCallBack INSTANCE;

    private CompletableFutureAsyncCallBack() {
    }

    @Override
    public void callBack(Object context, Object result) {
        if (!(context instanceof CompletableFuture)) {
            throw new IllegalStateException("the context must be CompletableFuture");
        }
        CompletableFuture future = (CompletableFuture) context;
        if (result instanceof Throwable) {
            future.completeExceptionally((Throwable) result);
            return;
        }
        log.info("result:{}", result);
        future.complete(result);
    }
}

AsyncReceiveHandler 是 RPC 的异步回调抽象类,类中的 callBack、onException 抽象方法需要子类实现。

CompletableFutureAsyncCallBack 实现了这个 callBack 抽象方法,第一个参数是我们在包装异步 RPC Client 时设置的 CompletableFuture 上下文,第二个参数是 RPC 返回的结果。方法中判断 RPC 返回的结果是否异常,若异常通过 completeExceptionally 异常结束这个 CompletableFuture,若正常通过 complete 正常结束这个 CompletableFuture。

注册异步客户端Bean 

@Component
public class AsyncExecutorConfig {
    @Autowired
    ClientProxyFactory clientProxyFactory;

    @Bean
    public AsyncExecutor<DemoService> demoServiceAsyncExecutor() {
        return new AsyncExecutor<>(clientProxyFactory, DemoService.class, "", "");
    }
}

 异步 RPC 调用

@Autowired
AsyncExecutor<DemoService> demoServiceAsyncExecutor;

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"));

String name = pppName.join();

 

异步HTTP WebClient

WebClient 是从 Spring WebFlux 5.0 版本开始提供的一个非阻塞的基于响应式编程的进行 HTTP 请求的客户端工具。它的响应式编程的基于 Reactor 的。

WebClient VS RestTemplate

WebClient的优势在于:

  • 非阻塞响应式 IO,单位时间内有限资源下支持更高的并发量。
  • 支持使用 Java8 Lambda 表达式函数。
  • 支持同步、异步、Stream 流式传输。

WebClient 使用 

public CompletableFuture<String> asyncHttp(String url) {
        WebClient localhostWebClient = WebClient.builder().baseUrl("http://localhost:8080").build();
        Mono<HttpResult<String>> userMono = localhostWebClient.method(HttpMethod.GET).uri(url)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<HttpResult<String>>() {})
                //异常处理 有onErrorReturn时doOnError不会触发,所以不需要后续在CompletableFuture中handle处理异常
                //如果不使用onErrorReturn,建议在后续CompletableFuture中handle处理异常
                .onErrorReturn(new HttpResult<>(201, "default", "default hello"))
                //超时处理
                .timeout(Duration.ofSeconds(3))
                //返回值过滤
                .filter(httpResult -> httpResult.code == 200)
                //默认值
                .defaultIfEmpty(new HttpResult<>(201, "defaultIfEmpty", "defaultIfEmpty hello"))
                //失败重试
                .retryWhen(Retry.backoff(1, Duration.ofSeconds(1)));
        CompletableFuture<HttpResult<String>> stringCompletableFuture = WebClientFutureFactory.getCompletableFuture(userMono);
        return stringCompletableFuture.thenApply(HttpResult::getData);
    }

WebClient 整合 CF

WebClientFutureFactory.getCompletableFuture 方法会把 WebClient 返回的结果组装成 CompletableFuture ,使用的是 Mono 类的 doOnError 和 subscribe 方法,当正常返回时通过 subscribe 来调用 completableFuture.complete,当异常时通过 doOnError 来调用 completableFuture.completeExceptionally:

public class WebClientFutureFactory {
    public static <T> CompletableFuture<T> getCompletableFuture(Mono<T> mono) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        mono.doOnError(throwable -> {
            completableFuture.completeExceptionally(throwable);
            log.error("mono.doOnError throwable:{}", throwable.getMessage());
        }).subscribe(result -> {
            completableFuture.complete(result);
            log.debug("mono.subscribe execute thread: {}", Thread.currentThread().getName());
        });
        return completableFuture;
    }
}

WebClient 对同一服务的多次调用:

public Flux<User> fetchUsers(List<Integer> userIds) {
    return Flux.fromIterable(userIds)
        .parallel()
        .flatMap(this::getUser)
        .ordered((u1, u2) -> u2.id() - u1.id());
}

对返回相同类型的不同服务进行多次调用:

public Flux<User> fetchUserAndOtherUser(int id) {
    return Flux.merge(getUser(id), getOtherUser(id))
        .parallel()
        .runOn(Schedulers.elastic())
        .ordered((u1, u2) -> u2.id() - u1.id());
}

对不同类型的不同服务的多次调用:

public Mono fetchUserAndItem(int userId, int itemId) {
    Mono<User> user = getUser(userId).subscribeOn(Schedulers.elastic());
    Mono<Item> item = getItem(itemId).subscribeOn(Schedulers.elastic());
 
    return Mono.zip(user, item, UserWithItem::new);
}

异步数据库调用

使用 CompletableFuture.supplyAsync 执行异步任务时,必须指定成自己的线程池,否则 CompletableFuture 会使用默认的线程池 ForkJoinPool,默认线程池数量为 cpus - 1:

WebClient<Boolean> dbFuture = CompletableFuture.supplyAsync(() -> getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

编排 CF

构造了所有需要异步执行的 CompletableFuture 之后,使用 allOf 方法阻塞等待所有的 CompletableFuture 结果,allOf 响应之后可以通过 join 获取各个 CompletableFuture 的响应接口,这里的 join 是会立刻返回的,不会阻塞:

//RPC 的 CompletableFuture
CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"));

//RPC 的 CompletableFuture
CompletableFuture<String> huangName = demoServiceAsyncExecutor.async(service -> service.hello("huang"));

//DB 操作的 CompletableFuture
WebClient<Boolean> dbFuture = CompletableFuture.supplyAsync(() -> getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

//allOf 方法阻塞等待所有的 CompletableFuture 结果     
return CompletableFuture.allOf(pppName, huangName, dbFuture)
     //组装结果返回
        .thenApply(r -> pppName.join() && huangName.join() && dbFuture.join()).join();

超时处理

java9 中 CompletableFuture 才有超时处理,使用方法如下:

CompletableFuture.supplyAsync(() -> 6 / 3).orTimeout(1, TimeUnit.SECONDS);

java8 中需要配合 ScheduledExecutorService + applyToEither:

public class TimeoutUtils {
    private static final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(scheduledExecutor::shutdownNow));
    }

    public static <T> CompletableFuture<T> timeout(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        scheduledExecutor.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return cf.applyToEither(result, Function.identity());
    }
}

TimeoutUtils 类与有一个静态属性,值为初始化的一个 ScheduledExecutorService ,还有一个静态方法 timeout ,这个方法将传入的 cf 用 applyToEither 接口与一个调度计时的 CompletableFuture 组合,哪个 CompletableFuture 先执行完成,就返回哪个的结果。

具体使用如下:

CompletableFuture<Integer> future = demoServiceAsyncExecutor.async(service -> service.getAge(18));
CompletableFuture<Integer> futureWithTimeout = TimeoutUtils.timeout(future, 3, TimeUnit.SECONDS);
futureWithTimeout.join();

异常与默认值处理

CompletableFuture 中可以处理异常有下面三个 API :

public <U> CompletableFuture<U> handle(java.util.function.BiFunction<? super T, Throwable, ? extends U> fn)

handle 接口不论 CompletableFuture 执行成功还是异常都会被处罚,handle 接受一个 BiFunction 参数,BiFunction 中的第一个参数为 CompletableFuture 的结果,另一个参数为 CompletableFuture 执行过程中的异常,Handle可以返回任意类型的值。可以给 handle 传入自定义函数,根据结果跟执行异常返回最终数据。

public CompletableFuture<T> whenComplete(java.util.function.BiConsumer<? super T, ? super Throwable> action)

whenComplete 接口与 handle 类似,whenComplete 接受一个 BiConsumer 参数,BiConsumer 中的第一个参数为 CompletableFuture 的结果,另一个参数为 CompletableFuture 执行过程中的异常,但是没有返回值。

public CompletableFuture<T> exceptionally(java.util.function.Function<Throwable, ? extends T> fn)

exceptionally 接口只有在执行异常的时候才会被触发,接受一个 Function 参会, Function 只有一个参数为 CompletableFuture 执行过程中的异常,可以有一个任意返回值。

下表是三个接口的对比:

handle()whenComplete()exceptionly()
访问成功YesYesNo
访问失败YesYesYes
能从失败中恢复YesNoYes
能转换结果从T 到 UYesNoNo
成功时触发YesYesNo
失败时触发YesYesYes
有异步版本YesYesYes

我们使用 handle 接口来处理异常与默认值,下面是封装的一个 handle 接口入参:

public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {
   public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {
        super(methodName, args);
        this.defaultValue = defaultValue;
        this.isNullToDefault = isNullToDefault;
    }
   @Override
    public R apply(R result, Throwable throwable) {
        logResult(result, throwable);
        if (throwable != null) {
            return defaultValue;
        }
        if (result == null && isNullToDefault) {
            return defaultValue;
        }
        return result;
    }
}

这个类实现了 handle 接口需要的 BiFunction 类型,在构造方法中有四个参数 boolean isNullToDefault, R defaultValue, String methodName, Object... args 第一个参数是决定执行结果为空值时,是否将我们传进来的第二个参数作为默认值返回。当异常时也会将第二个参数作为默认返回值。最后两个参数一个是方法名称,一个是调用参数,可以给父类用作日志记录。

与 CompletableFuture 配合使用如下:

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"))
        .handle(new DefaultValueHandle<>(true, "name", "service.hello", "ppp"));

日志

封装了一个实现 BiConsumer 的 LogErrorAction 类,父类有个抽象类 AbstractLogAction 这个类就是简单使用 logReslut 方法记录日志,可以自己随意实现:

public class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable>{
  @Override
    public void accept(R result, Throwable throwable) {
        logResult(result, throwable);
    }
}

与 CompletableFuture 配合使用如下

CompletableFuture<String> pppName = demoServiceAsyncExecutor.async(service -> service.hello("ppp"))
  .whenComplete(
  new LogErrorAction<>("hello", "ppp")
 );

优化效果

优化前接口平均响应耗时 350ms,优化后平均响应耗时 180ms,下降 49% 左右。

最佳实践

  • 禁止嵌套 join,避免“死锁”(线程阻塞)。
  • 多个 CompletableFuture 聚合时建议使用 allOf。
  • HTTP 使用无阻塞的 Spring webclient,避免自定义线程池线程阻塞。
  • 使用 RPC 或者 HTTP 异步调用生成的 CompletableFuture, 后续的 thenAppply,handle 等禁止耗时操作,避免阻塞异步框架线程池。
  • 禁止使用 CompletableFuture 的默认线程池,不同任务自定义线程池,不同级别业务线程池隔离,根据测试情况设置线程数,队列长度,拒绝策略。
  • 异步执行的操作都加上超时,CF 超时后不会终止线程中的超时任务,不设置超时可能导致线程长时间阻塞。
  • 建议使用异常、默认值、空值替换、错误日志等工具记录信息,方便排查问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386657.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react Context学习记录

react Context学习记录1.Context是干嘛的2.可以倒是可以实现的做法-props逐级传递3.Context1.Context是干嘛的 一种React组件间通信方式, 常用于【祖组件】与【后代组件】间通信 2.可以倒是可以实现的做法-props逐级传递 import React, { Component } from "react";…

Qt资源文件.qrc

目录 一 用途 二 使用效果 三 如何添加资源文件 一 用途 有时候我们想添加图片或者动画的话&#xff0c;就可以使用资源文件 二 使用效果 我在标签上添加了一个蝴蝶的图片&#xff0c;最后呈现出来的效果就是这样子的 三 如何添加资源文件 添加资源文件要指定特定的路径&…

iptables语法规则

iptables命令基本语法 iptables [-t table] command [链名] [条件匹配] [-j 目标动作]以下是对iptables命令的拆分讲解&#xff1a; -t table 用来指明使用的表&#xff0c;有三种选项&#xff1a;filter,nat,mangle。若未指定&#xff0c;则默认使用filter表。 command参数 …

【数据库】基础知识,创建一个表

计算机硬件软件硬件&#xff1a;1.输入输出设备 2.控制器 3.运算器 4.存储器&#xff08;内存&#xff09;软件&#xff1a;1.应用软件 2.系统软件&#xff08;操作系统、数据库管理系统、语言处理系统……&#xff09;从开发者角度分软件&#xff1a;B/S浏览器和服务器结构&am…

umi学习(umi4)

umi 官方文档 官方建议使用 pnpm node版本在 14 以上 创建项目&#xff1a; 根据 包管理工具不同 &#xff0c;官方推荐 这里使用 pnpm&#xff1a; 1. pnpm dlx create-umilatest 2. 选择模板 &#xff08;这里使用 Simple App&#xff09; 想对module处理需要使用 Ant Desig…

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客户端通…

vue自定义指令以及angular自定义指令(以禁止输入空格为例)

哈喽&#xff0c;小伙伴们&#xff0c;大家好啊&#xff0c;最近要实现一个vue自定义指令&#xff0c;就是让input输入框禁止输入空格建立一个directives的指令文件&#xff0c;里面专门用来建立各个指令的官方文档&#xff1a;自定义指令 | Vue.js (vuejs.org)我们都知道vue中…

小白学Pytorch 系列--Torch API

小白学Pytorch 系列–Torch API Torch version 1.13 Tensors TORCH.IS_TENSOR 如果obj是PyTorch张量&#xff0c;则返回True。 注意&#xff0c;这个函数只是简单地执行isinstance(obj, Tensor)。使用isinstance 更适合用mypy进行类型检查&#xff0c;而且更显式-所以建议使…

开发手册——一、编程规约_5.集合处理

这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】关于 hashCode 和 equals 的处理&#xff0c;遵循如下规则&#xff1a; 只要重写 equals&#xff0c;就必须重写 hashCod…

I.MX6ULL_Linux_系统篇(21) kernel启动流程

链接脚本 vmlinux.lds 要分析 Linux 启动流程&#xff0c;同样需要先编译一下 Linux 源码&#xff0c;因为有很多文件是需要编译才会生成的。首先分析 Linux 内核的连接脚本文件 arch/arm/kernel/vmlinux.lds&#xff0c;通过链接脚本可以 找到 Linux 内核的第一行程序是从哪里…

计算机网络安全基础知识3:网站漏洞,安装phpstudy,安装靶场漏洞DVWA,搭建一个网站

计算机网络安全基础知识3&#xff1a;网站漏洞&#xff0c;安装phpstudy&#xff0c;安装靶场漏洞DVWA&#xff0c;搭建一个网站 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能很多算法学生都得去找开发&#xff0c;测…

7.桥接模式

目录 简介 定义 特点 结构 示例 1. 新建 Brand.interface 接口类&#xff0c;定义不同品牌手机共有的基本功能 2. 新建 Xiaomi.class 类&#xff0c;实现 Brand.interface 接口&#xff0c;实现具体功能 3. 新建 Vivo.class 类&#xff0c;实现 Brand.interface 接口&…

Mybatis源码学习笔记(六)之Mybatis中集成日志框架原理解析

1 Mybatis中集成日志框架示例 1.1 Mybatis使用log4j示例&#xff08;推荐方式&#xff09; 第一步&#xff1a;pom.xml引入log4j依赖 <!-- slf4j日志门面 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId&…

Thinkphp6使用RabbitMQ消息队列

Thinkphp6连接使用RabbitMQ&#xff08;不止tp6&#xff0c;其他框架对应改下也一样&#xff09;&#xff0c;如何使用Docker部署RabbitMQ&#xff0c;在上一篇已经讲了->传送门<-。 部署环境 开始前先进入RabbitMQ的web管理界面&#xff0c;选择Queues菜单&#xff0c;点…

深度学习实战20(进阶版)-文件智能搜索系统,可以根据文件内容进行关键词搜索,快速找到文件

大家好&#xff0c;我是微学AI&#xff0c;今天给大家带来深度学习实战项目-文件智能搜索系统&#xff0c;文件智能搜索系统是一种能够帮助用户通过文件的内容快速搜索和定位文件的软件系统。 随着互联网和数字化技术的普及&#xff0c;数据和信息呈现爆炸式增长的趋势&#xf…

ubuntu 将jupyter-lab保存为桌面快捷方式和favourites

ubuntu: 将jupyter-lab保存为桌面快捷方式和favourites desktop shortcut 建立一个新的desktop文件 cd ~/Desktop touch Jupyter-lab.desktop将文件修改成如下&#xff1a; [Desktop Entry] Version1.0 NameJupyterlab CommentBack up your data with one click Exec/home/cjb/…

SpringCloud学习笔记(一)

单体应用架构 在诞⽣之初&#xff0c;拉勾的⽤户量、数据量规模都⽐较⼩&#xff0c;项目所有的功能模块都放在一个工程中编码、编译、打包并且部署在一个Tomcat容器中的架构模式就是单体应用架构。 优点&#xff1a; 高效开发&#xff1a;项⽬前期开发节奏快&#xff0c;团…

02-Oracle数据库的启动与关闭

本文章主要讲解Oracle数据库的启动与关闭方法&#xff0c;详细讲解启动Oracle的命令&#xff0c;三种启动数据库的方法及区别&#xff1b;关闭数据库的4种方法及他们的区别。 启动和关闭数据库 •数据库没启动前&#xff0c;只有拥有DBA权限或者以sysoper或sysdba身份才能连接到…

设计跳表(动态设置节点高度)

最近学习redis的zset时候&#xff0c;又看到跳表的思想&#xff0c;突然对跳表的设置有了新的思考 这是19年设计的跳表&#xff0c;在leetcode的执行时间是200ms 现在我对跳表有了新的想法 1、跳表的设计&#xff0c;类似二分查找&#xff0c;但是不是二分查找&#xff0c;比较…

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…