Java8流式计算相关

news2024/11/18 11:39:46

目录

lambda

优点

语法介绍

语法格式一 :

语法格式二 :

语法格式三 :

语法格式四 :

语法格式五 :

语法格式六 :

方法引用

stream

Stream流的常用方法:

创建动态list

创建固定长度list

map

filter

groupingBy

sum

list转map:

map转list:

anyMatch():

allMatch():

noneMatch():

count

合并多个一维List:

sorted、distinct:

CompletableFuture

为什么叫CompletableFuture

创建CompletableFuture

构造函数创建

supplyAsync创建

runAsync创建

常见的使用方式

thenApply / thenAccept / thenRun

thenCombine

whenComplete

handle

anyOf

allOf

烧水泡茶”程序

基于Future实现

基于CompletableFuture实现

WorkerWrapper

并行场景可能存在的需求之——每个执行结果的回调

并行场景可能存在的需求之——执行顺序的强依赖和弱依赖

并发场景可能存在的需求之——依赖上游的执行结果作为入参

开始:

基本组件:

示例一:

示例二:

示例三:


Java8流式计算相关

lambda

优点

1、避免匿名内部类定义过多 2、可以让你的代码看起来很简洁 3、去掉了一堆没有意义的代码,留下核心的逻辑

语法介绍

Lambda 表达式的基础语法 : Java8 中引入了一个新的操作符 "->" 该操作符称为箭头操作符或 Lambda 操作符,箭头操作符将 Lambda 表达式拆分成两部分 :

左侧 : Lambda 表达式的参数列表

右侧 : Lambda 表达式中所需执行的功能, 即 Lambda 体

语法格式一 :

无参数,无返回值

() -> System.out.println("Hello Lambda!");

语法格式二 :

有一个参数,并且无返回值

(x) -> System.out.println(x)

语法格式三 :

若只有一个参数,小括号可以省略不写

x -> System.out.println(x)

语法格式四 :

有两个以上的参数,有返回值,并且 Lambda 体中有多条语句

Comparator <Integer> com = (x, y) -> {
    System.out.println("函数式接口");
    return Integer.compare(x, y);
};

语法格式五 :

若 Lambda 体中只有一条语句, return 和 大括号都可以省略不写

Comparator <Integer> com = (x, y) -> Integer.compare(x, y);

语法格式六 :

Lambda 表达式的参数列表的数据类型可以省略不写,因为JVM编译器通过上下文推断出,数据类型,即“类型推断”

(Integer x, Integer y) -> Integer.compare(x, y);

注 : Lambda 表达式中的参数类型都是由编译器推断得出的。 Lambda 表达式中无需指定类型,程序依然可以编译,这是因为 javac 根据程序的上下文,在后台推断出了参数的类型。 Lambda 表达式的类型依赖于上下文环境,是由编译器推断出来的。这就是所谓的 “类型推断”

Lambda 表达式需要 “函数式接口” 的支持

函数式接口 : 接口中只有一个抽象方法的接口,称为函数式接口,可以通过 Lambda 表达式来创建该接口的对象

方法引用

若 Lambda 体中的功能,已经有方法提供实现,可以使用方法引用 (可以将方法引用理解为 Lambda 表达式的另外一种表现形式)

1> 对象的引用 :: 实例方法名

2> 类名 :: 静态方法名

stream

Stream流的常用方法:

PromoteTagVo a = new PromoteTagVo();
PromoteTagVo b = new PromoteTagVo();
PromoteTagVo c = new PromoteTagVo();
a.setInfo("111");
a.setType(1);
b.setInfo("222");
b.setType(2);
c.setInfo("333");
c.setType(3);

创建动态list

List<PromoteTagVo> d = Stream.of(a,b,c).collect(Collectors.toList());

创建固定长度list

List<PromoteTagVo> integers = Arrays.asList(a, b, c);

map

List<String> collect = d.stream().map(PromoteTagVo::getInfo).collect(Collectors.toList());

filter

List<PromoteTagVo> collect1 = d.stream().filter(x -> x.getInfo().equals("111")).collect(Collectors.toList());

