从零开始搭建AI网站(6):如何使用响应式编程

news2024/12/25 9:09:40

响应式编程(Reactive Programming)是一种编程范式,旨在处理异步数据流和事件流。它通过使用观察者模式和函数式编程的概念,将数据流和事件流抽象为可观察的序列,然后通过操作这些序列来实现各种功能。

在响应式编程中,数据流和事件流被视为连续的时间序列,可以通过操作符来转换、过滤和组合它们。这种编程范式的主要优势是它可以简化异步编程,并提供一种声明式的方式来处理数据流和事件流。它还可以提高代码的可读性和可维护性,因为它将复杂的异步逻辑封装在操作符中,使得代码更易于理解和修改。

响应式编程可以应用于各种领域,包括前端开发、后端开发、移动开发等。在前端开发中,响应式编程可以用于处理用户界面的事件流和数据流,使得界面能够动态地响应用户的操作。在后端开发中,响应式编程可以用于处理大量的异步请求和数据流,提高系统的吞吐量和响应速度。

常见的响应式编程框架包括RxJava、RxJS、ReactiveX等。这些框架提供了一系列的操作符和工具,用于处理数据流和事件流,并提供了一种简洁而强大的方式来处理异步编程。

当前响应式编程的典型例子莫过于最近炙手可热的ChatGPT的流式输出了。因为ChatGPT请求响应时间较长,如果采用传统的一直等待全部数据就绪,用户恐怕早就跑光了,而响应式方式则不需要等待所有数据就绪,而只需要有部分数据就绪即可输出,从而极大地提升了用户体验。下面以此为例,来说明实现这种效果的原理(开发语言Java)。

先来看看上文中提到的的三个响应式编程框架:RxJava、RxJS和ReactiveX。它们是三个相关的概念,同时也是不同平台上的实现。

  1. RxJava:RxJava是ReactiveX在Java平台上的实现,它提供了一套丰富的API和操作符,用于处理异步和事件驱动的编程。RxJava是基于观察者模式和迭代器模式的,可以用于处理数据流、事件流和异步任务等。
  2. RxJS:RxJS是ReactiveX在JavaScript平台上的实现,它提供了类似于RxJava的API和操作符,用于处理异步和事件驱动的编程。RxJS可以在浏览器端和Node.js环境中使用,可以处理DOM事件、AJAX请求、定时器等。
  3. ReactiveX:ReactiveX是一个跨平台的响应式编程库,它提供了一套统一的API和操作符,用于处理异步和事件驱动的编程。ReactiveX的目标是提供一种通用的编程模型,使得开发者可以在不同的平台和语言中共享代码和思想。

在Springboot中,另有WebFlux模块可供使用,同时它也可以跟上面的模块一起使用。说起Flux,这里也会涉及到另一个概念:Flowable。其实Flowable和Flux都是响应式流的实现,它们有以下关系:

  1. Flowable是RxJava的一部分,而Flux是Reactor的一部分。RxJava是一个用于Java的响应式编程库,而Reactor是一个用于Java的响应式编程框架。
  2. Flowable是RxJava中的一个类,它实现了Reactive-Streams规范,提供了对背压(backpressure)的支持。Flowable可以处理异步和并发的数据流,并且可以控制数据流的速率,以避免生产者和消费者之间的不匹配。
  3. Flux是Reactor中的一个类,它也实现了Reactive-Streams规范,提供了类似的功能。Flux可以处理异步和并发的数据流,并且可以控制数据流的速率。
  4. Flowable和Flux都提供了一系列的操作符,可以对数据流进行转换、过滤、映射等操作。这些操作符可以帮助开发者处理和操作数据流,使代码更加简洁和可读。

跟tRxJava和Reactor密切相关的开发库之一是WebClien。WebClient是一个用于发送HTTP请求的非阻塞的响应式客户端,它是Reactor项目的一部分。

WebClient提供了一种简洁、灵活和可组合的方式来发送HTTP请求,并处理响应。它可以与RxJava和Reactor的异步和响应式编程模型无缝集成,使得在响应式应用程序中处理HTTP请求变得更加方便和高效。

WebClient可以与RxJava的Flowable一起使用,通过toFlowable()方法将响应转换为Flowable流,从而实现对响应的处理和操作。

WebClient webClient = WebClient.create();
Flowable<String> response = webClient.get()
        .uri("https://example.com")
        .retrieve()
        .bodyToFlux(String.class)
        .toFlowable();

同样,WebClient也可以与Reactor的Flux一起使用,通过bodyToFlux()方法将响应转换为Flux流,从而实现对响应的处理和操作。

WebClient webClient = WebClient.create();
Flux<String> response = webClient.get()
        .uri("https://example.com")
        .retrieve()
        .bodyToFlux(String.class);

