Flink学习——DataStream API

news2024/11/28 1:13:37

        一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

一、执行环境(Execution Environment) 

        flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。

        不同的环境代码的提交运行过程会有所不同。这就要求我们再提交作业执行计算时,必须获取当前flink的运行环境,从而建立起与flink框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的TaskManager执行。

1.1 创建执行环境

        执行环境是 StreamExecutionEnvironment 类的对象。创建执行环境的方式,就是调用这个类的静态方法。

1.1.1 getExecutionEnvironment

        getExecutionEnvironment 方法会根据当前运行的方式,自行决定该返回什么样的运行环境。如果程序是独立运行的,就返回一个本地执行环境;如果创建了jar包然后从命令行调用后提交到集群执行,那么久返回集群的执行环境。

val env = StreamExecutionEnvironment.getExecutionEnvironment

1.1.2 createLocalEnvironment

        返回一个本体执行环境。可以在调用时传入一个参数,指定默认的并行度。如果传入,默认就是本地的CPU核心数。

val localEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

1.1.3 createRemoteEnvironment

        返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定在集群中运行的jar包。

        获取执行环境后,还可以对执行环境进行灵活的设置。如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等。

val localEnvironment = StreamExecutionEnvironment
    .getExecutionEnvironment(
        "host",    // JobManager主机名
        1234,    // 进程端口号
        "path/to/jarFile.jar"    // 提交给JobManager的JAR包
    )    

1.2 执行模式(Execution Mode)

// 批处理环境
// 1.12.0版本起,可以通过“执行模式: execution mode”实现切换
val batchEnv = ExecutionEnvironment.getExecutionEnvironment

// 流处理环境
val env = StreamEnvironment.getExecutionEnvironment
  • 流执行模式(STREAMING)

        DataStream API经典模式,一般用于需要持续实时处理的无界数据流。默认情况下使用的就是流执行模式。

  • 批执行模式(BATCH)

        专门用于批处理的执行模式,这种模式下,flink处理作业的方式类似于MapReduce。对于不会持续计算的有界数据,这种模式处理会更方便。

配置方式:
1. 命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH

2. 代码配置(不推荐)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
  • 自动模式(AUTOMATIC)

        根据数据源是否有界,来自动选择执行模式 

        总结:用 BATCH 处理批数据,用 Streaming 处理流数据。

1.3 触发程序执行

        写完输出(sink)操作之后不代表程序已经结束。这是因为main()方法被调用时,只定义了作业的每个执行操作,然后添加到数据流图中,这时候并没有真正的处理数据。

        Flink是事件驱动的,只有等数据到来,才会触发真正的计算,是懒执行/延迟执行。所以我们需要显式的调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果。

env.execute()

二、源算子(Source)

        flink可以从各种源获取数据,然后构建DataStream进行转换处理。数据的输入来源就是数据源,读取数据的算子就是源算子(Source)

2.1 准备工作

        我们可以定义一个样例类Event,字段如下:

字段名数据类型说明
idString用户id
timestampLong时间戳
temperatureDouble温度
//定义样例类  温度传感器
  case class SensorReading(id:String,timestamp:Long,temperature:Double)

2.2 从元素中读取数据 

从元素中读取数据
val stream1: DataStream[SensorReading] = env.fromElements(
    SensorReading("北京",1684201960L,23.5),
    SensorReading("南京",1684201960L,32.8)
)

2.3 从集合中读取数据

从集合中读取数据
val temp = List(
    SensorReading("北京",1684201960L,23.5),
    SensorReading("南京",1684201960L,32.8)
)
val stream2: DataStream[SensorReading] = env.fromCollection(temp)

2.4 从文件中读取数据

从文件中读取数据
val path = "F:\\Server\\flink\\resources\\sensor.txt"
val value: DataStream[String] = env.readTextFile(path)

2.5 从Socket读取数据

        socket并行度默认为1,且不够稳定,一般仅测试使用。

val parameterTool = ParameterTool.fromArgs(args)
val hostname = parameterTool .get("host")
val port = parameterTool .get("port")
val lineDataStream = env.socketTtextStream(hostname, port)

