Spark【Spark Streaming】

news2025/1/12 23:04:49

1、基本数据源

1.1、文件流

在spark Shell 下运行:

[lyh@hadoop102 spark-yarn-3.2.4]$ spark-shell 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-09-08 08:56:21,875 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1662598583370).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.4
      /_/
         
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc,Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@379899f4

scala> val lines = ssc.textFileStream("file:///home/lyh/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@531245fe

scala> val kv = lines.map((_,1)).reduceByKey(_+_)
kv: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@c207c10

scala> kv.print()

scala> ssc.start()

------------------------------------------
Time: 1662598860000 ms
-------------------------------------------

-------------------------------------------
Time: 1662598880000 ms
-------------------------------------------

-------------------------------------------
Time: 1662598900000 ms
-------------------------------------------
(c#,1)
(hh,1)
(h,1)
(javafx,1)
(spark,1)
(hadoop,1)
(js,1)
(java,1)
(s,1)
(c,1)

执行后立即新建终端在  /home/lyh/streaming/logfile 目录下创建文件并写入数据

1.2、Socket 套接字流

// todo 创建环境对象
    val conf = new SparkConf()
    conf.setAppName("word count").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(3))

    // todo 逻辑处理
    // 获取端口数据(Socket)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val word: DStream[(String,Int)] = words.map((_, 1))
    val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)
    wordCount.print()
    // todo 关闭环境
    // 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭
    // 而且main方法的关闭也会使SparkStreaming的采集器关闭
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()

启动 NetCat

> nc -lp 9999 
> hello world
> hello spark
> ...

运行结果: 

1.3、自定义 Socket 数据源

通过自定义 Socket 实现数据源不断产生数据

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
import scala.io.Source

/**
 * 通过自定义的Socket来不断给客户端发送数据
 */
object MySocketReceiver {

  def index(length: Int): Int = {
    val rdm = new java.util.Random()
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {

    val fileName = "input/1.txt"
    val lines: List[String] = Source.fromFile(fileName).getLines().toList

    val listener: ServerSocket = new ServerSocket(9999)

    while(true){
      val socket: Socket = listener.accept()
      new Thread(){
        override def run(){
          val out: PrintWriter = new PrintWriter(socket.getOutputStream,true)
          while (true){
            Thread.sleep(1000)
            val content = lines(index(lines.length)) // 源源不断,每次打印list的第(1~length)随机行
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }

  }
}

定义一个处理器接收自定义数据源端口发送过来的数据。

def main(args: Array[String]): Unit = {

    // todo 创建环境对象
    val conf = new SparkConf()
    conf.setAppName("word count").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(3))

    // todo 逻辑处理
    // 获取端口数据(Socket)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val word: DStream[(String,Int)] = words.map((_, 1))
    val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)
    wordCount.print()
    // todo 关闭环境
    // 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭
    ssc.start()
    // 等待采集器关闭
    ssc.awaitTermination()

  }

先运行我们的数据源,再运行处理器:

处理器:

1.4、RDD 队列流

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_RDDStream {

  def main(args: Array[String]): Unit = {

    // 1. 初始化配置信息
    val conf = new SparkConf()
    conf.setAppName("rdd Stream").setMaster("local[*]")

    // 2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf,Seconds(4))

    // 3.创建RDD队列
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

    // 4.创建QueueInputStream
    // oneAtATime = true 默认,一次读取队列里面的一个数据
    // oneAtATime = false, 按照设定的时间,读取队列里面数据
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)

    // 5. 处理队列中的RDD数据
    val sumStream: DStream[Int] = inputStream.reduce(_ + _)

    // 6. 打印结果
    sumStream.print()

    // 7.启动任务
    ssc.start()

    // 8.向队列中放入RDD
    for(i <- 1 to 5){
      rddQueue += ssc.sparkContext.makeRDD(1 to 5)
      Thread.sleep(2000)
    }

    // 9. 等待数据源进程停止后关闭
    ssc.awaitTermination()
  }

}

2、高级数据源

2.1、Kafka 数据源

2.1.1、消费者程序处理流数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming03_Kafka {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")
    val ssc = new StreamingContext(conf,Seconds(3))

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    // 将每条消息的KV取出
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // 计算WordCount
    valueDStream.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    // 开启任务
    ssc.start()
    ssc.awaitTermination()

  }

}

