【Flink系列】开发篇:1. Flink维表关联方案

news2025/1/21 3:04:03

数据流往往需要访问外部的数据源来丰富自己的信息,比如通过record中的ip地址查询ip数据库maxmind的GeoIP2 Databases得到ip对应的城市名称,城市经纬度,将这些作为新的字段添加到原来的record中。这就涉及到本篇的主题:维表关联。

网上关于flink中维表关联的博文很多,本文我想谈一谈个人对不同方案的理解和尝试后发现的一些问题。如果想要比较全面地了解维表关联的各个解决方案,建议阅读参考文献前两篇。

技术选型

维表关联方案主要有以下几种

  • 实时数据库查找关联,又叫热存储维表
  • 预加载维表关联
  • 广播维表关联
  • 维表变更日志关联,最常见的就是Temporal table function join

这几种方案各有优劣,没有最好的方案,只有最适合的方案。

所谓实时数据库查找就是Flink中的算子保持与数据库的连接,每来一条record就提取关键字,直接查找外部的数据库。这个方案最致命的问题在于这种实时访问外部数据库进行查询的方式是很影响作业性能的,对数据库的负载很大,导致吞吐量很难提上去。而且大数据的流量一般都很大,频繁访问数据库导致产线上的数据库挂掉那就是重大的生产故障。当然,针对这个问题也有一些解决方案,比如同步查找可以替换为异步查找,还可以使用缓存使得热点数据直接在内存中就能找到不用访问外部数据库。Anyway,带来的性能提升效果有限,这种方案主要还是适用于流量不大的场景。

预加载维表关联就是在任务启动的时候就把维表加载在内存中,查找的时候直接在内存中找就可以了。这个方案查找的性能是最高的,毕竟直接在内存中查找。但它也有一些局限性,一是占用更多的内存资源,如果维表非常大(比如大于TM内存),就不可取;二是维表很难实时更新,尽管可以设置定时器定时刷新维表,但是如果维表更新的太频繁性能消耗就太大了。总的来说,这种方案适合维表不是非常大,维表更新也不是很频繁的场景。(该方案实现简单,性能高,也是我最终选择的方案)

前面两种方案都属于数据流与静态的表之间的关联,而后面两种方案则是数据流与数据流之间的关联。所谓广播维表就是将维表转化为广播流从Source广播到下游的算子中,然后作为广播态保存到State Backend中,可以是内存,也可以是rocksdb。将广播态保存到rocksdb中每次读取状态都涉及到序列化和反序列化,对性能是有一定影响的。将广播态以MapState的形式保存在heap中和预加载维表关联就比较类似。

我这边将预加载维表关联和广播维表这两个方案做一个对比:

  1. 都可以将完整的维表保存在内存中,维表查询性能较高。但是广播维表需要将维表从上游广播到下游,涉及到不同节点的数据传输(网络传输,序列化和反序列化等),会带来额外的性能损失。但是作为广播态保存,不同的slots可以共享广播态,每个TM只需要保存一份维表,而不是每个slots保存一份,内存的利用率更高。
    在这里插入图片描述
  2. 广播维表需要把维表转化为数据流。好处是维表的实时性更高,方便实时更新。不好的地方是通常是把维表存储在Kafka中,考虑到实时性,实现上更复杂。也可以自定义source定时把最新的维表转化为数据流,和预加载维表的定时刷新方案一样,但这样维表更新就有延迟。
  3. 维表广播只能是数据流和一条广播流的join,不可以数据流和多条广播流join。预加载维表方案同一个算子可以预加载多个维表,维表广播的方案就需要把多个维表转化为同一个数据流进行广播,然后保存在不同名字的广播态中。实现起来比较复杂,另外就是代码聚合程度太高,很不优雅。

总的来说,广播维表方案维表的实时性高,数据查询性能高,资源利用率也高,属于比较全面的一个方案。缺点主要在于实现上较为复杂,而且也要求维表不能太大。

最后提一下维表变更日志关联,主要是Temporal table function join。目前Datastream API不支持,需要写Table API/Sql。这一块我没有做太多研究,就此不表。

代码示例

实时查询外部数据库

使用cache来减轻访问压力

package join;