groupingBy

Map<String, List<PromoteTagVo>> collect2 = d.stream().collect(Collectors.groupingBy(PromoteTagVo::getInfo, Collectors.toList()));

sum

int sum = d.stream().mapToInt(PromoteTagVo::getType).sum();

list转map:

1、转换map,key重复问题; 代码中使用(key1,key2)->key2表达式可以解决此类问题,如果出现重复的key就使用key2覆盖前面的key1,也可以定义成(key1,key2)->key1,保留key1,根据自己的业务场景来调整。

2、空指针异常,即转为map的value是null。这个可以用filter过滤;

Map<String, PromoteTagVo> collect3 = d.stream().collect(Collectors.toMap(PromoteTagVo::getInfo, Function.identity(), (key1, key2) -> key2));

map转list:

ArrayList<String> strings = new ArrayList<>(collect3.keySet());
ArrayList<PromoteTagVo> promoteTagVos = new ArrayList<>(collect3.values());

anyMatch():

判断的条件里,任意一个元素成功,返回true;

boolean b1 = d.stream().anyMatch(x -> x.getType() > 2);

allMatch():

判断条件里的元素,所有的都是,返回true;

boolean b2 = d.stream().allMatch(x -> x.getType() > 2);

noneMatch():

与allMatch相反,判断条件里的元素,所有的都不是,返回true

boolean b3 = d.stream().noneMatch(x -> x.getType() > 2);

count

long count = d.stream().filter(x -> x.getType()>1).count();

合并多个一维List:

List<PromoteTagVo> collect4 = Stream.of(promoteTagVos1, promoteTagVos2, promoteTagVos3).flatMap(Collection::stream).collect(Collectors.toList());

sorted、distinct:

List<Integer> collect5 = d.stream().map(PromoteTagVo::getType).sorted().distinct().collect(Collectors.toList());

CompletableFuture

CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。

CompletableFuture是对Future的扩展和增强,同时CompletableFuture实现了对任务编排的能力。

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池

为什么叫CompletableFuture

CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。

下面的示例,比较简单的说明了,CompletableFuture是如何被主动完成的。在下面这段代码中,由于调用了complete方法,所以最终的打印结果是“manual test”,而不是"test"。

 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            try{
                Thread.sleep(1000L);
                return "test";
            } catch (Exception e){
                return "failed test";
            }
        });
        future.complete("manual test");
        System.out.println(future.join());

创建CompletableFuture

构造函数创建

最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。

CompletableFuture<String> future = new CompletableFuture();
//future.complete("1234");
String result = future.join();
System.out.println(result);

此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。

future.complete("test");

supplyAsync创建

CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "11");

runAsync创建

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("123"));

常见的使用方式

同Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。

CompletableFuture中常用的流式连接函数包括:

thenApply
thenApplyAsync
 
thenAccept
thenAcceptAsync
 
thenRun
thenRunAsync
 
thenCombine
thenCombineAsync
 
thenCompose
thenComposeAsync
 
whenComplete
whenCompleteAsync
 
handle
handleAsync

后缀Async的是支持异步的

thenApply / thenAccept / thenRun

这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样。thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = future1.thenApply(p -> p + 10);

需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。

thenCombine

同前面一组连接函数相比,thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> future = future1.thenCombine(future2, Integer::sum);

一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。

whenComplete

whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。

  CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture future2 = future1.whenComplete((r, e) -> {
            if (e != null) {
                System.out.println("error");
            } else {
                System.out.println(r + 10);
            }
        });

需要注意的是,future2获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果。

handle

handle与whenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。

  CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> handle = future1.handle((r, e) -> {
            if (e != null) {
                System.out.println("error");
                return 0;
            } else {
                System.out.println("right");
                return r + 10;
            }
        });

anyOf

anyOf()的参数是多个给定的CompletableFuture,当其中的任何一个完成时,方法返回这个CompletableFuture

   CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "我是future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "我是future2";
        });
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future1, future2);

allOf

allOf方法用来实现多 CompletableFuture 的同时完成任务,没有返回。

  CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("我是future1");
            return "我是future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("我是future2");
            return "我是future2";
        });
        CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2);

