响应式编程一、Reactor核心

news2024/12/27 6:39:27

目录

  • 一、前置知识
    • 1、Lambda表达式
    • 2、==函数式接口 Function==
    • 3、==StreamAPI==
    • 4、Reactive-Stream
      • 1)几个实际的问题
      • 2)Reactive-Stream是什么?
      • 3)==核心接口==
      • 4)处理器 Processor
      • 5)总结
  • 二、Reactor核心
    • 1、Reactor
      • 1)介绍
      • 2)Reactor的三个核心特性
      • 3)响应式编程

课程内容
在这里插入图片描述

一、前置知识

1、Lambda表达式

interface MyInterface {
    int sum(int i, int j);
}

interface MyHaha {
    int haha();

    default int heihei() {
        return 2;
    }

    ; //默认实现
}

@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
    int hehe(int i);


}

/**
     * lambda简化函数式接口实例创建
     *
     * @param args
     */
    public static void aaaa(String[] args) {

        //1、自己创建实现类对象
        MyInterface myInterface = new MyInterfaceImpl();
        System.out.println(myInterface.sum(1, 2));

        //2、创建匿名实现类
        MyInterface myInterface1 = new MyInterface() {
            @Override
            public int sum(int i, int j) {
                return i * i + j * j;
            }
        };
//        System.out.println(myInterface1.sum(2, 3));
        //冗余写法

        //3、lambda表达式:语法糖  参数列表  + 箭头 + 方法体
        MyInterface myInterface2 = (x, y) -> {
            return x * x + y * y;
        };
        System.out.println(myInterface2.sum(2, 3));
  }

//参数位置最少情况
        MyHaha myHaha = () -> {
            return 1;
        };

        MyHehe myHehe = y -> {
            return y * y;
        };


        MyHehe hehe2 = y -> y - 1;
        //总结:
		//1)、参数类型可以不写,只写(参数名),参数变量名随意定义;
        //    参数表最少可以只有一个 (),或者只有一个参数名;
        //2、方法体如果只有一句话,{} 可以省略

2、函数式接口 Function

接口中有且只有一个未实现的方法,这个接口就叫做函数式接口

函数式接口的出入参定义:
1、有入参,无出参【消费者】BiConsumer

 BiConsumer<String,String> function = (a,b)->{ //能接受两个入参
        System.out.println("哈哈:"+a+";呵呵:"+b);
    };
    function.accept("1","2");

2、有入参,有出参【多功能函数】: Function

Function<String,Integer> function = (String x) -> Integer.parseInt(x);
 System.out.println(function.apply("2"));

3、无入参,无出参【普通函数】:

Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();

4、无入参 ,有出参【提供者】: supplier

Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);

java.util.function包下的所有function定义:
● Consumer: 消费者
● function: 功能函数
● Supplier: 提供者
● Predicate: 断言
get/test/apply/accept调用的函数方法;

3、StreamAPI

中间操作:Intermediate Operations

  • filter:过滤; 挑出我们用的元素
  • map: 映射: 一一映射,a 变成 b
    • mapToInt、mapToLong、mapToDouble
  • flatMap:一对多映射

filter、 map、mapToInt、mapToLong、mapToDouble flatMap、flatMapToInt、flatMapToLong、flatMapToDouble mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、 parallel、unordered、onClose、sequential distinct、sorted、peek、limit、skip、takeWhile、dropWhile、

终止操作:Terminal Operation
forEach、forEachOrdered、toArray、reduce、collect、toList、min、 max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator



4、Reactive-Stream

1)几个实际的问题

当请求量巨大的时候,tomcat会被压垮,此时就需要采取背压的策略,让tomcat根据自己的能力主动去消费请求。
在这里插入图片描述

服务器核心数固定时,多线程情况下:线程越多越好吗?
当核心数固定时,线程并不是越多越好,操作系统是分时间片执行任务的,当线程越多时,线程间的切换就越频繁,导致cpu性能消耗越多。
在这里插入图片描述
在这里插入图片描述

2)Reactive-Stream是什么?

Reactive-Streams 是一个标准规范,定义了异步数据流处理的 API 和行为规则,专注于解决异步流式数据的**背压(Backpressure)**问题。

主要特性

  • 异步:通过非阻塞方式处理数据流。
  • 流式处理:支持连续数据流的逐步消费,避免一次性加载大量数据。
  • 背压机制:允许消费者控制生产者的速率,防止消费者被超量数据淹没。
  • 非阻塞:在处理数据时不阻塞线程,提高资源利用率。