2.1.2、生产者生产数据

(1)kafka 端生产数据

启动 Kafka 集群

创建 Topic(指定一个分区三个副本): 

kafka-topics.sh --bootstrap-server hadoop102:9092 --topic <topic名称> --create --partitions 1 --replication-factor 3 

 查看是否生成 Topic:

kafka-topics.sh --bootstrap-server hadoop102:9092 --list

生产者生产数据:

> kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic <topic名称>
> hello world
> hello spark
> ...
(2)编写生产者程序
package com.lyh

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming03_Kafka {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")
    val ssc = new StreamingContext(conf,Seconds(3))

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    // 将每条消息的KV取出
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // 计算WordCount
    valueDStream.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

   // 开启任务
    ssc.start()
    ssc.awaitTermination()

  }

}

3、转换操作

3.1、无状态转换操作

3.2、有状态转换操作

3.1.1、滑动窗口转换操作

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_Window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming window")
    val ssc = new StreamingContext(conf,Seconds(3))

    val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)

    val word_kv = lines.map((_, 1))

    /**
     * 收集器收集RDD合成DStream: 3s 窗口范围: 12s 窗口滑动间隔: 6s/次
     * 1. windowLength:表示滑动窗口的长度,即窗口内包含的数据的时间跨度。它是一个Duration对象,用于指定窗口的时间长度。
     * 2. slideInterval:表示滑动窗口的滑动间隔,即每隔多长时间将窗口向右滑动一次。同样是一个Duration对象。
     * 返回一个新的 DStream
     **/
    val wordToOneByWindow:DStream[(String,Int)] = word_kv.window(Seconds(12), Seconds(6))

    // 窗口每滑动一次(6s),对窗口内的数据进行一次聚合操作.

    val res: DStream[(String,Int)] = wordToOneByWindow.reduceByKey(_ + _)

    res.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.1.2、updateStateByKey

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * DStream 有状态转换操作之 updateStateByKey(func) 转换操作
 */
object SparkStreaming04_State {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka state")
    val ssc = new StreamingContext(conf,Seconds(3))

    /**
     * 设置检查点目录的作用是为了确保Spark Streaming应用程序的容错性和可恢复性。
     * 在Spark Streaming应用程序运行过程中,它会将接收到的数据分成一批批进行处理。
     * 通过设置检查点目录,Spark Streaming会定期将当前的处理状态、接收到的数据偏移量等信息保存到可靠的存储系统中,
     * 比如分布式文件系统(如HDFS)或云存储服务(如Amazon S3)。
     * 一旦应用程序出现故障或崩溃,它可以从最近的检查点中恢复状态,并从上次处理的位置继续处理数据,从而确保数据的完整性和一致性。
     */
    //检查点的路径如果是本地路径要+ file:// 否则认为是 hdfs路径 / 开头
    ssc.checkpoint("file:///D://IdeaProject/SparkStudy/data/")  //设置检查点,检查点具有容错机制

    val lines: DStream[String] = ssc.socketTextStream("localhost",9999)

    val word_kv = lines.map((_, 1))

    val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey(
      /** 参数是一个函数
       1. Seq[Int]: 当前key对应的所有value值的集合,因为我们的value是Int,所以这里也是Int
       2. Option[Int]: 当前key的历史状态,对于wordCount,历史值就是上一个DStream中这个key的value计算结果(求和结果)
       Option 是 Scala 中用来表示可能存在或可能不存在的值的容器,是一种避免空引用(null reference)问题的模式。
       Option[Int] 有两个可能的实例:
          (1) Some(value: Int):表示一个包含 Int 类型值的 Option。
          (2) None:表示一个空的 Option,不包含任何值。
      **/
      (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Option(currentCount + previousCount)
      }
    )