烧水泡茶”程序

基于Future实现

public class FutureTaskTest{

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务T2的FutureTask
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());
        // 创建任务T1的FutureTask
        FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));

        // 线程T1执行任务ft2
        Thread T1 = new Thread(ft2);
        T1.start();
        // 线程T2执行任务ft1
        Thread T2 = new Thread(ft1);
        T2.start();
        // 等待线程T1执行结果
        System.out.println(ft1.get());

    }
}

// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
    FutureTask<String> ft2;
    // T1任务需要T2任务的FutureTask
    T1Task(FutureTask<String> ft2){
        this.ft2 = ft2;
    }
    @Override
    public String call() throws Exception {
        System.out.println("T1:洗水壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T1:烧开水...");
        TimeUnit.SECONDS.sleep(15);
        // 获取T2线程的茶叶
        String tf = ft2.get();
        System.out.println("T1:拿到茶叶:"+tf);

        System.out.println("T1:泡茶...");
        return "上茶:" + tf;
    }
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("T2:洗茶壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T2:洗茶杯...");
        TimeUnit.SECONDS.sleep(2);

        System.out.println("T2:拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return "龙井";
    }
}

基于CompletableFuture实现

public class CompletableFutureTest {

    public static void main(String[] args) {

        //任务1:洗水壶->烧开水
        CompletableFuture<Void> f1 = CompletableFuture
            .runAsync(() -> {
                System.out.println("T1:洗水壶...");
                sleep(1, TimeUnit.SECONDS);

                System.out.println("T1:烧开水...");
                sleep(15, TimeUnit.SECONDS);
            });
        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<String> f2 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("T2:洗茶壶...");
                sleep(1, TimeUnit.SECONDS);

                System.out.println("T2:洗茶杯...");
                sleep(2, TimeUnit.SECONDS);

                System.out.println("T2:拿茶叶...");
                sleep(1, TimeUnit.SECONDS);
                return "龙井";
            });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶叶:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
        //等待任务3执行结果
        System.out.println(f3.join());
    }

    static void sleep(int t, TimeUnit u){
        try {
            u.sleep(t);
        } catch (InterruptedException e) {
        }
    }
}

WorkerWrapper

零售的一个并行框架,目前在京东app后台运行。

并行场景可能存在的需求之——每个执行结果的回调

传统的Future、CompleteableFuture一定程度上可以完成任务编排,并可以把结果传递到下一个任务。如CompletableFuture有then方法,但是却无法做到对每一个执行单元的回调。譬如A执行完毕成功了,后面是B,我希望A在执行完后就有个回调结果,方便我监控当前的执行状况,或者打个日志什么的。失败了,我也可以记录个异常信息什么的。

此时,CompleteableFuture就无能为力了。

并行场景可能存在的需求之——执行顺序的强依赖和弱依赖

有些场景下,我们希望B和C都执行完毕后,才能执行A,CompletableFuture里有个allOf(futures...).then()方法可以做到。

有些场景下,我们希望B或者C任何一个执行完毕,就执行A,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

此框架同样提供了类似的功能,通过设定wrapper里的addDepend依赖时,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

并发场景可能存在的需求之——依赖上游的执行结果作为入参

譬如A-B-C三个执行单元,A的入参是String,出参是int,B呢它需要用A的结果作为自己的入参。也就是说A、B并不是独立的,而是有结果依赖关系的。

在A执行完毕之前,B是取不到结果的,只是知道A的结果类型。

那么,此框架也支持这样的场景。可以在编排时,就取A的结果包装类,作为B的入参。虽然此时尚未执行,必然是空,但可以保证A执行完毕后,B的入参会被赋值。

在V1.3后,框架支持在worker的action的入参Map<String, WorkerWrapper>中获取任意一个执行单元的执行结果,当然,可以取其中的1个、多个执行结果作为自己的入参。Key就是在定义wrapper时通过id传进来的唯一id标识。