import com.google.common.cache.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class JoinDemo2 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //输入格式为:user,1000,分别是用户名称和城市编号
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        LoadingCache<Integer, String> dim;

        @Override
        public void open(Configuration parameters) throws Exception {
            //使用google LoadingCache来进行缓存
            dim = CacheBuilder.newBuilder()
                    //最多缓存个数,超过了就根据最近最少使用算法来移除缓存
                    .maximumSize(1000)
                    //在更新后的指定时间后就回收
                    .expireAfterWrite(10, TimeUnit.MINUTES)
                    //指定移除通知
                    .removalListener(new RemovalListener<Integer, String>() {
                        @Override
                        public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                            System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());
                        }
                    })
                    .build(
                            //指定加载缓存的逻辑
                            new CacheLoader<Integer, String>() {
                                @Override
                                public String load(Integer cityId) throws Exception {
                                    String cityName = readFromHbase(cityId);
                                    return cityName;
                                }
                            }
                    );

        }

        private String readFromHbase(Integer cityId) {
            //读取hbase
            //这里写死,模拟从hbase读取数据
            Map<Integer, String> temp = new HashMap<>();
            temp.put(1001, "beijing");
            temp.put(1002, "shanghai");
            temp.put(1003, "wuhan");
            temp.put(1004, "changsha");
            String cityName = "";
            if (temp.containsKey(cityId)) {
                cityName = temp.get(cityId);
            }

            return cityName;
        }

        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            //在map方法中进行主流和维表的关联
            String cityName = "";
            if (dim.get(value.f1) != null) {
                cityName = dim.get(value.f1);
            }
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}

使用异步IO来提高访问吞吐量

package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class JoinDemo3 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //输入格式为:user,1000,分别是用户名称和城市编号
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });


        DataStream<Tuple3<String,Integer, String>> orderedResult = AsyncDataStream
                //保证顺序:异步返回的结果保证顺序,超时时间1秒,最大容量2,超出容量触发反压
                .orderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        DataStream<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream
                //允许乱序:异步返回的结果允许乱序,超时时间1秒,最大容量2,超出容量触发反压
                .unorderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        orderedResult.print();
        unorderedResult.print();
        env.execute("joinDemo");
    }

    //定义个类,继承RichAsyncFunction,实现异步查询存储在mysql里的维表
    //输入用户名、城市ID,返回 Tuple3<用户名、城市ID,城市名称>
    static class JoinDemo3AyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        // 链接
        private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
        private static String username = "root";
        private static String password = "123";
        private static String driverName = "com.mysql.jdbc.Driver";
        java.sql.Connection conn;
        PreparedStatement ps;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
        }

        @Override
        public void close() throws Exception {
            super.close();
            conn.close();
        }

        //异步查询方法
        @Override
        public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
            // 使用 city id 查询
            ps.setInt(1, input.f1);
            ResultSet rs = ps.executeQuery();
            String cityName = null;
            if (rs.next()) {
                cityName = rs.getString(1);
            }
            List list = new ArrayList<Tuple2<Integer, String>>();
            list.add(new Tuple3<>(input.f0,input.f1, cityName));
            resultFuture.complete(list);
        }

        //超时处理
        @Override
        public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
            List list = new ArrayList<Tuple2<Integer, String>>();
            list.add(new Tuple3<>(input.f0,input.f1, ""));
            resultFuture.complete(list);
        }
    }
}

预加载维表+定时刷新

我的维表是以文件的形式保存在本地磁盘中的。

如果是保存在外部数据库可参考参考文献4

 public static class MyMapFunction extends RichMapFunction<String,String>{
        private transient HashMap<String, String> hashMap;
        private HashMap<String,String> readTxtFile(String filePath) {
            HashMap<String,String> hashMap = new HashMap<>();
            File file = new File(filePath);
            try {
                if (file.isFile() && file.exists()) { //判断文件是否存在
                    InputStreamReader read = new InputStreamReader(new FileInputStream(file), "UTF-8");//考虑到编码格式
                    BufferedReader bufferedReader = new BufferedReader(read);
                    // String lineTxt = null;
                    while (bufferedReader.readLine() != null) {
                        String lineTxt = bufferedReader.readLine();
                        String[] str = lineTxt.split(",");
                        if (str.length == 2) {
                            hashMap.put(str[0], str[1]);
                        }
                    }
                    read.close();
                } else {
                    System.out.println("找不到指定的文件");
                }
            } catch (Exception e) {
                System.out.println("读取文件内容出错");
                e.printStackTrace();
            }
            return hashMap;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            String filePath = "input/data100.txt";

            ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
            timer.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    hashMap = readTxtFile(filePath);
                }
            }, 0, 5, TimeUnit.SECONDS);

        }

        @Override
        public String map(String s) throws Exception {
            return hashMap.size() + "";
        }
    }
  