背压机制(Backpressure)
Reactive-Streams 的核心之一是通过 Subscription 提供背压支持。

消费者可以通过 request(n) 方法控制生产者的生产速率。
如果消费者处理能力不足,可以减少请求数据量,避免内存溢出或阻塞。

使用场景
事件流处理:如消息队列、用户事件。
高性能网络请求:如 RESTful API、WebSocket。
大数据流处理:需要逐步消费大规模数据的场景。
异步系统集成:将不同系统间的数据流通过异步方式连接起来。

在这里插入图片描述

3)核心接口

  • Publisher:发布者;产生数据流
  • Subscriber:订阅者; 消费数据流
  • Subscription:订阅关系;
    订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
  • Processor:处理器;
    处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

【扩展】
以前的编程模型是命令式编程: 过程编程,全自定义
流式编程是响应式|声明式编程,说清楚要干什么,最终结果要怎么样

public class MyFlowDemo {
    public static void main(String[] args) {

        // 1、定义一个发布者,发布数据
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();


        //3、定义一个订阅者; 订阅者感兴趣发布者的数据;
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调
            public void onSubscribe(Flow.Subscription subscription) {
                System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);
                this.subscription = subscription;
                //从上游请求一个数据
                subscription.request(1);
            }

            @Override //在下一个元素到达时; 执行这个回调;   接受到新数据
            public void onNext(String item) {
                System.out.println(Thread.currentThread() + "订阅者,接受到数据:" + item);
                //从上游请求一个数据
                subscription.request(1);
            }

            @Override //在错误发生时,
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread() + "订阅者,接受到错误信号:" + throwable);
            }

            @Override //在完成时
            public void onComplete() {
                System.out.println(Thread.currentThread() + "订阅者,接受到完成信号:");
            }
        };

        publisher.subscribe(subscriber);

        for (int i = 0; i < 10; i++) {
            publisher.submit("p-" + i);
        }

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

在这里插入图片描述

4)处理器 Processor

在这里插入图片描述

public class FlowDemo {

    //定义流中间操作处理器; 只用写订阅者的接口
    static class MyProcessor extends SubmissionPublisher<String>  implements Flow.Processor<String,String> {

        private Flow.Subscription subscription; //保存绑定关系
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("processor订阅绑定完成");
            this.subscription = subscription;
            subscription.request(1); //找上游要一个数据
        }

        @Override //数据到达,触发这个回调
        public void onNext(String item) {
            System.out.println("processor拿到数据:"+item);
            //再加工
            item += ":哈哈";
            submit(item);//把我加工后的数据发出去
            subscription.request(1); //再要新数据
        }

        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {

        }
    }

    /**
     * 1、Publisher:发布者
     * 2、Subscriber:订阅者
     * 3、Subscription: 订阅关系
     * 4、Processor: 处理器
     * @param args
     */

    //发布订阅模型:观察者模式,
    public static void main(String[] args) throws InterruptedException {

        //1、定义一个发布者; 发布数据;
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();



        //2、定一个中间操作:  给每个元素加个 哈哈 前缀
        MyProcessor myProcessor1 = new MyProcessor();

        //3、定义一个订阅者; 订阅者感兴趣发布者的数据;
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调
            public void onSubscribe(Flow.Subscription subscription) {
                System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);
                this.subscription = subscription;
                //从上游请求一个数据
                subscription.request(1);
            }

            @Override //在下一个元素到达时; 执行这个回调;   接受到新数据
            public void onNext(String item) {
                System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);

                if(item.equals("p-7")){
                    subscription.cancel(); //取消订阅
                }else {
                    subscription.request(1);
                }
            }

            @Override //在错误发生时,
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);
            }

            @Override //在完成时
            public void onComplete() {
                System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");
            }
        };

        //4、绑定发布者和订阅者
        publisher.subscribe(myProcessor1); //此时处理器相当于订阅者


        myProcessor1.subscribe(subscriber); //此时处理器相当于发布者
        //绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。


//        publisher.subscribe(subscriber);

        for (int i = 0; i < 10; i++) {
            //发布10条数据
            if(i == 5){
//                publisher.closeExceptionally(new RuntimeException("5555"));
            }else {
                publisher.submit("p-"+i);
            }
            //publisher发布的所有数据在它的buffer区;
            //中断
//            publisher.closeExceptionally();
        }



        //ReactiveStream
        //jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;

        //发布者通道关闭
        publisher.close();