开始:

 <dependency>
    <groupId>com.jd.platform</groupId>
    <artifactId>asyncTool</artifactId>
    <version>1.4.1-SNAPSHOT</version>
 </dependency>

基本组件:

worker:

一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。

T,V两个泛型,分别是入参和出参类型。

譬如该耗时操作,入参是String,执行完毕的结果是Integer,那么就可以用泛型来定义。

多个不同的worker之间,没有关联,分别可以有不同的入参、出参类型。

/**
 * 每个最小执行单元需要实现该接口
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface IWorker<T, V> {
    /**
     * 在这里做耗时操作,如rpc请求、IO等
     *
     * @param object
     *         object
     */
    V action(T object, Map<String, WorkerWrapper> allWrappers);

    /**
     * 超时、异常时,返回的默认值
     * @return 默认值
     */
    V defaultValue();
}

callBack:

对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。

/**
 * 每个执行单元执行完毕后,会回调该接口</p>
 * 需要监听执行结果的,实现该接口即可
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface ICallback<T, V> {

    void begin();

    /**
     * 耗时操作执行完毕后,就给value注入值
     *
     */
    void result(boolean success, T param, WorkResult<V> workResult);
}

示例一:

public class ParWorker implements ICallback<String, String>, IWorker<String, String> {

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, String param, WorkResult<String> workResult) {
        if (success) {
            System.out.println("callback parWorker success--" + SystemClock.now() + "----" + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        } else {
            System.err.println("callback parWorker failure--" + SystemClock.now() + "----"  + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        }
    }


    @Override
    public String action(String object, Map<String, WorkerWrapper> allWrappers) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param = " + object + " from parWorker";
    }

    @Override
    public String defaultValue() {
        return "parWorker没有数据啦";
    }
}

public class ParWorker1 implements ICallback<String, String>, IWorker<String, String> {

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, String param, WorkResult<String> workResult) {
        if (success) {
            System.out.println("callback parWorker1 success--" + SystemClock.now() + "----" + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        } else {
            System.err.println("callback parWorker1 failure--" + SystemClock.now() + "----"  + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        }
    }


    @Override
    public String action(String object, Map<String, WorkerWrapper> allWrappers) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param = " + object + " from parWorker1";
    }

    @Override
    public String defaultValue() {
        return "parWorker1没有数据啦";
    }
}
public class ParWorker2 implements ICallback<String, String>, IWorker<String, String> {

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, String param, WorkResult<String> workResult) {
        if (success) {
            System.out.println("callback parWorker2 success--" + SystemClock.now() + "----" + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        } else {
            System.err.println("callback parWorker2 failure--" + SystemClock.now() + "----"  + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        }
    }


    @Override
    public String action(String object, Map<String, WorkerWrapper> allWrappers) {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param = " + object + " from parWorker2";
    }

    @Override
    public String defaultValue() {
        return "parWorker2没有数据啦";
    }
}
        ParWorker parWorker = new ParWorker();
        ParWorker1 parWorker1 = new ParWorker1();
        ParWorker2 parWorker2 = new ParWorker2();
        WorkerWrapper<String, String> workerWrapper =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker)
                .callback(parWorker)
                .param("0")
                .build();
        WorkerWrapper<String, String> workerWrapper1 =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker1)
                .callback(parWorker1)
                .param("1")
                .build();
        WorkerWrapper<String, String> workerWrapper2 =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker2)
                .callback(parWorker2)
                .param("2")
                .build();
        long now = SystemClock.now();
        System.out.println("begin-" + now);

        Async.beginWork(1500, workerWrapper, workerWrapper1, workerWrapper2);

        System.out.println("end-" + SystemClock.now());
        System.err.println("cost-" + (SystemClock.now() - now));
        System.out.println(Async.getThreadCount());

        System.out.println(workerWrapper.getWorkResult());
        System.out.println(workerWrapper1.getWorkResult());
        System.out.println(workerWrapper2.getWorkResult());
        Async.shutDown();

输出结果:

