【Kafka】Kafka Stream简单使用

news2025/4/20 4:28:03

一、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算
在这里插入图片描述
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算

比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐

3. Kafka Stream

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache SamzaApache Storm,以及这些年火爆的 Spark 以及 Flink 等。

3.1 Kafka Streams的特点

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed joinaggregation
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

3.2 关键概念

一个最简单的Streaming的结构如下图所示:
在这里插入图片描述

从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source TopicDestination Topic,那就构成如下图所示的较为复杂的拓扑结构:
在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
    在这里插入图片描述
    Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java就可以实现流式处理。

3.3 KStream

KStream:数据结构类似于map,如下图,key-value键值对

在这里插入图片描述

KStream数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

二、测试kafkaStream

先看下简单的kafkaStreamKStream测试

需求分析:求单词个数(word count)
在这里插入图片描述

1. pom.xml引入依赖:

       <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. 配置文件

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 编写生产者

ProducerQuickStart.java

package com.kafka.sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

@Slf4j
public class ProducerQuickStart {

    public static void main(String[] args) {

        //1. kafka的配置信息
        Properties prop = new Properties();
        //kafka的链接信息
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //配置重试次数
        prop.put(ProducerConfig.RETRIES_CONFIG, 5);
        //数据压缩
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        //ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
//        prop.put(ProducerConfig.ACKS_CONFIG,"all");

        消息key的序列化器
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //2. 生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        //封装发送的消息
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("itcast-topic-input", "key_001", "hello kafka");

        //3. 发送消息
        for (int i = 0; i < 5; i++) {
            producer.send(producerRecord);
        }

        //4. 关闭消息通道  必须关闭,否则消息发不出去
        producer.close();

    }
}

4 编写kafkaStream流式处理

KafkaStreamQuickStart.java

package com.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信心
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);


        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");

    }
}

5. 编写消费者

ConsumerQuickStart.java

package com.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerQuickStart {

    public static void main(String[] args) {

        //1. 添加kafka的配置信息
        Properties properties = new Properties();
        // 配置链接信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
        //配置消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2. 消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3. 订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

        //当前线程一直监听消息
        while(true){
            //4. 消费者拉取消息: 每秒拉取一次
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }

    }
}

启动项目:

  1. 在远端(192.168.200.130:9092)启动docker中的kafka容器
  2. 启动消费者ConsumerQuickStartmain函数
  3. 启动kafkastreammian函数
  4. 启动生产者ProducerQuickStartmain函数

5. 控制台打印结果:

在这里插入图片描述
在这里插入图片描述

整个过程:
生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。

三、Springboot整合kafkaStream

1. 配置文件新增

application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# kafkaStream新增以下配置
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

2. 在微服务中新增配置类

KafkaStreamConfig.java

package com.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

3. 使用kafkaStream监听消息

KafkaStreamHelloListener.java

package com.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

测试:

启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果
在这里插入图片描述

说明kafkaStream接收到消息并将多条消息进行了统一处理。

参考(推荐阅读):

  1. https://cloud.tencent.com/developer/article/2100664
  2. https://www.cnblogs.com/tree1123/p/11457851.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/937569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序 基于Android的美容理发师预约管理系统

&#xff0c;本系统主要根据管理员、用户及理发师的实际需要&#xff0c;方便用户利用互联网实现对商品信息进行立即订购&#xff0c;同时让管理者可以通过这个系统对用户实际需求以及各信息进行管理。设计该系统主要目的是为了方便用户、理发师可以有一个非常好的平台体验&…

Mac下Docker Desktop开启本地远程访问

mac系统下&#xff0c;为了在idea里方便使用docker&#xff0c;需要开启Docker Desktop本地远程访问。 开启方法是在设置-高级下&#xff0c;开启“Allow the default Docker socket to be used (requires password)”&#xff0c;特此记录一下&#xff1a; 开启后的效果&…

iOS swift5 扫描二维码

文章目录 1.生成二维码图片2.扫描二维码&#xff08;含上下扫描动画&#xff09;2.1 记得在info.plist中添加相机权限描述 1.生成二维码图片 import UIKit import CoreImagefunc generateQRCode(from string: String) -> UIImage? {let data string.data(using: String.En…

计算机视觉:深层卷积神经网络的构建

