SparkStreaming学习之——无状态与有状态转化、遍历kafka的topic消息、WindowOperations

news2024/11/25 20:52:01

目录

一、状态转化

二、kafka topic A→SparkStreaming→kafka topic B

(一)rdd.foreach与rdd.foreachPartition

(二)案例实操1

1.需求:

2.代码实现:

3.运行结果

(三)案例实操2

1.需求:

2.代码实现:

3.运行结果

三、WindowOperations

1.WindowOperations 窗口概述

2.代码示例

3.运行结果


一、状态转化

        无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。

        有状态转化操作就是窗口与窗口之间的数据有关系。上次一UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指 定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    streamingContext.checkpoint("checkpoint")
    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )

    // TODO 无状态:每个窗口数据独立
    /*val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCountStream.print()*/

    // TODO 有状态:窗口与窗口之间的数据有关系
    val sumStateStream: DStream[(String, Int)] = kafkaStream.flatMap(x => x.value().toString.split("\\s+"))
      .map((_, 1))
      .updateStateByKey {
        case (seq, buffer) => {
          println("进入到updateStateByKey函数中")
          println("seqvalue:", seq.toList.toString())
          println("buffer:", buffer.getOrElse(0).toString)
          val sum: Int = buffer.getOrElse(0) + seq.sum
          Option(sum)
        }
      }
    sumStateStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

有状态转化会将之前的历史记录与当前输入的数据进行计算:

二、kafka topic A→SparkStreaming→kafka topic B

(一)rdd.foreach与rdd.foreachPartition

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util

/**
 * 将数据从kafka的topic A取出数据后加工处理,之后再输出到kafka的topic B中
 */
object SparkStreamKafkaSourceToKafkaSink {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream2").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")

    streamingContext.checkpoint("checkpoint")
    val kafkaParams = Map( // TODO 连接生产者端的topic
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kfkgroup2")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果没有topic需要创建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkademoin --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("sparkkafkademoin"), kafkaParams)
    )

    println("1.配置spark消费kafkatopic")
    // TODO 使用foreachRDD太过消耗资源——不推荐
    kafkaStream.foreachRDD( // 遍历
      rdd => {
         println("2.遍历spark DStream中每个RDD")// 每隔5秒输出一次
        /* rdd.foreach(
          y => { // y:kafka中的keyValue对象
            println(y.getClass + " 遍历RDD中的每一条kafka的记录")
            val props = new util.HashMap[String, Object]()// TODO 连接消费者端的topic
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](props)
            val words: Array[String] = y.value().toString.trim.split("\\s+") // hello world
            for (word <- words) {
              val record = new ProducerRecord[String, String]("sparkkafkademoout", word + ",1")
              producer.send(record)
            }
          }
        )  */
        rdd.foreachPartition(
          rdds => { // rdds是包含rdd某个分区内的所有元素
            println("3.rdd 每个分区内的所有kafka记录集合")
            val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topic
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](props)
            rdds.foreach(
              y => {
                println("4.遍历获取rdd某一个分区内的每一条消息")
                val words: Array[String] = y.value().trim.split("\\s+")
                for (word <- words) {
                  val record = new ProducerRecord[String, String]("sparkkafkademoout", word + ",1")
                  producer.send(record)
                }
              }
            )
          }
        )
      }
    )
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

(二)案例实操1

1.需求:

清洗前:
user            ,                        friends
3197468391,1346449342 3873244116 4226080662 1222907620

清洗后:
user             ,friends                  目标topic:user_friends2
3197468391,1346449342
3197468391,3873244116
3197468391,4226080662
3197468391,1222907620

2.代码实现:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.util

object SparkStreamUserFriendrawToUserFriend {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkufStream2").setMaster("local[2]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map( // TODO 连接生产者端的topic
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkuf3"),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果没有topic需要创建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic user_friends2 --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("user_friends_raw"), kafkaParams)
    )