2.6 从Kafka读取数据

        kafka进行数据的收集与传输,flink进行分析与计算,这种架构目前已经称为很多企业的首选。但是Kafka与flink的连接比较复杂,flink内部没有提供预实现的方法,所以我们需要通过调用addSource()来传入一个 SourceFunction 的实现类。而同时,Flink官方提供了连接工具flink-connector-kafka 帮我们实现了一个消费者 FlinkKafkaConsumer ,用来读取Kafka数据的 SourceFunction。

1> 导入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2> 传入FlinkKafkaConsumer实例对象

object SourceTest {

  def main(args: Array[String]): Unit = {

//    1. 创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

//    2. 用Properties保存Kafka连接的相关配置
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")

//    3. 调用 env.addSource()
    val stream1 = env.addSource(new FlinkKafkaConsumer[String](
        "sensor",        // topic
        new SimpleStringSchema(),    // 当前值的反序列化器
        properties       // prop
    ))

//    4. 输出
    stream1.print()

//    5. 启动
    env.execute()
  }
}

2.7 读取自定义源算子

class MySensorSource() extends SourceFunction[SensorReading]{

// 标志位
var running = true

// run方法:不停循环,发送数据
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 1. 随机数生成器
    val random = new Random()
    // 2. 用标志位作为循环判断的条件,不断发送数据
    while (running){
      val i = random.nextInt()
    // 3. 调用sourceContext的方法向下游发送数据
      sourceContext.collect(SensorReading("生成:"+i,1,1))
    }
    Thread.sleep(500)
  }

// cancel方法:定义标志位,用于run中断的控制
  override def cancel(): Unit = {
    running = false
  }

}
val env = StreamExecutionEnvironment.getExecutionEnvironment

读取自定义的数据源
val stream1 = env.addSource(new MySensorSource)

stream1.print()

env.execute()

三、转换算子(Transformation)

        数据源读入数据之后,可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。可以对一条数据流进行转换操作,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。

3.1 基本转换算子

1. 映射 map

2. 过滤 filter

3. 扁平映射 flatMap

3.2 聚合算子

1. 按键分区 keyBy

        DataStream是没有直接进行聚合的API的。所以如果需要聚合,需要先进行分区操作。

        keyBy()通过指定键key,将一条流从逻辑上划分成不同的分区partitions,也就是并行处理的子任务,对应着任务槽task slots。基于不同的key,流中的数据将被分配到不同的分区中去,下一步算子将会在同一个slot中进行处理。

 键选择器

1. 使用选择器
stream.keyBy( new MyKeySelector() )

// 键选择器KetSelector泛型:in-输入数据的数据类型, key-提取出来键的类型
class MyKeySelector extends KeySelector[SensorReading, String] {

    override def getKey(in: ):String = in.id
}

2. 使用lambda表达式
stream.keyBy( data => data.id )
或
stream.keyBy( _.id )

2. 简单聚合

        如sum()、min()、max()、minBy()。聚合方法在调用时,也需要传入参数。但不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。

        minBy()和min()类似,都是用于简单聚合的函数,求指定字段的最小值。min()只计算指定字段的最小值,其他字段回保留最初的第一个数据的值。而minBy()会返回包含字段最小值的整条数据。

stream.keyBy(_.user).max("timestamp").print()

3. 归约聚合 reduce

        与简单聚合类似,reduce()操作也会将KeyStream转换为DataStream。不会改变流的元素数据类型,所以输出类型和输入类型一致。

reduce归约聚合,提取当前最活跃用户
stream.map( data => (data.user, 1L) )
    .keyBy(_._1)
    .reduce( new MySum() )    // 统计每个用户的活跃度
    .keyBy( _ => true )    // 将所有数据按照同样的key分到同一个组中
    .reduce((state, data) => if(data._2 > state._2) data else state)    // 选取当前最活跃用户

class MySum() extends ReduceFunction[(String, Long)]{
    override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = 
        (t._1, t._2 + t1._2)
}

3.3 用户自定义函数UDF

        Flink 的 DataStream API 编程风格其实是一致的:基本上都是基于 DataStream 调用一个方

法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。
        这些接口有一个共同特点:全部都以算子操作名称 + Function 命名。例如源算子需要实现SourceFunction 接口,map 算子需要实现 MapFunction 接口,reduce()算子需要实现 ReduceFunction 接口。我们不仅可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)。