begin-1672122289597
pool-1-thread-1- start --1672122289643
pool-1-thread-3- start --1672122289643
pool-1-thread-2- start --1672122289643
callback parWorker success--1672122290659----result = 1672122290659---param = 0 from parWorker-threadName:pool-1-thread-1
callback parWorker1 failure--1672122291155----parWorker1没有数据啦-threadName:main
end-1672122291155
callback parWorker2 failure--1672122291155----parWorker2没有数据啦-threadName:main
activeCount=2  completedCount 1  largestCount 3
cost-1558
WorkResult{result=result = 1672122290659---param = 0 from parWorker, resultState=SUCCESS, ex=null}
WorkResult{result=parWorker1没有数据啦, resultState=TIMEOUT, ex=null}
WorkResult{result=parWorker2没有数据啦, resultState=TIMEOUT, ex=null}

示例二:

public class ParWorker3 implements ICallback<String, String>, IWorker<String, String> {

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, String param, WorkResult<String> workResult) {
        if (success) {
            System.out.println("callback parWorker3 success--" + SystemClock.now() + "----" + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        } else {
            System.err.println("callback parWorker3 failure--" + SystemClock.now() + "----"  + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        }
    }


    @Override
    public String action(String object, Map<String, WorkerWrapper> allWrappers) {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param = " + object + " from parWorker3";
    }

    @Override
    public String defaultValue() {
        return "parWorker3没有数据啦";
    }
}
        ParWorker parWorker = new ParWorker();
        ParWorker1 parWorker1 = new ParWorker1();
        ParWorker2 parWorker2 = new ParWorker2();
        ParWorker3 parWorker3 = new ParWorker3();
        WorkerWrapper<String, String> workerWrapper3 =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker3)
                .callback(parWorker3)
                .param("3")
                .build();
        WorkerWrapper<String, String> workerWrapper2 =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker2)
                .callback(parWorker2)
                .param("2")
                .next(workerWrapper3)
                .build();
        WorkerWrapper<String, String> workerWrapper1 =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker1)
                .callback(parWorker1)
                .param("1")
                .next(workerWrapper3)
                .build();
        WorkerWrapper<String, String> workerWrapper =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker)
                .callback(parWorker)
                .param("0")
                .next(workerWrapper1,workerWrapper2)
                .build();
        long now = SystemClock.now();
        System.out.println("begin-" + now);

        Async.beginWork(3100, workerWrapper);

        System.out.println("end-" + SystemClock.now());
        System.err.println("cost-" + (SystemClock.now() - now));

        System.out.println(Async.getThreadCount());
        Async.shutDown();

输出结果:

begin-1672143320339
pool-1-thread-1- start --1672143320381
callback parWorker success--1672143321390----result = 1672143321390---param = 0 from parWorker-threadName:pool-1-thread-1
pool-1-thread-2- start --1672143321392
pool-1-thread-3- start --1672143321392
callback parWorker1 success--1672143323394----result = 1672143323394---param = 1 from parWorker1-threadName:pool-1-thread-2
callback parWorker3 failure--1672143323488----parWorker3没有数据啦-threadName:main
end-1672143323488
callback parWorker2 failure--1672143323488----parWorker2没有数据啦-threadName:main
cost-3149
activeCount=2  completedCount 1  largestCount 3

示例三:

       WorkerWrapper<String, String> workerWrapper =  new WorkerWrapper.Builder<String, String>()
                .worker(parWorker)
                .callback(parWorker)
                .param("0")
                .next(workerWrapper1,workerWrapper2)
                .id("workerWrapper")
                .build();
    @Override
    public String action(String object, Map<String, WorkerWrapper> allWrappers) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param + 前置结果= " + object + "+"+ allWrappers.get("workerWrapper").getId()+" from parWorker1";
    }

输出结果:

begin-1672144055826
pool-1-thread-1- start --1672144055869
callback parWorker success--1672144056875----result = 1672144056875---param = 0 from parWorker-threadName:pool-1-thread-1
pool-1-thread-2- start --1672144056877
pool-1-thread-3- start --1672144056877
callback parWorker1 success--1672144058886----result = 1672144058886---param + 前置结果= 1+workerWrapper from parWorker1-threadName:pool-1-thread-2
callback parWorker3 failure--1672144058979----parWorker3没有数据啦-threadName:main
callback parWorker2 failure--1672144058979----parWorker2没有数据啦-threadName:main
end-1672144058979
cost-3153
activeCount=2  completedCount 1  largestCount 3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/148279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