    stateDStream.print()
    stateDStream.saveAsTextFiles("./out") //输出结果保存到 文本文件中
    ssc.start()
    ssc.awaitTermination()
  }
}

4、输出操作

4.1、输出到文本文件

上面 3.1.2 中就保存DStream输出到了本地:

stateDStream.saveAstextFiles("./out")

4.2、输出到MySQL数据库

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}

object NetWorkWordCountStateMySQL {

  def main(args: Array[String]): Unit = {

    val updateFunc = (values: Seq[Int],state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_+_)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[*]").setAppName("state mysql")
    val ssc = new StreamingContext(conf,Seconds(5))
    // file:\\ 代表本地文件系统 如果用的是 /user/... 这种形式是 HDFS 文件系统 需要启动Hadoop集群
    ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv: DStream[(String, Int)] = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)

    val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey[Int](updateFunc)
    stateDStream.print()

    stateDStream.foreachRDD( rdd=> {

      def func(records: Iterator[(String,Int)]): Unit ={
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try{
          conn = DBUtils.getConnection("jdbc:mysql://127.0.0.1:3306/spark","root","Yan1029.")
          records.foreach(p=>{
            val sql = "insert into wordcount values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1,p._1.trim)
            stmt.setInt(2,p._2)
            stmt.executeUpdate()    //不executeUpdate就不会写入数据库
          })
        }catch {
          case e: Exception => e.printStackTrace()
        }finally {
//          if (stmt!=null) stmt.close()
//          DBUtils.close()
        }
      }
      val repartitionedRDD: RDD[(String,Int)] = rdd.repartition(3)  //扩大分区用 repartition
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }

}

运行结果:

5、优雅的关闭和恢复数据

5.1、关闭SparkStreaming

        流式任务通常都需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
       关闭方式:我们通常使用外部文件系统来控制内部程序关闭。

package com.lyh

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

import java.net.URI

object SparkStreaming06_Close {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming close")
    val ssc = new StreamingContext(conf,Seconds(3))

    val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv = lines.map((_, 1))

    word_kv.print()

    ssc.start()

    // 再创建一个线程去关闭
    new Thread(new MonitorStop(ssc)).start()

    ssc.awaitTermination()  //阻塞当前main线程
  }
}

class MonitorStop(ssc: StreamingContext) extends Runnable{
  override def run(): Unit = {
    while (true){ // 一直轮询判断
      Thread.sleep(5000)  //每5s检查一遍
      val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"),new Configuration(),"lyh")
      val exists: Boolean = fs.exists(new Path("hdfs://hadoop102:9000/stopSpark"))
      if (exists) { //如果比如(MySQL出现了一行数据、Zookeeper的某个节点出现变化、hdfs是否存在某个目录...)就关闭
        val state: StreamingContextState = ssc.getState()
        if (state == StreamingContextState.ACTIVE){
          // 优雅地关闭-处理完当前的数据再关闭
          // 计算节点不再接受新的数据,而是把现有的数据处理完毕,然后关闭
          ssc.stop(true,true)
          System.exit(0)
        }
      }
    }
  }
}

5.2、恢复检查点的数据

使用 getActiveOrCreate 的方法来对上一个失败的 Spark 任务进行数据恢复(通过检查点来进行恢复)

方法说明:

        若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

import java.net.URI

object SparkStreaming07_Resume {
  def main(args: Array[String]): Unit = {

    //好处:若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state", () => {
      val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming resume")
      val ssc = new StreamingContext(conf, Seconds(3))

      val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
      val word_kv = lines.map((_, 1))

      word_kv.print()

      ssc
    })
    // 依然设置检查点 防止application失败后丢失数据
    ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")

    ssc.start()
    ssc.awaitTermination()  //阻塞当前main线程
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1134573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自动驾驶,从“宠儿”走进“淘汰赛”

