Flink--API 之 Source 使用解析

news2024/11/28 19:51:23

目录

一、Flink Data Sources 分类概览

(一)预定义 Source

(二)自定义 Source

二、代码实战演示

(一)预定义 Source 示例

基于本地集合

基于本地文件

基于网络套接字(socketTextStream)

(二)自定义 Source 示例

三、Kafka Source 应用

四、总结


        在大数据处理领域,Apache Flink 作为一款强大的流式计算框架,既能应对流处理场景,也可处理批处理任务。而数据来源(Data Sources)作为整个计算流程的 “源头活水”,其多样性与合理运用至关重要。本文将深入剖析 Flink 中 Data Sources 的相关知识,并结合丰富代码示例,助力大家透彻理解与灵活运用。

一、Flink Data Sources 分类概览

Flink 在批 / 流处理中常见的 source 主要分为两大类:预定义 Source 和自定义 Source。

(一)预定义 Source

基于本地集合的 source(Collection-based-source)

        通过env.fromElements()可传入可变参数创建 DataStream,支持如 Tuple、自定义对象等复合形式,但要注意类型需一致,不一致时虽可用Object接收但使用易报错,像env.fromElements("haha", 1)这种就会有问题;env.fromCollection()支持多种Collection具体类型(如ListSetQueue)来构建 DataStream;env.fromSequence()可基于开始和结束值创建 DataStream(曾有env.generateSequence()方法创建基于 Sequence 的 DataStream,不过现已废弃),此类方式常应用于学习测试编造数据场景。

基于文件的 source(File-based-source)

        能读取本地文件与 HDFS 路径文件,如env.readTextFile("datas/wc.txt")可读取本地datas目录下wc.txt文件,env.readTextFile("hdfs://bigdata01:9820/home/a.txt")能获取 HDFS 特定路径文件数据。操作时要留意相对路径转绝对路径问题,避免因路径差错引发异常。

基于网络套接字(socketTextStream)

        socketTextStream(String hostname, int port)方法从指定 Socket 读取数据创建 DataStream,其为非并行 Source,有重载方法可指定行分隔符和最大重新连接次数,默认行分隔符是\n,最大重新连接次数为 0。使用前需先启动 Socket 服务(Mac 或 Linux 可在命令行终端输入nc -lk 8888,Windows 需安装netcat命令后操作),且该方式获取的 DataStream 并行度固定为 1。

(二)自定义 Source

SourceFunction

        非并行数据源(并行度只能 = 1),作为接口定义基础数据源规范,实现run方法持续产生数据,cancel方法用于停止数据源。

RichSourceFunction

        多功能非并行数据源(并行度只能 = 1),是类形式,相比SourceFunction,额外功能体现在实例化时有open方法执行一次(多并行度会多次执行,因多实例)、销毁实例时close方法执行一次,且能通过getRuntimeContext获取当前Runtime对象(底层 API)。

ParallelSourceFunction

        并行数据源(并行度能够 >= 1),接口形式,允许创建并行处理的数据源,例如自定义类实现此接口,按设定并行度生成数据。

RichParallelSourceFunction

        多功能并行数据源(并行度能够 >= 1),类形式且功能齐全,建议使用。继承它并重写相关方法,能充分利用并行特性高效产生数据,同时享有Rich类的openclose等方法优势。

二、代码实战演示

(一)预定义 Source 示例

在flink最常见的创建DataStream方式有四种: 

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:


l 使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue
l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了
l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合);
3.env.fromSequence(开始,结束);

基于本地集合

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _01YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 各种获取数据的Source
        DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
        dataStreamSource.print();
        // 演示一个错误的
        //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
        //dataStreamSource2.print();
        DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
                Tuple2.of("张三", 18),
                Tuple2.of("lisi", 18),
                Tuple2.of("wangwu", 18)
        );
        elements.print();

        // 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的API
        String[] arr = {"hello","world"};
        System.out.println(arr.length);
        System.out.println(Arrays.toString(arr));
        List<String> list = Arrays.asList(arr);
        System.out.println(list);

        env.fromElements(
                Arrays.asList(arr),
                Arrays.asList(arr),
                Arrays.asList(arr)
        ).print();




        // 第二种加载数据的方式
        // Collection 的子接口只有 Set 和 List
        ArrayList<String> list1 = new ArrayList<>();
        list1.add("python");
        list1.add("scala");
        list1.add("java");
        DataStreamSource<String> ds1 = env.fromCollection(list1);
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));

        // 第三种
        DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
        ds3.print();


        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