下面我们将关注点放在Reactor框架中,在Reactor中,不得不提另一个跟Flux相对的概念:Mono。Flux和Mono是Reactor框架中的两个关键类,它们都是用于处理响应式流的。

  1. Flux是一个表示0到N个元素的响应式流。它可以发出多个元素,并以异步的方式产生这些元素。Flux可以用于处理多个值的数据流,例如从数据库查询结果、文件读取等。
  2. Mono是一个表示0或1个元素的响应式流。它要么发出一个元素,要么不发出任何元素。Mono可以用于处理单个值的数据流,例如从缓存中获取数据、获取单个实体等。
  3. Flux和Mono之间有以下关系:
  • Flux可以被转换成Mono。
Flux<Integer> flux = Flux.just(1, 2, 3);
Mono<Integer> mono = flux.single();
    • Mono可以被转换成Flux。
Mono<Integer> mono = Mono.just(1);
Flux<Integer> flux = mono.flux();

Flux和Mono可以通过一系列的操作符进行转换、过滤、映射等操作,使得对响应式流的处理变得更加灵活和方便。它们是Reactor框架中的核心类,用于构建响应式应用程序。

webClient可以实现复杂的处理逻辑,比如异常处理:

webClient.get()
         .uri(url)
         .retrieve()
         .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomException("客户端错误")))
         .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new CustomException("服务器错误")))
         .bodyToMono(String.class)
         .onErrorResume(throwable -> {
             if (throwable instanceof WebClientResponseException) {
                 WebClientResponseException ex = (WebClientResponseException) throwable;
                 // 处理响应异常
             } else {
                 // 处理其他异常
             }
         });

在使用 Spring Boot 的 WebClient 时,bodyToMono 和 bodyToFlux 方法都可以用于将响应体转换为 Mono 或 Flux 对象。

bodyToMono 方法用于将响应体转换为 Mono 对象,适用于响应体只有一个元素的情况,例如返回一个 JSON 对象或者一个字符串。

bodyToFlux 方法用于将响应体转换为 Flux 对象,适用于响应体有多个元素的情况,例如返回一个 JSON 数组或者一个流式数据。

因此,当我们需要处理的响应体只有一个元素时,应该使用 bodyToMono 方法;当我们需要处理的响应体有多个元素时,应该使用 bodyToFlux 方法。

在 Reactor 中,Flux 流结束的实现原理是通过发送一个 onComplete 信号来通知订阅者流已经结束。当 Flux 流中的所有元素都被消费完毕时,会自动发送一个 onComplete 信号。

例如,当我们使用 Flux.range(1, 10) 创建一个包含 1 到 10 的整数序列的 Flux 流时,当订阅者订阅该流并消费完所有元素后,会自动发送一个 onComplete 信号来通知订阅者流已经结束。

在使用 Spring Boot 的 WebClient 时,当我们使用 bodyToFlux 方法将响应体转换为 Flux 对象时,如果响应体是一个流式数据,那么当流式数据传输完毕后,会自动发送一个 onComplete 信号来通知订阅者流已经结束。

webClient.get()
         .uri(url)
         .retrieve()
         .bodyToFlux(String.class)
         .doFinally(signalType -> {
             if (signalType == SignalType.ON_COMPLETE) {
                 System.out.println("流已结束");
             }
         })
         .subscribe();

有了这些基础知识的准备,我们再来看看ChatGPT的响应结果样例。OpenAI的聊天接口是:

http://api.openai.com/v1/chat/completitions。

该接口接受这样的一个请求数据结构:ChatCompletionRequest。其中有个属性stream 可以设定是否采用流输出。默认false。

这个例子是非stream输出,输出格式为:ChatCompletionResponse

$ curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"false","messages": [{"role": "user", "content": "Hello!"}]}'
{"id":"chatcmpl-7tywVQ4vSPzs8yuZy5FqvL0CX07W0","object":"chat.completion","created":1693576659,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"message":{"role":"assistant","content":"Hello

这个例子是stream输出,输出结构体为:字符串格式的ChatCompletionResponse:

curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"true","messages": [{"role": "user", "content": "Hello!"}]}'

比较stream和非stream的输出区别,有一下几点:

1.非stream 输出只有一条记录;stream 有若干条,取决于响应内容大小;

2. 非stream 输出包含消耗的tokens数量,stream 没有;

3. 非stream 输出结果是json格式的ChatCompletionResponse结构,stream 输出j格式类似:data:str(ChatCompletionResponse),同时以data:[NONE]结尾;

结合上面的知识,我们就能实现上述功能:

public Publisher<String> generateChatCompletion(ChatCompletionRequest chatCompletionRequest) {
        WebClient.ResponseSpec responseSpec = webClient.post()
                .uri(this.apiUrl + "/chat/completions").header("Authorization", "Bearer " + this.apiKey)//                .accept(MediaType.TEXT_EVENT_STREAM)                
                .bodyValue(chatCompletionRequest)
                .retrieve();
        if (chatCompletionRequest.getStream())
            return                    
            responseSpec.bodyToFlux(ChatCompletionResponse.class)
                            .onErrorResume(error -> {
                                // 异常处理逻辑 
                                logger.error("bodyToFlux error: {}", error);
                                return Flux.empty();
                            })
                            .filter(response -> {
                                ChatMessage message = response.getChoices().get(0).getMessage();
                                if (message != null) {
                                    String content = message.getContent();
                                    return StringUtils.isNotEmpty(StringUtils.trim(content));
                                }
                                return false;
                            })
                            .mapNotNull(response -> {
                                try {
                                    return objectMapper.writeValueAsString(response);
                                } catch (JsonProcessingException e) {
                                    logger.error(e);
                                    return null;
                                }
                            }).concatWithValues("[DONE]");
        else            
        return     
        responseSpec.bodyToMono(ChatCompletionResponse.class)
                            .onErrorResume(error -> {
                                // 异常处理逻辑   
                                logger.error("bodyToMono error: {}", error);
                                return Mono.empty();
                            }).mapNotNull(response -> {
                                try {
                                    return objectMapper.writeValueAsString(response);
                                } catch (JsonProcessingException e) {
                                    logger.error(e);
                                    return null;
                                }
                            });
    }

Publisher是一个通用的概念,它代表一个发布者,可以发布数据或事件。在Spring WebFlux中,Flux和Mono都是Publisher的实现类。

试用地址:https://chatgpt-discount.zeabur.app

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/962817.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NetSuite as OIDC Provider 演示

书接上回。上次谈了借助第三方身份认证服务SSO登录NetSuite。 NetSuite OIDC、SAML SSO 演示_NetSuite知识会的博客-CSDN博客NetSuite的SSO的策略&#xff1a;第三方的身份认证服务商NetSuite as OIDC Provider。本文演示前者。https://blog.csdn.net/remottshanghai/article/…

“智越”界限,SSOT上海国际智慧办公展览会来啦

随着人工智能、大数据、云计算等技术的快速发展&#xff0c;目前物联网应用已经从概念踏进规模部署的阶段&#xff0c;场景化和规模化已成常态。传统办公室作为企业行政运营的核心场景&#xff0c;也开始受到“科技办公“移动办公”“共享办公”等非传“智慧办公”概念的影响不…

SpringCloudAlibaba Gateway(二)详解-内置Predicate、Filter及自定义Predicate、Filter

Predicate(断言) ​ Predicate(断言)&#xff0c;用于进行判断&#xff0c;如果返回为真&#xff0c;才会路由到具体服务。SpirnngCloudGateway由路由断言工厂实现&#xff0c;直接配置即生效&#xff0c;当然也支持自定义路由断言工厂。 内置路由断言工厂实现 ​ SpringClo…

电子学会 2023年3月 青少年软件编程Python编程等级考试三级真题解析(选择题+判断题+编程题)

青少年编程Python编程等级考试三级真题解析(选择题+判断题+编程题) 2023年3月 一、选择题(共25题,共50分) 十进制数111转换成二进制数是?( ) A. 111 B. 1111011 C. 101111 D. 1101111 答案选:D 考点分析:考察python 进制转换 十进制转二进制,采用除二倒取余数,直到商…

虚拟机ping不通和在虚拟机里不能ping通百度时

一、出现的问题 1.finalshell连接虚拟机超时 2.ping: www.baidu.com: 未知的名称或服务 3.使用cmd无法ping通虚拟机的地址 二、解决方法 查看虚拟机设置 1.选择自定义VMnet8(NAT模式) 2.点击虚拟机虚拟网络编辑器 虚拟机的ip地址必须是192.168.123.xxx 如果不是 例如ip地…

堆对象数组

C自学精简教程 目录(必读) 之前我们学习了基础类型的堆数组 现在我们来看堆数组的元素是类对象的场景 堆对象数组 堆对象数组的每一个元素都是一个类对象。 使用堆对象数组的语法和使用堆数组的语法是一样的。 #include <iostream> #include <string> using …

Sui流动性质押黑客松|那些或许你并不知的SUI质押硬核知识

Sui流动性质押黑客松正在如火如荼的报名&#xff08;9月16日截止&#xff09;&#xff0c;Sui基金会诚邀全球开发者前来参与&#xff0c;助力资产再流通。了解黑客松详情&#xff1a;Sui流动性质押黑客松开启报名&#xff0c;赢取千万美金质押和奖励&#xff01; 黑客松官方网站…

司徒理财:9.1黄金非农前多空如何布局?操作策略

黄金行情走势分析&#xff1a;      昨日黄金的走势和收线变化非常重要&#xff0c;短线内也继续维持于1940上方整理&#xff0c;而日内表现平淡&#xff0c;毫无进军之意&#xff0c;这或许是跟今日非农有关&#xff0c;而本周总体而言&#xff0c;虽然有所突破&#xff0…

多目标应用:基于多目标哈里斯鹰优化算法(MOHHO)的微电网多目标优化调度研究MATLAB

一、微网系统运行优化模型 参考文献&#xff1a; [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、多目标哈里斯鹰优化算法MOHHO 多目标哈里斯鹰优化算法&#xff08;Multi-Objective Harris Hawks Optimizer&#…

openpyxl: Value must be either numerical or a string containing a wildcard

使用 openpyxl库解析excel表格时遇到如图问题&#xff1a; 后排查在其他电脑上相同的py脚本&#xff0c;相同的excel文件&#xff0c;程序正常; 通过 pip show openyxl 检查发现两者的 openyxl 版本有差异&#xff0c;有问题的是 3.1.2 没问题的是 3.0.10 解决办法&#xff1a…

博弈论基础

简单记录一下博弈论的知识。 博弈的分类&#xff1a; 关键词&#xff1a;正则博弈、扩展博弈、第三类博弈。 市场进入和阻挠博弈&#xff1a; 不完美博弈和不完全博弈&#xff1a; 混合策略和纯策略&#xff1a;

10分钟搞懂敏捷度量

敏捷团队需要选取一些关键指标对生产力、开发流程、产品质量进行度量&#xff0c;从而不断优化开发过程&#xff0c;提升团队效率。原文: Agile Metrics — What Matters and Why? 敏捷已经在几十年内占领了软件开发行业&#xff0c;每个技术组织要么是敏捷的&#xff0c;要么…

【LeetCode题目详解】第九章 动态规划part03 343. 整数拆分 96.不同的二叉搜索树 (day41补)

本文章代码以c为例&#xff01; 一、力扣第343题&#xff1a;整数拆分 题目&#xff1a; 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例 1: 输…

Unity生命周期函数

1、Awake 当对象&#xff08;自己这个类对象&#xff0c;就是这个脚本&#xff09;被创建时 才会调用该生命周期函数 类似构造函数的存在 我们可以在一个类对象创建时进行一些初始化操作 2、OnEnable 失活激活&#xff08;这个勾&#xff09; 想要当一个对象&#xff08;游戏…

Android 1.2 开发环境搭建

目录 1.2 开发环境搭建 1.JDK安装与配置 2.开发工具二选一 3.相关术语的解析 4.ADB命令行的一些指令 5.APP程序打包与安装的流程&#xff1a; 6.APP的安装过程&#xff1a; 7.本节小结 1.2 开发环境搭建 现在主流的Android开发环境有: ①Eclipse ADT SDK ②Android Stu…

虚拟掌控者:快速掌握VMware安装技巧

本章目录 一、VMware是什么&#xff1f;二、下载1、方式一&#xff08;直接下载&#xff09;1.1、点击产品1.2、下滑选择Workstation Pro1.3、下滑选择试用版1.4、选择下载对应的环境Windows | Linux 2、方式二 &#xff08;进行登录下载&#xff09;2.1、进入官方地址&#xf…

自动化运维工具-------Ansible(超详细)

一、Ansible相关 1、简介 Ansible是自动化运维工具&#xff0c;基于Python开发&#xff0c;分布式,无需客户端,轻量级&#xff0c;实现了批量系统配置、批量程序部署、批量运行命令等功能&#xff0c;ansible是基于模块工作的,本身没有批量部署的能力。真正具有批量部署的是a…

Android OTA 相关工具(八) 使用 lpadd 添加镜像到 super.img

文章目录 1. lpadd 的编译2. lpadd 的帮助信息3. lpadd 的用法3.1 准备工作empty 的 super 设备镜像raw 格式的 super 设备镜像sparse 格式的 super 设备镜像 3.1 lpadd 分区操作示例 4. 其它 我一直以为没有人会使用 lpadd 工具&#xff0c;就像我以为没有人会去使用 lpmake 手…

文件夹中lib,dll含义

.dll文件是动态链接库&#xff08;Dynamic Link Library&#xff09;的缩写&#xff0c;它包含了一组可执行的函数和数据&#xff0c;供程序调用。它可以被多个应用程序共享和重用&#xff0c;减少了代码的冗余。通过动态链接库&#xff0c;可以实现代码的模块化和提高代码的复…

希尔贝壳入选“北京市人工智能大模型高质量数据集发布(第二批)”合作企业

8月28日&#xff0c;2023中国国际服务贸易交易会通用人工智能算力论坛在石景山区举办。论坛上&#xff0c;北京市人工智能大模型高质量数据集&#xff08;第二批&#xff09;发布&#xff0c;其中包含北京希尔贝壳科技有限公司的“大模型方言口语语音数据集”和“智能会议场景高…