从“一步到位”到场景、技术降维。从拼落地路径&#xff0c;到拼雷达、算力&#xff0c;再到如今的性价比之争&#xff0c;自动驾驶似乎变得愈发“接地气”。 作者|斗斗 编辑|皮爷 出品|产业家 比起去年&#xff0c;黄文欢和张放今年显得更加忙碌。 “自动驾驶赛道&…

Redis 配置文件(redis.conf)中文注释及说明

文章目录 一、概述二、觉见基础配置1.1 导入另一个配置文件1.2 添加Redis扩展1.3 绑定Redis服务在那些网卡上&#xff0c;也就是远程可以通过那个的IP地址访问。1.2 指定Redis服务监听端口1.2 最大分配内容大小1.2 后台服务方式运行1.2 日志记录文件1.2 添加扩展 三、完整配置文…

网络协议--DNS:域名系统

14.1 引言 域名系统&#xff08;DNS&#xff09;是一种用于TCP/IP应用程序的分布式数据库&#xff0c;它提供主机名字和IP地址之间的转换及有关电子邮件的选路信息。这里提到的分布式是指在Internet上的单个站点不能拥有所有的信息。每个站点&#xff08;如大学中的系、校园、…

XTU-OJ 1227-Robot

题目描述 假设在一个XOY坐标的平面上&#xff0c;机器人一开始位于原点&#xff0c;面向Y轴正方向。 机器人可以执行向左转&#xff0c;向右转&#xff0c;向后转&#xff0c;前进四个指令。 指令为 LEFT:向左转RIGHT:向右转BACK:向后转FORWORD n:向前走n(1≤n≤100)个单位 现在…

边缘计算:云计算的延伸

云计算已经存在多年&#xff0c;并已被证明对大大小小的企业都有好处&#xff1b;然而&#xff0c;直到最近边缘计算才变得如此重要。它是指发生在网络边缘的一种数据处理&#xff0c;更接近数据的来源地。 这将有助于提高效率并减少延迟以及设备和云之间的数据传输成本。边缘…

金融领域:怎么保持电力系统连续供应?

银行作为金融领域的关键机构&#xff0c;依赖于高度可靠的电力供应&#xff0c;以保持银行操作的连续性。在电力中断或电力质量问题的情况下&#xff0c;银行可能面临严重的风险&#xff0c;包括数据丢失、交易中断和客户满意度下降。 UPS监控系统在这一背景下变得至关重要&…

排序(上):为什么插入排序比冒泡排序更受欢迎?

排序对于任何一个程序员来说&#xff0c;可能都不会陌生。你学的第一个算法&#xff0c;可能就是排序。大部分编程语言中&#xff0c;也都提供了排序函数。在平常的项目中&#xff0c;我们也经常会用到排序。排序非常重要&#xff0c;所以我会花多一点时间来详细讲一讲经典的排…

Pytorch使用torchvision.datasets.ImageFolder读取数据集,数据集的内容排列状况