    kafkaStream.foreachRDD(
      rdd => {
        rdd.foreachPartition(x => {
          val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topic
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)
          x.foreach(y => {
            val splits: Array[String] = y.value().split(",")
            if (splits.length == 2) {
              val userid: String = splits(0)
              val friends: Array[String] = splits(1).split("\\s+")
              for (friend <- friends) {
                val record = new ProducerRecord[String, String]("user_friends2", userid + "," + friend)
                producer.send(record)
              }
            }
          })
        })
      })
    
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.运行结果

(三)案例实操2

1.需求:

清洗前:

event           ,                   yes               ,        maybe   ,              invited               ,no
1159822043,1975964455 3973364512,2733420590 ,1723091036 795873583,3575574655

清洗前后:

eventid        ,friendid        ,status
1159822043,1975964455,yes
1159822043,3973364512,yes
1159822043,2733420590,maybe
1159822043,1723091036,invited

1159822043,795873583,invited

1159822043,3575574655,no

2.代码实现:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util

object SparkStreamEventAttToEvent2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkufStream2").setMaster("local[2]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map( // TODO 连接生产者端的topic
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkevent"),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果没有topic需要创建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic event2 --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("event_attendees_raw"), kafkaParams)
    )

    kafkaStream.foreachRDD(
      rdd => {
        rdd.foreachPartition(x => {
          val props = new util.HashMap[String, Object]() // TODO 连接消费者端的topic
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)

          x.foreach(y => { // todo 遍历获取rdd某一个分区内的每一条消息
            val splits: Array[String] = y.value().split(",")
            val eventID: String = splits(0)

            if (eventID.trim.nonEmpty) {
              if (splits.length >= 2) {
                val yesarr: Array[String] = splits(1).split("\\s+")
                for (yesID <- yesarr) {
                  val yes = new ProducerRecord[String, String]("event2", eventID + "," + yesID + ",yes")
                  producer.send(yes)
                }
              }
              if (splits.length >= 3) {
                val maybearr: Array[String] = splits(2).split("\\s+")
                for (maybeID <- maybearr) {
                  val yes = new ProducerRecord[String, String]("event2", eventID + "," + maybeID + ",maybe")
                  producer.send(yes)
                }
              }
              if (splits.length >= 4) {
                val invitedarr: Array[String] = splits(3).split("\\s+")
                for (invitedID <- invitedarr) {
                  val invited = new ProducerRecord[String, String]("event2", eventID + "," + invitedID + ",invited")
                  producer.send(invited)
                }
              }
              if (splits.length >= 5) {
                val noarr: Array[String] = splits(4).split("\\s+")
                for (noID <- noarr) {
                  val no = new ProducerRecord[String, String]("event2", eventID + "," + noID + ",no")
                  producer.send(no)
                }
              }
            }
          })
        })
      })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.运行结果

三、WindowOperations

1.WindowOperations 窗口概述

        Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

➢ 窗口时长:计算内容的时间范围;

➢ 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

2.代码示例

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkWindowDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkwindow1").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(3))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map( // TODO 连接生产者端的topic
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkwindow"),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )
    val winStream: DStream[(String, Int)] = kafkaStream.flatMap(x => x.value().trim.split("\\s+"))
      .map((_, 1))
      .window(Seconds(9), Seconds(3))
    winStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

注意:window的步长不进行设置,默认是采集周期

3.运行结果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/461005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码在洛谷上跑得慢怎么办?

前言 你有没有试过以下几种情况&#xff1a; 代码在别的OJ上能过&#xff0c;在洛谷上就T了你的代码和同学的几乎相同&#xff0c;但他的AC了&#xff0c;你的却TLE了 遇到这些情况&#xff0c;你可能要花上一个多小时才能解决&#xff0c;甚至难以解决&#xff0c;将问题一…

【springboot-04】ElasticSearch8.7搜索