可以在代码中指定并行度

l 指定全局并行度:

env.setParallelism(12);

l 获得全局并行度:

env.getParallelism();

指定算子设置并行度:

获取指定算子并行度:

eventSource.getParallelism();

基于本地文件

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _02YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取并行度
        System.out.println(env.getParallelism());

        // 讲第二种Source File类型的
        // 给了一个相对路径,说路径不对,老闫非要写,我咋办?
        // 相对路径,转绝对路径
        File file = new File("datas/wc.txt");
        File file2 = new File("./");
        System.out.println(file.getAbsoluteFile());
        System.out.println(file2.getAbsoluteFile());
        DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
        ds1.print();
        // 还可以获取hdfs路径上的数据
        DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
        ds2.print();



        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

基于网络套接字(socketTextStream)

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

通过网盘分享的文件:netcat-win32-1.11.zip

如果是windows平台:nc -lp 8888

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo02_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source-加载数据
        DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);

        //TODO 3.transformation-数据转换处理
        //3.1对每一行数据进行分割并压扁
        DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        //3.2每个单词记为<单词,1>
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //3.4聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4.sink-数据输出
        result.print();

        //TODO 5.execute-执行
        env.execute();
    }
}

(二)自定义 Source 示例

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

简单自定义非并行 Source(实现 SourceFunction)

package com.bigdata.day02;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * 要求:
 * - 随机生成订单ID(UUID)
 * - 随机生成用户ID(0-2)
 * - 随机生成订单金额(0-100)
 * - 时间戳为当前系统时间
 */

@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {
    boolean flag = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        // 源源不断的产生数据
        Random random = new Random();
        while(flag){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(random.nextInt(3));
            orderInfo.setMoney(random.nextInt(101));
            orderInfo.setTimeStamp(System.currentTimeMillis());
            ctx.collect(orderInfo);
            Thread.sleep(1000);// 间隔1s
        }
    }

    // source 停止之前需要干点啥
    @Override
    public void cancel() {
        flag = false;
    }
}
public class CustomSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);
        // 将自定义的数据源放入到env中
        DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
        System.out.println(dataStreamSource.getParallelism());
        dataStreamSource.print();
        env.execute();
    }


}


 自定义并行 Source(实现 ParallelSourceFunction)

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.UUID;

/**
 * 自定义多并行度Source
 */
public class CustomerSourceWithParallelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();
        env.execute();
    }
    public static class MySource implements ParallelSourceFunction<String> {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
            /*
            如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
             */
        }

        @Override
        public void cancel() {}
    }
}


自定义多功能并行 Source(实现 RichParallelSourceFunction)

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.UUID;

/**
 * 自定义一个RichParallelSourceFunction的实现
 */
public class CustomerRichSourceWithParallelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();

        env.execute();
    }

    /*
    Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntime方法可以获得当前的Runtime对象(底层API)
     */
    public static class MySource extends RichParallelSourceFunction<String> {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open......");
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close......");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
        }

        @Override
        public void cancel() {}
    }
}

三、Kafka Source 应用

Kafka 作为常用消息队列,与 Flink 集成紧密。使用时需添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

并配置相关属性,如下示例展示从 Kafka 主题读取数据并筛选含特定字样消息后打印。

创建一个topic1 这个主题:
 

cd /opt/installs/kafka3/

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1

通过控制台向topic1发送消息:
bin/kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                return word.contains("success");
            }
        }).print();
        env.execute();
    }
}