谷粒商城学习笔记

docker 安装docker docker官方centos镜像下载地址&#xff1a;https://docs.docker.com/engine/install/centos/ 步骤&#xff1a; 先卸载&#xff0c;如果不是root用户在前边加上sudo sudo yum remove docker \docker-client \docker-client-latest \docker-common \docke…

C 程序设计教程(05)—— C 语言的数据类型(三):指针类型

C 程序设计教程&#xff08;05&#xff09;—— C 语言的数据类型&#xff08;三&#xff09;&#xff1a;指针类型 该专栏主要介绍 C 语言的基本语法&#xff0c;作为《程序设计语言》课程的课件与参考资料&#xff0c;用于《程序设计语言》课程的教学&#xff0c;供入门级用…

MySql中json类型数据的查询以及在MyBatis-Plus中的使用

表结构和初始数据 新建表结构 CREATE TABLE json_test (id int NOT NULL AUTO_INCREMENT,roles json DEFAULT NULL COMMENT 角色,project json DEFAULT NULL COMMENT 项目,PRIMARY KEY (id) ) ENGINEInnoDB;初始数据 INSERT INTO ctts_dev.json_test(id, roles, project) VALU…

SpringBoot 整合 xxl-job

文章目录部署 xxl-jobSpringBoot 配置maven 配置application.yaml配置 XxlJobConfigXxlJobSpringExecutor新建执行任务配置 xxl-job-admin执行器管理任务管理部署 xxl-job K8S 部署 xxl-job 参考文档&#xff1a;https://blog.csdn.net/weixin_42555971/article/details/12489…

【Web开发】Python实现Web服务器(Docker下部署Flask)

&#x1f37a;基于Python的Web服务器系列相关文章编写如下&#x1f37a;&#xff1a; &#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask快速入门&#xff09;&#x1f388;&#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask案例测试&#xff09;&a…

分享112个PHP源码,总有一款适合您

PHP源码 分享112个PHP源码&#xff0c;总有一款适合您 链接&#xff1a;https://pan.baidu.com/s/1MaBtjYZk08o0eJT5_E79aQ?pwduldm 提取码&#xff1a;uldm 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c;大家下载…

实战丨从0到1搭建结算平台

一、概述我们最开始分享了O2O电商支付清结算体系&#xff0c;接着分享了如何从0-1搭建计费体系&#xff0c;接下来我们分享&#xff1a;各方的钱算完之后怎么付出去&#xff0c;也即结算平台建设的实操与设计思路。1.什么是结算&#xff1f;说结算平台之前&#xff0c;先说一下…

22年 | 年前总结 | 主业谋生存,副业谋发展

22年关键词 复盘 | 极简 | 长期主义 | 阅读 | 斜杠青年 | 一事无成 | … 当然了&#xff0c;2023也会继续延续某些关键词。 一壶清酒&#xff0c;敬这红尘也敬我 很多人都在说&#xff0c;疫情存在的时间比任何一段恋情还要长。 而我想说&#xff0c;我失败的次数还超过了做…

Java设计模式中策略模式是怎么回事/怎么替代繁琐if-else语句/如何优化条件选择语句

继续整理记录这段时间来的收获&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 6.3 策略模式 6.3.1 概述 定义了一系列算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以相互替代&#xff0c;且算法的变化不会影响使用算法的用户属…

【自学Python】Python注释

文章来源嗨客网&#xff08;www.haicoder.net&#xff09; Python注释 Python注释教程 用于注解说明解释程序的文字就是注释&#xff0c;注释提高了代码的阅读性。一旦程序中某部分内容被注释掉&#xff0c;则该内容将会被 Python 解释器忽略&#xff0c;换句话说&#xff0c…

一般颜色直方图