1. 函数类

——实现一个自定义的函数类

// 通过传入自定义FilterFunction实现过滤
val stream = clicks.filter( new FlinkFilter )

// 自定义FilterFunction函数类
class FlinkFilter extends FilterFunction[Event]{
    override def filter(value: Event): Boolean = value.url.contains("home")
}

——使用匿名类

stream.filter( new FilterFunction[Event]{
    override def filter(t: Event): Boolean = t.url.contains("prod")
} )

 ——使用 lambda 表达式

stream.filter( _.url.contains("prod") )

2. 富函数类

        富函数类是DataStream API的一个函数类的接口,所有的flink函数类都有其rich版本。

        与常规函数类不同主要在于:富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。几乎每一个算子都有对应的rich版本

        典型的生命周期有:

open()方法:

        Rich Function的初始化方法,开启一个算子的生命周期。当一个算子的实际工作方法被调用之前,会先调用open()方法

close()方法:

        生命周期的最后一个调用方法。

// 自定义一个RichMapFunction,测试富函数类的功能
stream.map( new MyRichMap() )

class MyRichMap() extends RichMapFunction[Evnet, Long]{

    override def open(parameters: Configuration): Unit = {
        println("索引号为:"+ getRuntimeContext.getIndefOfThisSubtask + "的任务开始")
    }

    override def map(in: Event): Long = {
        in.timestamp
    }

    override def close(): Unit = {
        println("索引号为:"+ getRuntimeContext.getIndefOfThisSubtask + "的任务结束")
    }
}

3.4 物理分区

1. 随机分区shuffle

        随即分区服从均匀分布,可以把流中的数据随机打乱,均匀地传递到下游任务分区。如下图所示。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource( new ClickSource )
stream.shuffle.setParallelism(4)

2. 轮询分区 Round-Robin

        按照先后顺序将数据依次分发。通过调用DataStream的rebalance()方法,实现轮询重分区。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource( new ClickSource )
stream.rebalance.setParallelism(4)

3. 重缩放分区 rescale

        重缩放分区与轮询分区类似,当调用rescale()方法时,底层也是使用轮询,但是只会讲数据轮询发送到下游并行任务的一部分中。

        如果理解成发牌,rebalance()是每个发牌人都面向所有人发牌;而rescale()是分成小团体,发牌人只给自己团体内的所有人轮流发牌。所以当下游任务数量是上有任务数量的整数倍时,rescale()的效率明显会更高。

 

object RescaleExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

 // 使用匿名类的方式自定义数据源,这里使用了并行数据源函数的富函数版本
    env
     .addSource(new RichParallelSourceFunction[Int] {
      override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
        for (i <- 0 to 7) {
     // 将偶数发送到下游索引为 0 的并行子任务中去
     // 将奇数发送到下游索引为 1 的并行子任务中去
          if ((i + 1) % 2 == getRuntimeContext.getIndexOfThisSubtask) {
            sourceContext.collect(i + 1)
           }
        }
    }

  // 这里???是 Scala 中的占位符
    override def cancel(): Unit = ???
    })
    .setParallelism(2)
    .rescale
    .print()
    .setParallelism(4)

    env.execute()
  }
}

4. 广播 broadcast

        经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 读取数据源,并行度为1
    val stream = env.addSource(new ClientSource)

    // 经过广播后打印输出,并行度为4
    stream.broadcast.print("broadcast").setParallelism(4)

    env.execute()

  }
}

5. 全局分区 global

        通过.global()方法,将所有的输入流数据都发送到下游算子的第一个并行子任务中去。对程序压力很大,谨慎使用。

6. 自定义分区 Custom

        当flink提供的所有分区策略都不能满足用户需求时,可以通过使用partitionCustom()方法来自定义分区策略。

partitionCustom()方法传参:

- 自定义分区器(Partitioner)对象

- 应用分区器的字段

object TransCustomPartitioner {
  def main(args: Array[String]): Unit = {
    val env =  StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1,2,3,4,5,6,7,8)
       .partitionCustom(
            // 根据 key 的奇偶性计算出数据将被发送到哪个分区
          new Partitioner[Int] {
            override def partition(key: Int, numPartitions: Int) = key % 2
          },
            // 以自身作为key
          data => data
       ).print()