当使用torchvision.datasets.ImageFolder读取猫狗数据集时,dataset中存的图片是 猫狗猫狗猫狗猫狗 还是 猫猫猫猫狗狗狗狗 呢? 数据集文件的存放路径如下图 测试代码如下 import torch import torchvisiontransform torchvision.transforms.Compose([torchvision.transform…

Python【修饰器/装饰器】

Python【修饰器/装饰器】 修饰器&#xff08;装饰器&#xff09;在Python中也是一个很重要的内容&#xff0c;它可以让其他函数在不需要做任何代码变动的前提下增加额外功能&#xff0c;相当于一个语法糖&#xff0c;可能在新手看来&#xff0c;这是一个难以理解或者不知道有啥…

FPGA/SoC控制机械臂

FPGA/SoC控制机械臂 机器人技术处于工业 4.0、人工智能和边缘革命的前沿。让我们看看如何创建 FPGA 控制的机器人手臂。 介绍 机器人技术与人工智能和机器学习一起处于工业 4.0 和边缘革命的最前沿。 因此&#xff0c;我认为创建一个基础机器人手臂项目会很有趣&#xff0c;我们…

系统架构设计师之使用McCabe方法可以计算程序流程图的环形复杂度

系统架构设计师之使用McCabe方法可以计算程序流程图的环形复杂度

Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (二)

这是继上一篇文章 “Elasticsearch&#xff1a;使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation &#xff08;一&#xff09;” 的续篇。在这篇文章中&#xff0c;我主要来讲述 ElasticVectorSearch 的使用。 我们的设置和之前的那篇文章是一样的&#xff…

【C++基础入门】43.C++中多态的概念和意义

一、函数重写回顾 父类中被重写的函数依然会继承给子类子类中重写的函数将覆盖父类中的函数通过作用域分辨符&#xff08;::&#xff09;可以访问到父类中的函数 二、多态的概念和意义 面向对象中期望的行为 根据实际的对象类型判断如何调用重写函数父类指针&#xff08;引用…

【蓝桥杯】蓝桥杯双周赛第二场ABCD题

A题&#xff1a;新生 知识点&#xff1a;下一届是第几届蓝桥杯…… 新一届蓝桥杯大赛即将在2024年拉开序! 作为大一新生的小蓝&#xff0c;在听说了这场盛大的比赛后&#xff0c;对其充满了期待与热情。但作为初次参赛的新手&#xff0c;他对蓝桥杯的相关赛制和历史并…

LVS负载均衡(LVS简介、三种工作模式、十种调度算法)

LVS简介 LVS&#xff08;Linux Virtual Server&#xff09;是一种基于Linux内核的高可用性负载均衡软件。它通过将客户端请求分发到多个后端真实服务器&#xff0c;提高系统性能和可靠性。LVS支持多种调度算法&#xff0c;如轮询、最少连接、源地址哈希等&#xff0c;用于决定…

番外8.2---配置/管理硬盘

""" Step1&#xff1a;清楚磁盘、硬盘&#xff08;HDD&#xff09;、光驱的概念及是否具有包含关系。 Step2&#xff1a;硬件设备&#xff08;IDE、SCSI、SATA、NVMe、软驱等&#xff09;命名方式及在linux系统里对应的文件名称。 Step3&#xff1a;&#xff1…

保存 uboot图像配置

一. 简介 本文学习如何保存经过图像配置&#xff0c;与加载 自己的配置文件。 之前几篇文章学习了&#xff1a;uboot 经过图形化配置 dns 命令功能。地址如下&#xff1a; uboot通过图像化界面配置 dns命令-CSDN博客 uboot通过图像化界面配置 dns命令验证-CSDN博客 二. 保…

微信管理系统的便捷功能:自动回复

宝子们 你有遇到以下头疼的问题吗&#xff1f; 1、每日手动一遍又一遍点“添加”来通过大量好友? 2、每日总要花至少半个或1个小时来回复刚通过的好友? 3、经常切换聊天窗口复制粘贴同样的内容回复客户&#xff1f; 4、一键转发操作多了被系统提示过于频繁&#xff1f; 5、…

虹科 | 解决方案 | 汽车示波器 学校教学方案

虹科Pico汽车示波器是基于PC的设备&#xff0c;特别适用于大课堂的教学、备课以及与师生的互动交流。老师展现讲解波形数据&#xff0c;让学生直观形象地理解汽车的工作原理 高效备课 课前实测&#xff0c;采集波形数据&#xff0c;轻松截图与标注&#xff0c;制作优美的课件&…

Ubuntu22.04 交叉编译阿里oss c-sdk

一、交叉编译openssl Ubuntu20.04 交叉编译openssl 1.0.1f_编译前去除 makefile 中所有的"-m64"字段_qq76211822的博客-CSDN博客文章浏览阅读319次。Ubuntu20.04 交叉编译openssl_编译前去除 makefile 中所有的"-m64"字段https://blog.csdn.net/sz7621182…