为什么学&#xff1f;因为它查询速度很快&#xff0c;而且是非关系型数据库 &#xff08;NoSql&#xff09; 一些增删改查已经配置好了&#xff0c;无需重复敲码 ElasticSearch 更新快&#xff0c;本篇文章将主要介绍一些常用方法。 对于 spirngboot 整合 Es 的文章很少&#x…

看了这一篇文章,你还不懂MySQL体系结构,你来找我

前言 工作很长时间了&#xff0c;对于数据库的掌握程度却仅仅停留在表面的CRUD阶段&#xff0c;对于深层次的原理和技术知识了解的少之又少&#xff0c;随着岁数不断的增长。很多时候&#xff0c;出去找工作很迷茫&#xff0c;被面试官问的感觉自己很菜。现在利用工作休息时间&…

微信小程序第五节——登录那些事儿(超详细的前后端完整流程)

&#x1f4cc; 微信小程序第一节 ——自定义顶部、底部导航栏以及获取胶囊体位置信息。 &#x1f4cc; 微信小程序第二节 —— 自定义组件 &#x1f4cc; 微信小程序第三节 —— 页面跳转的那些事儿 &#x1f4cc; 微信小程序第四节—— 网络请求那些事儿 &#x1f61c;作 …

人工智能时代背景下,如何发展与应用自动化测试?

人工智能时代为自动化测试提供了机会和挑战。在发展自动化测试方面&#xff0c;是人工智能领域下的一个应用方向&#xff0c;和无人驾驶、机器人等一样&#xff0c;都是AI技术的应用场景。从技术的发展角度看&#xff0c;自动化测试一共经历了四代发展变化。从最早提出自动化测…

关于 变量

关于局部变量和静态变量&#xff08;基于有一定指针基础&#xff09; #include<stdio.h> void aaa() {int n10;} int main() {printf("%d",n);return 0; } 在这个代码里&#xff0c;很明显会报错&#xff0c;未定义该n标识符&#xff0c;因为这个n是局部变量…

在程序里面执行system(“cd /某个目录“),为什么路径切换不成功?

粉丝提问&#xff1a; 彭老师&#xff0c;问下&#xff0c;在程序里面执行system(“cd /某个目录”)&#xff0c;这样会切换不成功&#xff0c;为啥呢 实例代码&#xff1a; 粉丝的疑惑是明明第10行执行了cd /media操作&#xff0c; 为什么12行执行的pwd > test2.txt 结…

Unity InputField滑动条

InputField增加滑动条效果 类似图中效果 添加一个InputField组件 2 .添加一个Scrollbar放在InputField内 调整属性 调整InputFiled组件属性 需要将Scrollbal添加到InputField的scrollbar上 然后根据美术需求将位置进行调整&#xff0c;记得InputFiled下的Text不要被Scr…

【浓缩概率】浓缩概率思想帮我蒙选择题的概率大大提升!

今天在学习的时候遇到一个很有趣的思想叫作浓缩概率&#xff0c;可以帮我们快速解决一下概率悖论问题&#xff01; 什么是概率 计算概率有下面两个最简单的原则&#xff1a; 原则一、计算概率一定要有一个参照系&#xff0c;称作「样本空间」&#xff0c;即随机事件可能出现…

Docker容器--Consul部署

Docker容器--Consul部署 一、简介1、概述2、Consul两种模式 二、Consul特性1、特性2、应用场景 三、部署Consul集群&#xff08;Server端&#xff09;1、建立Consul服务2、设置代理&#xff0c;在后台启动consul服务端3、查看集群信息 四、Consul部署&#xff08;Client端&…

第4章:运算符

1.算术运算符 ① SELECT 10010,100-35.5,100*2,100/2,100%30 FROM DUAL;②在sql中“”没有连接作用&#xff0c;表示加法运算&#xff0c;字符串转换为数值&#xff08;隐式转换&#xff09;。非数值看作0处理 SELECT 1001 1 FROM DUAL;SELECT 100 a FROM DUAL;③加法运算…