  env.execute()
  }
}

 

四、输出算子(Sink)

4.1 输出到外部系统

        与 source 类似,一般情况下sink算子的创建是通过调用 DataStream 的 addSink() 方法来实现的。

4.2 输出到文件

        flink有一些输出到文件的预实现方法,如writeAsText()、writeAsCsv()。但是对于大数据来说,这种方法过于简单,无法满足分布式的需求。StreamingFileSink支持行编码和批量编码,这两种不同的方式都有各自的构造器,可以直接调用StreamingFileSink的静态方法:

行编码:StreamingFileSink.forRowFormat( basePath, rowEncoder )

批量编码:StreamingFileSink.forBulkFormat( basePath, bulkWriterFactory )

stream.addSink( StreamFileSink.forRowFormat(
    new Path("F:\Server\flink\resources\out1.txt"),
    new SimpleStringEncoder[String]("UTF-8")
) )

4.3 输出到Kafka

        flink为Kafka提供了source和sink的连接器,我们可以用它方便地从Kafka读写数据。而且flink和Kafka的连接器提供了端到端的精确一次保证。

object SinkToKafka {
  def main(agrs: Array[String]): Unit = {

// 1. 配置环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

// 2. 编辑Kafka环境
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")

// 3. 读取文件
    val stream = env.readTextFile("input/clicks.csv")

// 4. 数据处理后写入到Kafka
    stream.addSink( new FlinkKafkaProducer[String](
        "clicks",
        new SimpleStringSchema(),
        properties
    ) )

// 5. 执行
    env.execute()
  }
    
}

4.4 输出到HBase

添加对应pom依赖

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>2.3.5</version>
    </dependency>

main函数中调用addSink方法

dataStream.addSink(new MyHbaseSink)

 实现MyHbaseSink方法

class MyHbaseSink extends RichSinkFunction[SensorReading] {
  var connection: Connection = _
  var mutator: BufferedMutator = _

  override def open(parameters: Configuration): Unit = {
    val configuration: conf.Configuration = HBaseConfiguration.create()
    configuration.set(HConstants.HBASE_DIR, "hdfs://192.168.78.20:9000/hbase")
    configuration.set(HConstants.ZOOKEEPER_QUORUM, "192.168.136.20")
    configuration.set(HConstants.CLIENT_PORT_STR, "2181")

    connection = ConnectionFactory.createConnection(configuration) 
    val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("ha:test"))
    params.writeBufferSize(10*1024*1024)
    params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L)
    mutator = connection.getBufferedMutator(params)
    
  }

  override def close() = {
    connection.close()
  }


  override def invoke(value: SensorReading, context: SinkFunction.Context) = {
    val put = new Put(Bytes.toBytes(value.id + value.temperature + value.timestamp))
    put.addColumn("sensor".getBytes(), "id".getBytes(), value.id.getBytes())
    put.addColumn("sensor".getBytes(), "timestamp".getBytes(), value.timestamp.toString.getBytes())
    put.addColumn("sensor".getBytes(), "temperature".getBytes(), value.temperature.toString.getBytes())

    mutator.mutate(put)
    mutator.flush()
  }
    
}

4.5 输出到MySQL

添加依赖

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.29</version>
    </dependency>

 main函数中调用addSink方法

dataStream.addSink(new MyJdbcSink)

实现MyJdbcSink方法 

class MyJdbcSink extends RichSinkFunction[SensorReading]{
  var connection: Connection = _
  var insertState: PreparedStatement = _
  var updateState: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbc:mysql://192.168.136.20:3306/kb21?useSSL=false","root","root")
    insertState = connection.prepareStatement("insert into sensor_temp(id, temp) value(?,?)")
    updateState = connection.prepareStatement("update sensor_temp set temp=? where id=?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
    updateState.setDouble(1,value.temperature)
    updateState.setString(2, value.id)
    val i: Int = updateState.executeUpdate()
    if (i==0){
      insertState.setString(1,value.id)
      insertState.setDouble(2,value.temperature)
      insertState.execute()
    }
  }

