Flink(七)【输出算子(Sink)】

news2024/12/22 18:45:51

前言

        今天是我写博客的第 200 篇,恍惚间两年过去了,现在已经是大三的学长了。仍然记得两年前第一次写博客的时候,当时学的应该是 Java 语言,菜的一批,写了就删,怕被人看到丢脸。当时就想着自己一年之后,两年之后能学到什么水平,什么是 JDBC、什么是 MVC、SSM,在当时都是特别好奇的东西,不过都在后来的学习中慢慢接触到,并且好多已经烂熟于心了。

        那,今天我在畅想一下,一年后的今天,我又学到了什么水平?能否达到三花聚顶、草木山石皆可为码的超凡入圣的境界?拿没拿到心仪的 offer?和那个心动过的女孩相处怎么样了?哈哈哈哈哈


输出算子(Sink)

学完了 Flink 在不同执行环境(本地测试环境和集群环境)下的多种读取(多种数据源)和转换操作(多种转换算子),最后就是输出操作了。

1、连接到外部系统

Flink 1.12 之前,Sink 算子是通过调用 DataStream 的 addSink 方法来实现的:

stream.addSink(new SinkFunction(...));

从 Flink 1.12 开始,Flink 重构了 Sink 架构:

stream.sinkTo(...)

查看 Flink 支持的连接器

需要我们自己导入依赖,比如上面的 Kfaka 和 DataGen 我们之前使用的时候都导入过相关依赖,需要知道,有的是只支持source,有的只支持sink,有的全都支持。

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>

2、输出到文件

        Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。
        它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。
        FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用 FileSink 的静态方法:

  • 行编码:FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码:FileSink.forBulkFormat(basePath,bulkWriterFactory)。

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

package com.lyh.sink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

/**
 * @author 刘xx
 * @version 1.0
 * @date 2023-11-18 9:51
 */
public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // 必须开启 检查点 不然一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<String>(
                new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:"+value;
            }
        },
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(10), // 每s 10条
            Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generate");

        // todo 输出到文件系统
        FileSink<String> fileSink = FileSink.
                // 泛型方法 需要和输出结果的泛型保持一致
                <String>forRowFormat(
                new Path("D:/Desktop"),    // 指定输出路径 可以是 hdfs:// 路径
                new SimpleStringEncoder<>("UTF-8")) // 指定编码
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("lyh")
                        .withPartSuffix(".log")
                        .build())
                // 按照目录分桶 一个小时一个目录(这里的时间格式别改为分钟 会报错: flink Relative path in absolute URI:)
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 设置文件滚动策略-时间或者大小 10s 或 1KB 或 5min内没有新数据写入 滚动一次
                // 滚动的时候 文件就会更名为我们设定的格式(前缀)不再写入
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(10L))  // 10s
                                .withMaxPartSize(new MemorySize(1024)) // 1KB
                                .withInactivityInterval(Duration.ofMinutes(5))  // 5min
                                .build()
                )
                .build();

        dataGen.sinkTo(fileSink);

        env.execute();
    }
}

这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
⚫ 至少包含 10 秒的数据
⚫ 最近 5 分钟没有收到新的数据
⚫ 文件大小已达到 1 KB

通过 withOutputFileConfig()方法指定了输出的文件名前缀和后缀。

需要特别注意的就是一定要开启检查点,否则我们的数据一直都是正在写入的状态(具体原因后面学习到检查点的时候会详细说)。

运行结果:

3、输出到 Kafka

  1. 需要添加 Kafka 依赖(之前导入过了)
  2. 启动 Kafka
  3. 编写示例代码
package com.lyh.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

/**
 * @author 刘xx
 * @version 1.0
 * @date 2023-11-18 11:20
 */
public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 如果是 精准一次 必须开启 checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("localhost", 9999);

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器 我们是发送方 所以我们是生产者
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("like")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到 kafka 的一致性级别: 精准一次 / 至少一次
                // 如果是精准一次
                //  1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
                //  2.必须设置事务的前缀
                //  3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("lyh-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }
}

启动 kafka 并开启一个消费者:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like

运行结果:

需要特别注意的三点:

如果是精准一次
 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
 2.必须设置事务的前缀
 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟

自定义序列化器

我们上面用的自带的序列化器,但是如果我们有 key 的话,就需要自定义序列化器了,替换上面的代码:

.setRecordSerializer(
        /**
         * 如果要指定写入 kafka 的key 就需要自定义序列化器
         * 实现一个接口 重写序列化方法
         * 指定key 转为 bytes[]
         * 指定value 转为 bytes[]
         * 返回一个 ProducerRecord(topic名,key,value)对象
         */
        new KafkaRecordSerializationSchema<String>() {
            @Nullable
            @Override
            // ProducerRecord<byte[], byte[]> 返回一个生产者消息,key,value 分别对应两个字节数组
            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                String[] datas = element.split(",");
                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                return new ProducerRecord<>("like",key,value);
            }
        }
)

运行结果: 

4、输出到 MySQL

添加依赖(1.17版本的依赖需要指定仓库才能找到,因为阿里云和默认的maven仓库是没有的):

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.17-SNAPSHOT</version>
        </dependency>
<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>

....

    <repositories>
        <repository>
            <id>apache-snapshots</id>
            <name>apache snapshots</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
    </repositories>

创建表格 

编写代码,将输入的数据行分隔为对象参数,每行数据生成一个对象进行处理。 

package com.lyh.sink;

import com.lyh.bean.WaterSensor;
import function.WaterSensorFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author 刘xx
 * @version 1.0
 * @date 2023-11-18 12:32
 */
public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction()); //输入进来的数据自动转为 WaterSensor类型


        /**
         * todo 写入 mysql
         * 1.这里需要用旧的sink写法:addSink
         * 2.JDBC的4个参数
         *   (1) 执行的sql语句
         *   (2) 对占位符进行填充
         *   (3) 执行选项  -> 攒批,重试
         *   (4) 连接选项 -> driver,username,password,url
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into flink.ws values(?,?,?)",
                // 指定 sql 中占位符的值
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement stmt, WaterSensor sensor) throws SQLException {
                        // 占位符从 1 开始
                        stmt.setString(1, sensor.getId());
                        stmt.setLong(2, sensor.getTs());
                        stmt.setInt(3, sensor.getVc());
                    }
                }, JdbcExecutionOptions.builder()
                        .withMaxRetries(3)  //最多重试3次(不包括第一次,共4次)
                        .withBatchSize(100) //每收集100条记录进行一次写入
                        .withBatchIntervalMs(3000)  // 批次3s(即使没有达到100条记录,只要过了3s JDBCSink也会进行记录的写入),这有助于确保数据及时写入,而不是无限期地等待批处理大小达到。
                        .build()
                , new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("Yan1029.")
                        // mysql 默认8小时不使用连接就主动断开连接
                        .withConnectionCheckTimeoutSeconds(60) // 重试连接直接的间隔,上面我们设置最多重试3次,每次间隔60s
                        .build()
        );

        sensorDS.addSink(jdbcSink);

        env.execute();
    }
}

 查询结果:

5、自定义 Sink 输出

与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。

这里我们自定义实现一个向 HBase 中插入数据的 Sink。

注意:这里只是做一个简单的 Demo,下面的代码不难发现,我们只是对 nosq:student 表下的 info:name 进行了两次的覆盖。如果要实现复杂的处理功能,需要对数据类型进行定义,因为 HBase 的数据是按列存储的,所以对于复杂的 Hbase 表,我们难以通过 Java bean 来插入数据。而且,一般经常用的连接器,Flink 大部分已经提供了,开发中我们一般也很少自定义 Sink 输出。

package com.lyh.sink;

import com.lyh.utils.HBaseConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.nio.charset.StandardCharsets;

/**
 * @author 刘xx
 * @version 1.0
 * @date 2023-11-18 15:59
 */