本文重点 上一节课程中我们学习了单卷积层的前向传播,本次课程我们构建一个具有三个卷积层的卷积神经网络,然后从输入(39*39*3)开始进行三次卷积操作,我们来看一下每次卷积的输入和输出维度的变化。 第一层 第一层使用3*3*3的过滤器来提取特征,那么f[1]=3,然后步长s[…

4. 池化层相关概念

4.1 池化层原理 ① 最大池化层有时也被称为下采样。 ② dilation为空洞卷积&#xff0c;如下图所示。 ③ Ceil_model为当超出区域时&#xff0c;只取最左上角的值。 ④ 池化使得数据由5 * 5 变为3 * 3,甚至1 * 1的&#xff0c;这样导致计算的参数会大大减小。例如1080P的电…

PHP8的匿名函数-PHP8知识详解

php 8引入了匿名函数&#xff08;Anonymous Functions&#xff09;&#xff0c;它是一种创建短生命周期的函数&#xff0c;不需要命名&#xff0c;并且可以在其作用域内直接使用。以下是在PHP 8中使用匿名函数的知识要点&#xff1a; 1、创建匿名函数&#xff0c;语法格式如下&…

7.react useReducer使用与常见问题

useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…

postgresql-日期函数

postgresql-日期函数 日期时间函数计算时间间隔获取时间中的信息截断日期/时间创建日期/时间获取系统时间CURRENT_DATE当前事务开始时间 时区转换 日期时间函数 PostgreSQL 提供了以下日期和时间运算的算术运算符。 计算时间间隔 age(timestamp, timestamp)函数用于计算两…

Uniapp笔记(五)uniapp语法4

本章目标 授权登录【难点、重点】 条件编译【理解】 小程序分包【理解】 一、授权登录 我的模块其实是两个组件&#xff0c;一个是登录组件&#xff0c;一个是用户信息组件&#xff0c;根据用户的登录状态判断是否要显示那个组件 1、登录的基本布局 <template><…

LLMs NLP模型评估Model evaluation ROUGE and BLEU SCORE

在整个课程中&#xff0c;你看到过类似模型在这个任务上表现良好&#xff0c;或者这个微调模型在性能上相对于基础模型有显著提升等陈述。 这些陈述是什么意思&#xff1f;如何形式化你的微调模型在你起初的预训练模型上的性能改进&#xff1f;让我们探讨一些由大型语言模型开…

【Linux】【驱动】注册字符设备号

【Linux】【驱动】注册字符设备号 1. 绪论1 、静态分配设备号2、动态分配设备号3、注销设备号 2 实现的代码3 加载驱动程序 1. 绪论 在之前杂项设备的时候&#xff0c;设备号是固定的&#xff0c;字符设备就需要自己去申请设备号了&#xff0c; 申请设备号有两个方式&#xff…

2024年Android应用开发的6大框架

2024年Android应用开发的6大Framwork 2024年Android应用开发的6大框架&#xff0c;影响移动应用开发领域&#xff0c;改变应用的创建和用户使用方式。随着移动应用市场不断发展&#xff0c;对灵活和高效框架的需求也在增加。这些框架为开发人员提供资源和工具&#xff0c;构建…

手写RPC框架--1.介绍与网络传输

介绍与网络传输 0.介绍a.什么是rpcb.rpc的通信流程 1.网络传输a.零拷贝1) 零拷贝的概念2) Netty的零拷贝 b.IO多路复用c.Netty入门1) netty中的helloworld d.封装报文1) 协议结构2) 模拟封装报文 e.序列化f.压缩和解压缩 0.介绍 a.什么是rpc rpc 的全称是 Remote Procedure C…

S905L3A(M401A)拆解, 运行EmuELEC和Armbian

关于S905L3A / S905L3AB S905Lx系列没有公开资料, 猜测是Amlogic用于2B的芯片型号, 最早的 S905LB 是 S905X 的马甲, 而这个 S905L3A/S905L3AB 则是 S905X2 的马甲, 因为在性能评测里这两个U的得分几乎一样. S905L3A/S905L3AB 和 S905X2, S905X3 一样 GPU 是 G31, 相比前一代的…

【Linux】深入理解文件操作

文章目录 初次谈论文件重温C语言文件操作系统文件操作接口openwriteread 再次谈论文件文件描述符文件描述符的分配规则 重定向什么是重定向重定向的本质系统调用接口实现重定向<、>、>> 初次谈论文件 开始之前先谈论一下关于文件的一些共识性问题。 一个文件可以…

(笔记一)利用open_cv在图像上进行点标记,文字注记,画圆、多边形、椭圆

&#xff08;1&#xff09;CV2中的绘图函数&#xff1a; cv2.line() 绘制线条cv2.circle() 绘制圆cv2.rectangle() 绘制矩形cv2.ellipse() 绘制椭圆cv2.putText() 添加注记 &#xff08;2&#xff09;注释 img表示需要绘制的图像color表示线条的颜色&#xff0c;采用颜色矩阵…

联想电脑装系统无法按F9后无法从系统盘启动的解决方案

开机时按F9发现没有加载系统盘. 打开BIOS设置界面&#xff0c;调整设置如下: BOOT MODE: Legacy Support.允许legacy方式boot. BOOT PRIORITY: Legacy First. Legacy方式作为首选的boot方式. USB BOOT: ENABLED. 允许以usb方式boot. Legacy: 这里设置legacy boot的优先级,…

保姆级教程:从0到1使用Stable Diffusion XL训练LoRA模型 |【人人都是算法专家】

Rocky Ding 公众号&#xff1a;WeThinkIn 写在前面 【人人都是算法专家】栏目专注于分享Rocky在AI行业中对业务/竞赛/研究/产品维度的思考与感悟。欢迎大家一起交流学习&#x1f4aa; 大家好&#xff0c;我是Rocky。 Rocky在知乎上持续撰写Stable Diffusion XL全方位的解析文章…

HTML之VSCode简单配置与创建

目录 插件下载 然后输入源码&#xff1a; 使用 效果 插件下载 下载这个插件后可以直接运行&#xff1a; 然后创建一个文件&#xff1a; 然后输入源码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"…

渗透测试工具ZAP入门教程(1)-安装和快速开始

介绍 ZAP Zed Attack Proxy&#xff08;ZAP&#xff09;是一个免费的开源渗透测试工具&#xff0c;在 软件安全项目 &#xff08;SSP&#xff09;。ZAP 专为测试 Web 应用程序而设计&#xff0c;既灵活又可扩展。 ZAP的核心是所谓的“中间人代理”。它位于测试人员的浏览器和…