四、总结

        掌握 Flink 普通 API 里 Source 的各类使用方式,无论是预定义 Source 快速搭建测试数据场景、灵活运用并行度设置优化资源,还是对接 Kafka 这类外部数据源,都是构建高效、稳定大数据处理管道的关键基石。后续可深入各部分细节实践,深挖性能调优等进阶玩法,让 Flink 在数据处理之旅中大放异彩。希望这篇文章能助大家在 Flink Source 使用上理清思路、顺利上手,开启大数据流式计算的精彩探索!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249278.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

typescript基础入门

在官网的页页https://www.typescriptlang.org/可以调试。 &#xff03;类型推断 let str: string abclet str1: string str1a&#xff03;数组及类型断言let numArr [1, 2, 3]const result numArr.find(item > item >2) as numberresult * 5&#xff03;基础类型和…

SenseVoice 音频转文字情绪识别 - python 实现

具体代码实现如下&#xff1a; from funasr import AutoModel from funasr.utils.postprocess_utils import rich_transcription_postprocesspath_audio "emo/happy.mp3"# 音频文件 # 加载模型 model_dir "iic/SenseVoiceSmall" model AutoModel(model…

Java学习笔记--继承方法的重写介绍,重写方法的注意事项,方法重写的使用场景,super和this

目录 一&#xff0c;方法的重写 二&#xff0c;重写方法的注意事项 三&#xff0c;方法重写的使用场景 四&#xff0c;super和this 1.继承中构造方法的特点 2.super和this的具体使用 super的具体使用 this的具体使用 一&#xff0c;方法的重写 1.概述:子类中有一个和父类…

深入浅出摸透AIGC文生图产品SD(Stable Diffusion)

hihi,朋友们,时隔半年(24年11月),终于能腾出时间唠一唠SD了🤣,真怕再不唠一唠,就轮不到SD了,技术更新换代是在是太快! 朋友们,最近(24年2月)是真的没时间整理笔记,每天都在疯狂的学习Stable Diffusion和WebUI & ComfyUI,工作实在有点忙,实践期间在飞书上…

OSI七层模型和TCP/IP五层模型详细介绍

这里写目录标题 一.OSI含义二.OSI七层模型1.应用层2.表示层3.会话层4.传输层5.网络层6.数据链路层7.物理层 TCP/IP五层协议1.应用层2.运输层运行在TCP上的协议运行在UDP上的协议 3.网络层IP协议配套使用的协议 4.数据链路层 四.网络协议分层的好处 一.OSI含义 OSI即是开放式通…

360推出全新的生成式 AI 搜索产品:纳米搜索,要重塑搜索产品

【大力财经】直击互联网最前线&#xff1a;360 集团在 2024 年 11 月 27 日开发布会&#xff0c;重磅推出了一款全新的生成式 AI 搜索产品——纳米搜索&#xff0c;并且已经上架到苹果 App Store 以及应用宝等安卓应用商店&#xff0c;直接与百度、阿里夸克、秘塔 AI、Perplexi…

【超全】目标检测模型分类对比与综述:单阶段、双阶段、有无锚点、DETR、旋转框

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

c#:winform引入bartender

1、vs新建项目 ①选择Windows窗体应用&#xff08;.NET Framework&#xff09; 2、将bartender引入vs中 ①找到bartender的安装目录&#xff0c;复制Seagull.BarTender.Print.dll文件 ②粘贴到项目->bin->Debug文件&#xff0c;并可创建Model文件夹&#xff1a;为了存放…

vue 实现关键字高亮效果

vue 实现关键字高亮效果 这是啥子意思呢&#xff0c;就是类似于百度搜索&#xff0c;根据关键词搜索结果&#xff0c;搜索结果中&#xff0c;与关键词相同的字显示红色&#xff0c;仅此而已&#xff0c;没有什么大的功能。简单写一下demo。 环境 我使用的是 vue3 ts 的语法来…

初始Python篇(7)—— 正则表达式

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a; Python 目录 正则表达式的概念 正则表达式的组成 元字符 限定符 其他字符 正则表达式的使用 正则表达式的常见操作方法 match方法的…