//        publisher.subscribe(subscriber2);


        //发布者有数据,订阅者就会拿到
        Thread.sleep(20000);

    }
}

5)总结

在这里插入图片描述

二、Reactor核心

响应式编程:

1、底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
2、编码:流式编程 + 链式调用 + 声明式API
3、效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源

回调机制:类似于SpringBoot的事件机制,在SpringBoot应用的启动过程中触发事件。

1、Reactor

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1)介绍

Reactor 是一个用于在JVM构建非阻塞应用的响应式编程框架 !

2)Reactor的三个核心特性

3)响应式编程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue3之弹窗

文章目录 第一步、引入JS第二步、弹框 在前端开发语言Vue3&#xff0c;在管理端如何进行弹窗&#xff1f;下面根据API实现效果。 Element API文档&#xff1a; Element-plus文档 搭建环境可参考博客【 初探Vue3环境搭建与nvm使用】 第一步、引入JS <script lang"ts&…

w~大模型~合集24

我自己的原文哦~ https://blog.51cto.com/whaosoft/12707697 #Time Travelling Pixels (TTP) 一种名为“时空旅行”&#xff08;TTP&#xff09;的新方法&#xff0c;该方法将SAM基础模型的通用知识整合到变化检测任务中。该方法有效地解决了在通用知识转移中的领域偏移问题…

git的简单使用与gdb

版本控制器git 为了能够更方便管理这些不同版本的文件&#xff0c;有了版本控制器&#xff0c;可以了解一个文件的历史&#xff0c;以及它的发展过程的系统&#xff0c;通俗的说就是一个可以记录工程的每一次改动和版本迭代的一个管理系统&#xff0c;同时也方便多人协作。 三…

从0开始学PHP面向对象内容之常用设计模式(策略,观察者)

PHP设计模式——行为型模式 PHP 设计模式中的行为模式&#xff08;Behavioral Patterns&#xff09;主要关注对象之间的通信和交互。行为模式的目的是在不暴露对象之间的具体通信细节的情况下&#xff0c;定义对象的行为和职责。它们常用于解决对象如何协调工作的问题&#xff…

解决windows下php8.x及以上版本,在Apache2.4中无法加载CURL扩展的问题

本文已首发于&#xff1a;秋码记录 若你也想搭建一个个人博客&#xff0c;可参考&#xff1a;国内 gitee.com Pages 下线了&#xff0c;致使众多站长纷纷改用 github、gitlab Pages 托管平台 在日新月异的信息化下&#xff0c;软件也在跟随着互联网的脚步&#xff0c;逐步推进…

git 常用命令及问题

一、常用命令 git add filename git add . git commit -m "messge" git commit --amend 修改最近一次的提交 git push origin HEAD:refs/for/master git clone url git checkout branchname 切换分支 git branch -r 查看远程仓库分支列表 git branch br…

【Gitlab】gitrunner并发配置

并发介绍 涉及到并发控制的一共有4个参数: concurrent , limit ,request_concurrency,parallel 全局的配置: [rootiZ2vc6igbukkxw6rbl64ljZ config]# vi config.toml concurrent 4 #这是一个总的全局控制&#xff0c;它限制了所有pipline&#xff0c;所有runner执行器…

利用Python爬虫精准获取淘宝商品详情的深度解析

在数字化时代&#xff0c;数据的价值日益凸显&#xff0c;尤其是在电子商务领域。淘宝作为中国最大的电商平台之一&#xff0c;拥有海量的商品数据&#xff0c;对于研究市场趋势、分析消费者行为等具有重要意义。本文将详细介绍如何使用Python编写爬虫程序&#xff0c;精准获取…

NFT Insider #157:The Sandbox 开启新一期 VoxEdit 比赛

市场数据 加密艺术及收藏品新闻 Artnames 项目上线&#xff0c;将用户姓名转化为个性化 NFT 艺术品 由知名数字艺术家 Arrotu 发起的生成艺术项目「Artnames」正式上线&#xff0c;利用区块链技术将用户姓名转化为独一无二的 NFT 艺术品。该项目于 11 月 14 日启动&#xff0…

Mysql数据库基础篇笔记