public class SinkCustomHBase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("tom","bob").addSink(new RichSinkFunction<String>() {
            public Connection con;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                con = HBaseConnection.getConnection("hadoop102:2181");
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                super.invoke(value, context);
                Table table = con.getTable(TableName.valueOf("nosql","student"));
                Put put = new Put("1001".getBytes(StandardCharsets.UTF_8));
                put.addColumn("info".getBytes(StandardCharsets.UTF_8)
                ,"name".getBytes(StandardCharsets.UTF_8),
                        value.getBytes(StandardCharsets.UTF_8));
                table.put(put);
                table.close();
            }

            @Override
            public void close() throws Exception {
                super.close();
                HBaseConnection.close();
            }

        });

        env.execute();
    }
}

这里用到一个简单的连接 HBase 的工具类:
 

package com.lyh.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * @author 刘xx
 * @version 1.0
 * @date 2023-11-18 16:04
 */
public class HBaseConnection {

    private static Connection connection;

    public static Connection getConnection(String hosts) throws IOException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", hosts);
        conf.setInt("hbase.rpc.timeout", 10000); // 设置最大超时 10 s
        connection = ConnectionFactory.createConnection(conf);
        return connection;
    }

    public static void close() throws IOException {
        if (connection!=null)
            connection.close();
    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1223820.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

客户端性能优化实践

背景 双十一大促时&#xff0c;客户客服那边反馈商品信息加载卡顿&#xff0c;在不断有订单咨询时&#xff0c;甚至出现了商品信息一直处于加载状态的情况&#xff0c;显然&#xff0c;在这种高峰期接待客户时&#xff0c;是没法进行正常的接待工作的。 起初&#xff0c;页面一…

4、FFmpeg命令行操作8

生成测试文件 找三个不同的视频每个视频截取10秒内容 ffmpeg -i 沙海02.mp4 -ss 00:05:00 -t 10 -codec copy 1.mp4 ffmpeg -i 复仇者联盟3.mp4 -ss 00:05:00 -t 10 -codec copy 2.mp4 ffmpeg -i 红海行动.mp4 -ss 00:05:00 -t 10 -codec copy 3.mp4 如果音视…

记录一些涉及到界的题

文章目录 coppersmith的一些相关知识题1 [N1CTF 2023] e2Wrmup题2 [ACTF 2023] midRSA题3 [qsnctf 2023]浅记一下 coppersmith的一些相关知识 上界 X c e i l ( 1 2 ∗ N β 2 d − ϵ ) X ceil(\frac{1}{2} * N^{\frac{\beta^2}{d} - \epsilon}) Xceil(21​∗Ndβ2​−ϵ) …

Os-ByteSec

Os-ByteSec 一、主机发现和端口扫描 主机发现&#xff0c;靶机地址192.168.80.144 端口扫描&#xff0c;开放了80、139、445、2525端口 二、信息收集 访问80端口 路径扫描 dirsearch -u "http://192.168.80.144/" -e *访问扫描出来的路径&#xff0c;没有发现…

【有源码】基于asp.net的旅游度假村管理系统C#度假村美食住宿一体化平台源码调试 开题 lw ppt

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

聊聊氮化硅(SiNx)在芯片中的重要性

在芯片制造中&#xff0c;有一种材料扮演着至关重要的角色&#xff0c;那就是氮化硅&#xff08;SiNx&#xff09;。尽管它可能并未获得和其他更为熟知的半导体材料&#xff0c;如硅&#xff08;Si&#xff09;、砷化镓&#xff08;GaAs&#xff09;或氮化镓&#xff08;GaN&am…

链式队列的基本操作与实现(数据结构与算法)

链队列的表示与实现如下图&#xff1a; 代码如下&#xff1a; #include<iostream> using namespace std;#define MAXQSIZE 100 //最大队列长度 typedef int QElemType; //typedef struct Qnode {QElemType data;struct Qnode* next; }QNode, *QueuePtr; //队列结点类型…

KVM Cloud云平台

项目介绍 KVM Cloud 是一款基于Java实现的轻量级私有云平台&#xff0c;旨在帮助中小企业快速实现计算、存储、网络等资源的管理&#xff0c;让企业拥有自己的云平台&#xff0c;包括但不限于如下功能: 1、基于KVM的VM基础功能(创建、启动、停止、重装、webVNC等功能) 2、使用…

【Proteus仿真】【STM32单片机】防火防盗GSM智能家居设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使用声光报警模块、LCD1602显示模块、DS18B20温度、烟雾传感器模块、按键模块、PCF8591 ADC模块、红外检测模块等。 主要功能&#xff1a; 系统运行…

Web之CSS笔记

Web之HTML、CSS、JS 二、CSS&#xff08;Cascading Style Sheets层叠样式表&#xff09;CSS与HTML的结合方式CSS选择器CSS基本属性CSS伪类DIVCSS轮廓CSS边框盒子模型CSS定位 Web之HTML笔记 二、CSS&#xff08;Cascading Style Sheets层叠样式表&#xff09; Css是种格式化网…

3D建模基础教程:可编辑多边形建模的基础认识

可编辑多边形建模是3D建模中的一种常见方法&#xff0c;它允许用户对模型进行细致的调整和编辑。以下是对可编辑多边形建模的详细介绍&#xff1a; 1、层级概念&#xff1a;在可编辑多边形建模中&#xff0c;有五个层级&#xff0c;分别是点层级、边层级、边界层级、面层级和元…

2023年亚太杯数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…

[内存泄漏][PyTorch](create_graph=True)

PyTorch保存计算图导致内存泄漏 1. 内存泄漏定义2. 问题发现背景3. pytorch中关于这个问题的讨论 1. 内存泄漏定义 内存泄漏&#xff08;Memory Leak&#xff09;是指程序中已动态分配的堆内存由于某种原因程序未释放或无法释放&#xff0c;造成系统内存的浪费&#xff0c;导致…

蓝桥杯每日一题2023.11.18

题目描述 蓝桥杯大赛历届真题 - C 语言 B 组 - 蓝桥云课 (lanqiao.cn) 题目分析 本题使用搜索&#xff0c;将每一个格子进行初始赋值方便确定是否为相邻的数&#xff0c;将空出的两个格子首先当作已经填好数值为100&#xff0c;此时从第一个格子右边的格子开始搜索&#xff…

vscode编写verilog的插件【对齐、自动生成testbench文件】

vscode编写verilog的插件&#xff1a; 插件名称&#xff1a;verilog_testbench,用于自动生成激励文件 安装教程&#xff1a;基于VS Code的Testbench文件自动生成方法——基于VS Code的Verilog编写环境搭建SP_哔哩哔哩_bilibili 优化的方法&#xff1a;https://blog.csdn.net…

ROSCon 2023 大会回顾

系列文章目录 文章目录 系列文章目录前言一、会议内容二、其他活动 前言 我们与 ROSCon 2023 全体 700 多名与会者的合影。 视频回放链接 一、会议内容 ROSCon 2023 是我们第十二届年度 ROS 开发者大会&#xff0c;于 2023 年 10 月 18 日至 20 日在路易斯安那州新奥尔良举行。…

字符串函数详解

一.字母大小写转换函数. 1.1.tolower 结合cppreference.com 有以下结论&#xff1a; 1.头文件为#include <ctype.h> 2.使用规则为 #include <stdio.h> #include <ctype.h> int main() {char ch A;printf("%c\n",tolower(ch));//大写转换为小…

ThinkPHP 系列漏洞

目录 2、thinkphp5 sql注入2 3、thinkphp5 sql注入3 4、 thinkphp5 SQL注入4 5、 thinkphp5 sql注入5 6、 thinkphp5 sql注入6 7、thinkphp5 文件包含漏洞 8、ThinkPHP5 RCE 1 9、ThinkPHP5 RCE 2 10、ThinkPHP5 rce3 11、ThinkPHP 5.0.X 反序列化漏洞 12、ThinkPHP…

anaconda安装依赖报错ERROR: Cannot unpack file C:\Users\33659\AppData\Loca...|问题记录

执行命令&#xff1a; # 安装matplotlib依赖 pip install matplotlib-i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com出现问题&#xff1a; ERROR: Cannot unpack file C:\Users\33659\AppData\Local\Temp\pip-unpack-0au_blfq\simple (downloa…