广播态

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 这个例子是从socket中读取的流,数据为用户名称和城市id,维表是城市id、城市名称,
 * 主流和维表关联,得到用户名称、城市id、城市名称
 * 这个例子采用 Flink 广播流的方式来做为维度
 **/
public class JoinDemo4 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义主流
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //输入格式为:user,1000,分别是用户名称和城市编号
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        //定义城市流
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    //输入格式为:城市ID,城市名称
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        //将城市流定义为广播流
        final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor("broad1", Integer.class, String.class);
        BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

        DataStream result = textStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
                    //处理非广播流,关联维度
                    @Override
                    public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                        String cityName = "";
                        if (state.contains(value.f1)) {
                            cityName = state.get(value.f1);
                        }
                        out.collect(new Tuple3<>(value.f0, value.f1, cityName));
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        System.out.println("收到广播数据:" + value);
                        ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                    }
                });


        result.print();
        env.execute("joinDemo");
    }
}

参考文献

  1. 实时数仓之Flink维表关联难点解决方案
  2. Flink重点难点:维表关联理论和Join实战
  3. Flink State 误用之痛,你中招了吗?
  4. Flink 维表关联之全量预加载+定时刷新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/158013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式锁方案分析:看图说话(图+文)

1 缘起 曾经在看分布式锁的时候&#xff0c;还是处于了解阶段&#xff0c; 回头总结时&#xff0c;发现有很多细节没有探究到&#xff0c; 本文以-看图说话的方式分析不同的分布式锁方案&#xff0c; 分布式锁需要保证&#xff1a; &#xff08;1&#xff09;互斥性&#xff1…

【从零开始学习深度学习】46. 目标检测中锚框的概念、计算方法、样本锚框标注方式及如何选取预测边界框

本文主要介绍目标检测中常用到的锚框相关概念、计算方式、样本标注及如何选取预测边界框并输出的相关内容。 目录1. 锚框介绍1.1 生成多个锚框2. 交并比--Jaccard系数3. 标注训练集的锚框4. 输出预测边界框---非极大值抑制方法总结1. 锚框介绍 在目标检测算法中通常会在输入图…

Linux常用命令——xhost命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) xhost 制哪些X客户端能够在X服务器上显示 补充说明 xhost命令是X服务器的访问控制工具&#xff0c;用来控制哪些X客户端能够在X服务器上显示。该命令必须从有显示连接的机器上运行。可以通过使用-host参数&…

​Topaz Photo AI 人工智能图像降噪锐化放大

Topaz Photo AI 是一款强大的基于人工智能技术的降噪、锐化及放大的工具。它不仅可以作为独立的软件使用&#xff0c;也可作为 Photoshop 的插件&#xff0c;以及能在 Lightroom Classic、Capture One 中调用。在 Lightroom Classic 中提供了两种工作流程&#xff0c;一种是直接…

while和do while的用法区别

前言在上一篇文章中&#xff0c;壹哥给大家讲解了循环的概念&#xff0c;并重点给大家讲解了for循环的使用。但在Java中&#xff0c;除了for循环之外&#xff0c;还有while、do-while、foreach等循环形式。今天小千就再用一篇文章&#xff0c;给大家讲解while循环的使用。本文带…

webshell 一句话木马

Webshell&#xff08;大马&#xff09;&#xff1a;webshell就是以asp、aspx、php、jsp或者cgi等网页形式存在的一种命令执行环境&#xff0c;也将其称为一种网页后门。黑客入侵一个网站后&#xff0c;通常会将 asp、aspx、php 或 jsp 后门文件与网站 web 服务器目录下正常的网…

基础算法(三)——二分查找

二分查找 介绍 一种复杂度为O(logn)O(logn)O(logn)级别的查找算法&#xff0c;需要被查找的数列具有某种单调性质&#xff0c;其本质其实是搜索一个符合check条件的区间。 二分分为两种&#xff1a; 整数二分浮点数二分 核心思想&#xff1a; 首先讨论整数二分&#xff1…

Django搭建个人博客Blog-Day02

配置文件的介绍&#xff1a;dev.py&#xff08;原来的setting.py文件&#xff09;# django的配置文件中的配置项是什么意思&#xff1f; import os # 导入模块# Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR os.path.dirname(os.path.d…

测牛学堂:软件测试学习之python调试和判断嵌套

python中调试代码 在python中&#xff0c;使用debug来调试代码。 我们使用debug的目的&#xff0c;就是可以查看代码的执行过程。 步骤&#xff1a; 1 打断点。打断点是开发的术语&#xff0c;类似于打标记&#xff0c;debug会让程序在你打断点的地方停止执行。 如果要查看代码…