目录 sql语句 DDL——数据库定义语言&#xff08;定义库&#xff0c;表&#xff0c;字段&#xff09; 数据库操作&#xff1a; 表操作&#xff1a; DML 增删改语句 DQL 语法编写顺序&#xff1a; 条件查询 DCL 用户管理&#xff1a; 权限管理&#xff1a; 函数 常见字符串内置函…

基于单片机的频率测量电路设计

摘 要&#xff1a; 传统的频率测量设备大多因硬件电路庞大&#xff0c;导致设备整体体积相对较大&#xff0c;且测量频率信号的精确度低&#xff0c;测量范围较小&#xff0c;运行速度较慢。 据此&#xff0c;介绍了一种以 AT89C51 单片机为控制核心&#xff0c;由放大整形模块…

解决Ubuntu DNS覆盖写入127.0.0.53

ubuntu22.04解析网址时报错如图所示&#xff1a; 因为/etc/resolve.conf中存在 nameserver 127.0.0.53回环地址造成循环引用 原因&#xff1a; ubuntu17.0之后特有&#xff0c;systemd-resolvd服务会一直覆盖 解决方法&#xff1a; 1、修改resolv.config文件中的nameserver…

IDEA报错: java: JPS incremental annotation processing is disabled 解决

起因 换了个电脑打开了之前某个老项目IDEA启动springcloud其中某个服务直接报错&#xff0c;信息如下 java: JPS incremental annotation processing is disabled. Compilation results on partial recompilation may be inaccurate. Use build process “jps.track.ap.depen…

C++20: 像Python一样逐行读取文本文件并支持切片操作

概要 逐行读取文本文件&#xff0c;并提取其中连续的几行&#xff0c;这对于 Python 来说是小菜一碟。 C 则很笨拙&#xff0c; 语言不自带这些。 这次我来拯救 C boys & girls&#xff0c; 在 C20 环境下&#xff0c;山寨一个 Python 下的逐行读文本文件、支持 slice 操作…

【NLP高频面题 - LLM架构篇】LLM对Transformer都有哪些优化?

【NLP高频面题 - LLM架构篇】LLM对Transformer都有哪些优化&#xff1f; ⚠︎ 重要性&#xff1a;★★★ &#x1f4af; NLP Github 项目&#xff1a; NLP 项目实践&#xff1a;fasterai/nlp-project-practice 介绍&#xff1a;该仓库围绕着 NLP 任务模型的设计、训练、优化、…

电脑还原重置Windows系统不同操作模式

电脑有问题,遇事不决就重启,一切都不是问题!是真的这样吗。其实不然,主机系统重启确实可以自动修复一些文件错误,或者是设置问题,但是,当你由于安装了错误的驱动或者中毒严重,亦或是蓝屏,那么重启这个方子可能就治不了你的电脑了。 那么,除了当主机出现异常故障现象…

深度学习模型: BERT(Bidirectional Encoder Representations from Transformers)详解

一、引言 自然语言处理&#xff08;NLP&#xff09;领域在过去几十年取得了显著的进展。从早期基于规则的方法到统计机器学习方法&#xff0c;再到如今基于深度学习的模型&#xff0c;NLP 不断向着更高的准确性和效率迈进。BERT 的出现为 NLP 带来了新的突破&#xff0c;它能够…

亚马逊开发视频人工智能模型,The Information 报道

根据《The Information》周三的报道&#xff0c;电子商务巨头亚马逊&#xff08;AMZN&#xff09;已开发出一种新的生成式人工智能&#xff08;AI&#xff09;&#xff0c;不仅能处理文本&#xff0c;还能处理图片和视频&#xff0c;从而减少对人工智能初创公司Anthropic的依赖…

LLM学习笔记(13)分词器 tokenizer

由于神经网络模型不能直接处理文本&#xff0c;因此我们需要先将文本转换为数字&#xff0c;这个过程被称为编码 (Encoding)&#xff0c;其包含两个步骤&#xff1a; 使用分词器 (tokenizer) 将文本按词、子词、字符切分为 tokens&#xff1b;将所有的 token 映射到对应的 tok…

通过LabVIEW项目判断开发环境是否正版

在接收或分析他人提供的LabVIEW项目时&#xff0c;判断其开发环境是否为正版软件对于保护知识产权和避免使用非法软件至关重要。本文将详细介绍如何通过项目文件、可执行程序及开发环境信息判断LabVIEW是否为正版。 ​ 1. 从项目文件判断 LabVIEW项目的源码&#xff08;VI 文件…