  override def close(): Unit = {
    insertState.close()
    updateState.close()
    connection.close()
  }


}

4.6 自定义Sink

        与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkFunction抽象类。可以通过简单的调用DataStream的addSink()方法来自定义写入任何外部存储。比如hbase的连接。

        在实现SinkFunction的时候需要重写关键方法invoke(),在这个方法中我们可以实现将流里的数据发送出去的逻辑。创建连接以及关闭连接分别放在open()和close()方法中。这里不做赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/537819.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.net6 接收json数据 Controller http post

.net6 接收json数据 Controller http post 要添加这两个包 前端ajax请求 function tst() {$.ajax({type: "POST",contentType: "application/json",url: "/HelloWorld/Welcome",data: JSON.stringify({ "ID":1,"name": &…

Qt实现undo和redo功能--连续后退

刚开始想做这个的时候&#xff0c;我专门去找了Qt官方的测试例子&#xff0c;运行起来点了点&#xff0c;代码翻了翻。然后照猫画虎般的写了个测试例子。 不明白&#xff0c;为什么每个例子旁边会有个命令的显示列表&#xff0c;还巨丑的那种&#xff0c;这如果要放在别的程序…

DJ6-4 文件存储空间的管理

目录 6.4.1 空闲表 1、存储空间的分配与回收 2、空闲表法的优缺点 6.4.2 空闲链表 1、空闲盘块链 2、空闲盘区链 6.4.3 位示图 1、位示图的表示 2、存储空间的分配 3、存储空间的回收 4、位示图法的优缺点 6.4.4 成组链接 1、空闲盘块的组织 plus 个人理解图…

前端web入门-HTML-day01

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 HTML初体验 HTML 定义 标签语法 总结&#xff1a; HTML 基本骨架 基础知识&#xff1a; 总结&#…

20年磨一剑,数慧时空推出智能遥感云平台DIEY及自然资源多模态大模型“长城”

5月17日&#xff0c;主题为“时空智能 从感知到决策”的第十二届全球地理信息开发者大会&#xff08;WGDC2023&#xff09;在北京昆泰酒店举行。大会聚集了千余位产业专家、行业用户、创新企业等业界精英&#xff0c;共话时空智能时代下的技术发展与应用创新。中国科学院院士、…

初始计算机操作系统——进程与线程,多线程以及Thread类的创建和属性

目录 通过前半篇文章需要了解 1.进程&#xff08;process/task&#xff09;&#xff1a;运行起来的可执行文件。 为啥要有进程&#xff1f; 如何解决这个问题&#xff1f; &#xff08;1&#xff09;进程池&#xff1a; &#xff08;2&#xff09;使用线程&#xff1a; 为啥线…

国考省考行测:判断推理,类比推理1,概念关系,包含关系,交叉关系,并列关系,全同关系

国考省考行测&#xff1a;判断推理&#xff0c;类比推理1&#xff0c;概念关系&#xff0c;包含关系&#xff0c;交叉关系&#xff0c;并列关系&#xff0c;全同关系 2022找工作是学历、能力和运气的超强结合体! 公务员特招重点就是专业技能&#xff0c;附带行测和申论&#x…

chatgpt赋能Python-libreoffice_python扩展

LibreOffice Python扩展: 提升办公效率的利器 如果你一直在寻找一种提高办公效率的方法&#xff0c;那么你肯定会喜欢LibreOffice Python扩展。作为LibreOffice的一个特性&#xff0c;它可以让你使用Python编写宏程序自动化你的日常办公任务。 什么是LibreOffice Python扩展&…

力扣sql中等篇练习(二十一)

力扣sql中等篇练习(二十一) 1 最大数量高于平均水平的订单 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 a 示例输入 b 示例输出 1.2 示例sql语句 # Write your MySQL query statement below WITH t1 as (SELECT order_id,avg(quantity) AquantityFROM OrdersDeta…

chatgpt赋能Python-numpy数据预处理

Numpy数据预处理综述 介绍 Numpy是Python中最流行的数学库之一&#xff0c;可以用于高效的处理大型数据。Numpy提供了各种强大的数据结构和函数&#xff0c;使得数据分析和处理变得更加容易和直观。本文将介绍numpy中的一些数据预处理技术&#xff0c;包括数据清洗、缩放、归…

chatgpt赋能Python-mingw编译python

Mingw编译Python&#xff1a;一种常用的解决方案 在Python开发中&#xff0c;为了获得更好的性能&#xff0c;我们通常会选择编译Python源代码。而在Windows平台上&#xff0c; Mingw编译器是一种常用的解决方案。本文将介绍Mingw编译Python的过程&#xff0c;并探讨其优缺点。…

chatgpt赋能Python-numpy创建

Numpy&#xff1a;Python中的数学计算利器 作为Python中进行数学计算和科学计算最重要的库之一&#xff0c;Numpy已经成为了Python编程中的标配。Numpy以其出色的数组处理能力和矩阵运算效果&#xff0c;让Python用户的数学计算和科学计算变得更加简单高效。在本篇文章中&…

【半监督学习】Match系列.4

介绍几篇关于半监督学习的论文&#xff1a;CLS&#xff08;arXiv2022&#xff09;&#xff0c;Ada-CM&#xff08;CVPR2022&#xff09;&#xff0c;SemiMatch&#xff08;CVPR2022&#xff09;. CLS: Cross Labeling Supervision for Semi-Supervised Learning, arXiv2022 解…

mysql增量备份

目录 一、修改配置文件&#xff0c;开启增量备份功能 &#xff08;1&#xff09;查看是否已经开启了 &#xff08;2&#xff09;修改配置文件开启 &#xff08;3&#xff09;增量记录文件 二、还原增量备份 &#xff08;1&#xff09;修改了数据 &#xff08;2&#xff…

使用thrift进行RPC通信(附c程序示例)

前言 为了实现不同语言的程序跨进程、跨主机通信&#xff0c;一般可以采用mq或rpc框架来实现。 对于异步通知的场景可以使用mq&#xff0c;如zeroMQ。 但对于某些实时性较强且同步的应用场景&#xff0c;使用成熟的rpc框架来实现也是一种比较更好的选择。 开源的rpc框架有很…

MySQL---游标,异常处理,循环构建表

1. 游标 游标(cursor)是用来存储查询结果集的数据类型 , 在存储过程和函数中可以使用光标对结果集进行 循环的处理。光标的使用包括光标的声明、OPEN、FETCH 和 CLOSE. -- 声明语法 declare cursor_name cursor for select_statement -- 打开语法 open cursor_name -- 取值语…

由浅入深Netty基础知识NIO三大组件原理实战

目录 1 三大组件1.1 Channel & Buffer1.2 Selector1.3 多线程版设计1.4 多线程版缺点1.5 线程池版设计1.6 线程池版缺点1.7 selector 版设计 2 ByteBuffer2.1 ByteBuffer 正确使用姿势2.2 ByteBuffer 结构2.3 调试工具类2.4 ByteBuffer 常见方法2.4.1 分配空间2.4.2 向 buf…

chatgpt赋能Python-numpy查找

Numpy查找 - 了解numpy中的查找功能 什么是Numpy&#xff1f; Numpy是Python语言中的一种开源的数学计算库&#xff0c;允许开发者轻松高效地进行数学运算。它提供了一整套矩阵运算方式&#xff0c;支持各种各样的数学函数和数据类型&#xff0c;并且可以与其他Python库良好地…

chatgpt赋能Python-macbook怎么用python

使用MacBook进行Python编程的完全指南 如果您是一名Python编程工程师&#xff0c;那么您需要一台性能良好的电脑来进行编程工作。今天&#xff0c;我们将探讨如何使用MacBook来编写Python代码&#xff0c;以及如何使您的Mac运行最佳状态。 安装Python 在开始使用Python之前&…

还在老一套?STM32使用新KEIL5的IDE,全新开发模式RTE介绍及使用

Keil新版本出来了&#xff0c;推出了一种全新开发模式RTE框架( Run-Time Environment)&#xff0c;更好用了。然而网上的教程资料竟还都是把Keil5当成Keil4来用&#xff0c;直接不使用这个功能。当前正点原子或野火的教程提供的例程虽有提到Keil5&#xff0c;但也是基本上当Kei…