颜色直方图是一种用于图像处理和分析的图表&#xff0c;它可以显示图像中不同颜色的数量。通常&#xff0c;颜色直方图会将颜色分成几个色调区间&#xff0c;每个区间对应一个条形图&#xff0c;其中条形图的高度表示该色调区间中的像素数量。通过颜色直方图&#xff0c;你可以…

Mask RCNN网络源码解读(Ⅰ) --- 语义分割前言与转置卷积

目录 1.什么是语义分割 2.语义分割常见的数据集格式 3.常见的语义分割评价指标 4.转置卷积 1.什么是语义分割 常见分割任务&#xff1a;语义分割、实例分割、全景分割 图一 原始图片图二 语义分割图三 实例分割语义分割&#xff08;例如FCN网络&#xff09;可以理解为一个…

开发与项目经理之间的打情骂俏——数据库篇

&#x1f466;&#x1f466;一个帅气的boy&#xff0c;你可以叫我Love And Program &#x1f5b1; ⌨个人主页&#xff1a;Love And Program的个人主页 &#x1f496;&#x1f496;如果对你有帮助的话希望三连&#x1f4a8;&#x1f4a8;支持一下博主 由数据库引发的一系列探…

小论文写作指南(AI类)

参考b站沃恩智慧课程 论文结构 标题:不要太长或太短,抓住重点,简明扼要。 作者:你的作品一定力争一作,通讯作者是导师/大老板/出资人。 摘要Abstract:点明大背景(如为什么研究微表情识别,对社会有什么价值,拔高立意层次),阐述目标(我们提出模型为了在什么问题上达…

9个时间序列交叉验证方法的介绍和对比

评估性能对预测模型的开发至关重要。交叉验证是一种流行的技术。但是在处理时间序列时&#xff0c;应该确保交叉验证处理了数据的时间依赖性质。在之前的文章中&#xff0c;我们也做过相应的介绍。 在本文中&#xff0c;我们收集了时间序列的常用的9种交叉验证方法。这些包括样…

【博客578】LVS NAT配合MASQUERADE实现FULLNAT的场景,及此场景下net.ipv4.vs.conntrack参数的重要作用

LVS NAT配合MASQUERADE实现FULLNAT的场景&#xff0c;及此场景下net.ipv4.vs.conntrack参数的重要作用 1、LVS基本原理&#xff1a; 流程&#xff1a; 当用户向负载均衡调度器&#xff08;Director Server&#xff09;发起请求&#xff0c;调度器将请求发往至内核空间 PREROU…

第6章 线程通信

6.2.1 管道 管道是一个线性字节数组,类似文件,使用文件读写进行访问&#xff1b;在程序里面,创建管道需要使用popen()或者pipe(); 管道的一个重要特点是使用管道的两个线程之间必须存在某种关系, 例如,使用popen需要提供另一端进程的文件名,使用pipe的两个线程分别隶属于父子进…

Linux常用命令——fgrep命令

在线Linux命令查询工具 fgrep 为文件搜索文字字符串 补充说明 fgrep命令是用来搜索 file 参数指定的输入文件&#xff08;缺省为标准输入&#xff09;中的匹配模式的行。fgrep 命令特别搜索 Pattern 参数&#xff0c;它们是固定的字符串。如果在 File 参数中指定一个以上的…

dp刷题(三)编辑距离(Hard)

编辑距离_牛客题霸_牛客网 描述 给定两个单词word1和word2&#xff0c;请计算将word1转换为word2至少需要多少步操作。 你可以对一个单词执行以下3种操作&#xff1a; a&#xff09;在单词中插入一个字符 b&#xff09;删除单词中的一个字符 c&#xff09;替换单词中的一个字…

C#,图像二值化(14)——全局阈值的最佳迭代算法(Iterate Thresholding)及源代码

1、图像二值化 图像二值化是将彩色图像转换为黑白图像。大多数计算机视觉应用程序将图片转换为二进制表示。图像越是未经处理&#xff0c;计算机就越容易解释其基本特征。 二值化过程 在计算机存储器中&#xff0c;所有文件通常以灰度级的形式存储&#xff0c;灰度级具有从0…