[DL]深度学习_扩散模型正弦时间编码

1 扩散模型时间步嵌入 1.1 时间步正弦编码 在扩散模型按时间步 t 进行加噪去噪过程时&#xff0c;需要包括反映噪声水平的时间步长 t 作为噪声预测器的额外输入。但是最初与图像配套的时间步 t 是数字&#xff0c;需要将代表时间步 t 的数字编码为向量嵌入。嵌入时间向量的宽…

【Linux】网络基本配置命令

一、使用网络配置命令 1.常用网络配置文件 1.1. /etc/hosts “/etc/hosts”文件保存着IP地址和主机名或域名的静态映射关系。当用户使用一个主机名或域名时&#xff0c;系统会在该文件中查找与它对应的IP地址。 [rootlocalhost ~]# cat /etc/hosts 127.0.0.1 localhost l…

如何搭建一个小程序:从零开始的详细指南

在当今数字化时代&#xff0c;小程序以其轻便、无需下载安装即可使用的特点&#xff0c;成为了连接用户与服务的重要桥梁。无论是零售、餐饮、教育还是娱乐行业&#xff0c;小程序都展现了巨大的潜力。如果你正考虑搭建一个小程序&#xff0c;本文将为你提供一个从零开始的详细…

学习threejs,使用设置lightMap光照贴图创建阴影效果

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.MeshLambertMaterial…

【前端】JavaScript中的柯里化(Currying)详解及实现

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 前端 文章目录 &#x1f4af;前言&#x1f4af;什么是柯里化&#xff1f;&#x1f4af;柯里化的特点&#x1f4af;柯里化的简单示例&#x1f4af;通用的柯里化实现&#x1f4af;柯里化让代码更易读的原因&#x1f4af…

springboot 整合 rabbitMQ (延迟队列)

前言&#xff1a; 延迟队列是一个内部有序的数据结构&#xff0c;其主要功能体现在其延时特性上。这种队列存储的元素都设定了特定的处理时间&#xff0c;意味着它们需要在规定的时间点或者延迟之后才能被取出并进行相应的处理。简而言之&#xff0c;延时队列被设计用于存放那…

Java代码操作Zookeeper(使用 Apache Curator 库)

1. Zookeeper原生客户端库存在的缺点 复杂性高&#xff1a;原生客户端库提供了底层的 API&#xff0c;需要开发者手动处理很多细节&#xff0c;如连接管理、会话管理、异常处理等。这增加了开发的复杂性&#xff0c;容易出错。连接管理繁琐&#xff1a;使用原生客户端库时&…

Django实现智能问答助手-基础配置

设置 Django 项目、创建应用、定义模型和视图、实现问答逻辑&#xff0c;并设计用户界面。下面是一步一步的简要说明&#xff1a; 目录&#xff1a; QnAAssistant/ # 项目目录 │ ├── QnAAssistant/ # 项目文件夹 │ ├── init.py # 空文件 │ ├── settings.py # 项目配…

【ESP32CAM+Android+C#上位机】ESP32-CAM在STA或AP模式下基于UDP与手机APP或C#上位机进行视频流/图像传输

前言: 本项目实现ESP32-CAM在STA或AP模式下基于UDP与手机APP或C#上位机进行视频流/图像传输。本项目包含有ESP32源码(arduino)、Android手机APP源码以及C#上位机源码,本文对其工程项目的配置使用进行讲解。实战开发,亲测无误。 AP模式,就是ESP32发出一个WIFI/热点提供给电…

从〇开始深度学习(0)——背景知识与环境配置

从〇开始深度学习(0)——背景知识与环境配置 文章目录 从〇开始深度学习(0)——背景知识与环境配置写在前面1.背景知识1.1.Pytorch1.2.Anaconda1.3.Pycharm1.4.CPU与GPU1.5.整体关系 2.环境配置2.1.准备工作2.1.1.判断有无英伟达显卡2.1.2.清理电脑里的旧环境 2.1.安装Anaconda…