Trie Tree(字典树)例题

字典树. 又称单词查找树&#xff0c; Trie树 &#xff0c;是一种 树形结构 &#xff0c;是一种哈希树的变种。经常被搜索引擎系统用于文本词频统计。. 它的优点是&#xff1a;利用字符串的公共前缀来减少查询时间&#xff0c;最大限度地减少无谓的字符串比较&#xff0c;查询效…

Python 使用chatGPT帮忙写一个有序集类 OrderedSet

需求:需要实现一个有序的集合&#xff0c;像python普通集合一样&#xff0c;除了 它是有序的 我这边穿插着使用了gpt3.5和gpt4,发现确实还是gpt4好用&#xff0c;一分钱一分货啊 问&#xff1a;我的要求是这样&#xff0c;data是一个集合&#xff0c;往里面放了2&#xff0c;…

【大厂面试问题】:飞机绕行地球问题

你的阅读是我最大的动力 目录 你的阅读是我最大的动力 问题描述&#xff1a; 引出思路&#xff1a; 一台加油飞机 两台加油飞机 返航方案一&#xff1a;加油机I、II同时起飞。 返航方案二&#xff1a;加油机I先起飞加油机II再起飞 答案 不直接说答案&#xff0c;一步一…

14个WooCommerce商城网站必备插件

开始建立 WooCommerce 网站&#xff1f;您需要一个具有许多有助于吸引和留住客户的有用功能的网站。虽然基本的 WooCommerce 设置非常方便&#xff0c;但您可以通过使用有用的插件扩展 WooCommerce 来做更多的事情。 有数百个插件需要考虑&#xff0c;我们已经完成了研究&…

[C++初阶]栈和队列_优先级队列的模拟实现 deque类 的理解

为了更好的理解优先级队列priority_queue&#xff0c;这里会同时进行栈和队列的提及 文章目录 简要概念&#xff08;栈和队列&#xff09;栈和队列的模拟实现与使用stack&#xff08;栈&#xff09;deque的理解和操作queue priority_queue&#xff08;优先级队列&#xff09;框…

悲观锁、乐观锁、自旋锁和读写锁

悲观锁和乐观锁 悲观锁&#xff1a;在每次取数据时&#xff0c;总是担心数据会被其他线程修改&#xff0c;所以会在取数据前先加锁&#xff08;读锁&#xff0c;写锁&#xff0c;行 锁等&#xff09;&#xff0c;当其他线程想要访问数据时&#xff0c;被阻塞挂起。&#xff08…

金融贷款行业如何高效获客,积累意向客户群体——运营商大数据

现如今贷款行业面对的运营压力日益扩大&#xff0c;顾客贮备是生存的关键&#xff0c;传统式的陌生拜访&#xff0c;一切随缘销售市场已不能满足其要求。互联网消费行为的融合与转变是在销售市场端反映&#xff0c;直接影响着广告推广广告策略的确立与运用。 可是&#xff0c;…

移除元素【数组】

⭐前言⭐ ※※※大家好&#xff01;我是同学〖森〗&#xff0c;一名计算机爱好者&#xff0c;今天让我们进入练习模式。若有错误&#xff0c;请多多指教。更多有趣的代码请移步Gitee &#x1f44d; 点赞 ⭐ 收藏 &#x1f4dd;留言 都是我创作的最大的动力&#xff01; 题目 27…

IPEmotion 2023 R1支持在线能量分析

新发布的IPEmotion 2023 R1提供了许多新功能&#xff0c;其中最重要的是新的“在线功率计算&#xff08;Online Power Calculation&#xff09;”功能。该功能允许使用预定义的功率计算来进行测量任务和数据分析。此外&#xff0c;IPEmotion 2023 R1现在支持一种新的存储模式&a…