RabbitMQ(六)消息应答和持久化

目录一、RabbitMQ 消息应答二、RabbitMQ 持久化1.交换机的持久化2.队列的持久化3.消息的持久化4.持久化问题官网地址&#xff1a;https://www.rabbitmq.com/ 下载地址&#xff1a;https://www.rabbitmq.com/download.html 一、RabbitMQ 消息应答 ​ 执行一个任务可能需要花费…

petitlyrics 歌词提取 有感

想做一下歌曲的时间轴&#xff0c;搜歌词搜到了这个网站。奇怪的是看前端代码和network监听请求都不能获得完整歌词。如 https://petitlyrics.com/lyrics/934773a. 歌词截图如下&#xff1a;b. 控制台查看前端代码只有部分歌词c. Network查看请求数据&#xff0c;发现是日语对应…

小白和设计师都能用的 3D 渲染神器 #Rotato

“我非常喜欢它。它为我节省了很多时间&#xff0c;而不必在 Adobe After Effects 等应用程序中挣扎。”——Dominik Sobe on Product HuntRotato 是什么&#xff1f;Rotato 是一款功能强大的 3D 样机渲染神器&#xff0c;支持 PNG 、JPG 、 avi、mov 、mp4 等多种格式。不仅能…

10分钟在 Rainbond 上部署 mall 电商项目

很多小伙伴在学习 mall 电商项目时&#xff0c;都会在部署上折腾许久&#xff0c;虽然目前已经提供了很多种部署方式&#xff0c;比如 在 Linux 上部署 mall 、使用 Docker 或 DockerCompose 部署 mall &#xff0c;但对于正在学习的我们都显得比较复杂&#xff0c;需要理解并学…

Vue.js组件编程的知识要点

在C/S编程中&#xff0c;对程序员来说&#xff0c;组件编程是一个不能忽视或者越过的技术能力&#xff0c;特别是自定义的组件编程以及构建基础组件库。虽然组件编程不是必须的&#xff0c;全部使用系统或者别人的组件&#xff08;控件&#xff09;也可以完成系统的开发&#x…

使用Python根据原始Excel表格批量生成目标Excel表格

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注回复“书籍”即可获赠Python从入门到进阶共10本电子书今日鸡汤亭台六七座&#xff0c;八九十枝花。大家好&#xff0c;我是Python进阶者。一、前言前几天在帮助粉丝解决问题的时候&#xff0c;遇到一个简单的小需求&#…

程序员如何通过兼职赚钱?有哪些渠道?

程序员的工资是没有网上说的那么夸张。 就我自己来说&#xff0c;在刚刚工作的那几年&#xff0c;月薪没有超过1万块钱。但是刚刚来到大城市&#xff0c;这点工资连我交房租都不够&#xff0c;生存都成了问题。于是我开始考虑进行兼职&#xff0c;虽然在最开始的几个月也只能有…

(Week 11)综合复习(C++,图论,动态规划,搜索)

目录汤姆斯的天堂梦&#xff08;C&#xff0c;Dijkstra&#xff09;题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示解题思路&#xff1a;[蓝桥杯 2022 国 A] 环境治理&#xff08;C&#xff0c;Floyd&#xff09;题目描述输入格式输出格式样例 #1样例输入 #1样例输…

工控安全—工控常见协议识别

文章目录一、Nmap常见参数介绍二、工控常见协议识别三、工控设备指纹识别3.1 S73.2 Modbus3.3 IEC 60870-5-1043.4 DNP33.5 EtherNet/IP3.6 BACnet3.7 Tridium Niagara Fox3.8 Crimson V33.9 OMRON FINS3.10 PCWorx3.11 ProConOs3.12 MELSEC-Q四、测试一、Nmap常见参数介绍 -s…

STM32F411CE驱动Xbox摇杆

外观 引脚说明和原理 GND-GND 5V-5V VRX-ADC1通道1 VRX-ADC1通道2 SW独立按键-单片机的输入检测 本质上这个遥感就是集成了一个按键和两个电位器&#xff0c;遥感转动改变电位器也会转动&#xff0c;电压输出的值也就不一样&#xff0c;通过检测数值可自定义的做出判断&a…

linux发送tcp/udp请求

本文章介绍下通过nc工具和iperf工具&#xff0c;发送tcp/udp请求一、nc工具&#xff08;netcat工具&#xff09;这个工具linux系统默认是自带的&#xff0c;以下是命令的常用参数1.1 发送tcp请求在服务端监听端口nc -l port客户端连接并发送请求nc -v host port在服务端收到了信…