Flink - sink算子

news2024/12/24 2:55:12

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

  1. Kafka_Sink
  2. Kafka_Sink - 自定义序列化器
  3. Redis_Sink_String
  4. Redis_Sink_list
  5. Redis_Sink_set
  6. Redis_Sink_hash
  7. 有界流数据写入到ES
  8. 无界流数据写入到ES
  9. 自定义sink - mysql_Sink
  10. Jdbc_Sink

官方文档 - Flink1.13

在这里插入图片描述


1. Kafka_Sink

addSink(new FlinkKafkaProducer< String>(kafka_address,topic,序列化器)

要先添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    stream
            .keyBy(WaterSensor::getId)
            .sum("vc")
            .map(JSON::toJSONString)
            .addSink(new FlinkKafkaProducer<String>(
                    "hadoop101:9092",  // kafaka地址
                    "flink_sink_kafka",  //要写入的Kafkatopic
                    new SimpleStringSchema()  // 序列化器
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

2. Kafka_Sink - 自定义序列化器

  自定义序列化器,new FlinkKafkaProducer()的时候,选择四个参数的构造方法,然后使用new KafkaSerializationSchema序列化器。然后重写serialize方法

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    Properties sinkConfig = new Properties();
    sinkConfig.setProperty("bootstrap.servers","hadoop101:9092");
    stream
            .keyBy(WaterSensor::getId)
            .sum("vc")
            .addSink(new FlinkKafkaProducer<WaterSensor>(
                    "defaultTopic",  // 默认发往的topic ,一般用不上
                    new KafkaSerializationSchema<WaterSensor>() {  // 自定义的序列化器
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(
                                WaterSensor waterSensor,
                                @Nullable Long aLong
                        ) {
                            String s = JSON.toJSONString(waterSensor);
                            return new ProducerRecord<>("flink_sink_kafka",s.getBytes(StandardCharsets.UTF_8));
                        }
                    },
                    sinkConfig,  // Kafka的配置
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE  // 一致性语义:现在只能传入至少一次
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

3. Redis_Sink_String

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到String结构里面

添加依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

/*
往redis里面写字符串,string   命令提示符用set
假设写的key是id,value是整个json格式的字符串
key         value
sensor_1    json格式字符串
 */
		// new一个单机版的配置
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)  //最大连接数量
            .setMaxIdle(10)  // 连接池里面的最大空闲
            .setMinIdle(2)   // 连接池里面的最小空闲
            .setTimeout(10*1000)  // 超时时间
            .build();
    // 写出到redis中
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        // 返回命令描述符:往不同的数据结构写数据用的方法不一样
        @Override
        public RedisCommandDescription getCommandDescription() {
            // 写入到字符串,用set
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

4. Redis_Sink_list

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 list 结构里面

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");
            
    // key是id,value是处理后的json格式字符串
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)  //最大连接数量
            .setMaxIdle(10)  // 连接池里面的最大空闲
            .setMinIdle(2)   // 连接池里面的最小空闲
            .setTimeout(10*1000)  // 超时时间
            .build();
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // 写入list
            return new RedisCommandDescription(RedisCommand.RPUSH);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

5. Redis_Sink_set

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 set 结构里面

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10*1000)
            .build();
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // 数据写入set集合
            return new RedisCommandDescription(RedisCommand.SADD);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

6. Redis_Sink_hash

addSink(new RedisSink<>(config, new RedisMapper< WaterSensor>() {}

写到 hash结构里面

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10*1000)
            .build();
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // 数据写入hash
            return new RedisCommandDescription(RedisCommand.HSET,"a");
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

7. 有界流数据写入到ES中

new ElasticsearchSink.Builder()

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200)
    );

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,  // 需要写出的元素
                                    RuntimeContext runtimeContext, // 运行时上下文   不是context上下文对象
                                    RequestIndexer requestIndexer) {  // 把要写出的数据,封装到RequestIndexer里面
                    String msg = JSON.toJSONString(element);

                    IndexRequest ir = Requests
                            .indexRequest("sensor")
                            .type("_doc")  // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况
                            .id(element.getId())  // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据
                            .source(msg, XContentType.JSON);

                    requestIndexer.add(ir);  // 把ir存入到indexer, 就会自动的写入到es中
                }
            }
    );

    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

8. 无界流数据写入到ES

  和有界差不多 ,只不过把数据源换成socket,然后因为无界流,它高效不是你来一条就刷出去,所以设置刷新时间、大小、条数,才能看到结果。
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);
    
    SingleOutputStreamOperator<WaterSensor> result = env.socketTextStream("hadoop101",9999)
            .map(line->{
                String[] data = line.split(",");
                return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200)
    );

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,  // 需要写出的元素
                                    RuntimeContext runtimeContext, // 运行时上下文   不是context上下文对象
                                    RequestIndexer requestIndexer) {  // 把要写出的数据,封装到RequestIndexer里面
                    String msg = JSON.toJSONString(element);

                    IndexRequest ir = Requests
                            .indexRequest("sensor")
                            .type("_doc")  // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况
                            .id(element.getId())  // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据
                            .source(msg, XContentType.JSON);

                    requestIndexer.add(ir);  // 把ir存入到indexer, 就会自动的写入到es中
                }
            }
    );

    // 自动刷新时间
    builder.setBulkFlushInterval(2000);  // 默认不会根据时间自动刷新
    builder.setBulkFlushMaxSizeMb(1024);  // 当批次中的数据大于等于这个值刷新
    builder.setBulkFlushMaxActions(2);   // 每来多少条数据刷新一次
    // 这三个是或的关系,只要有一个满足就会刷新

    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

9. 自定义sink - mysql_Sink

  需要写一个类,实现RichSinkFunction,然后实现invoke方法。这里因为是写MySQL所以需要建立连接,那就用Rich版本。

  记得导入MySQL依赖

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));


    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    result.addSink(new MySqlSink());


    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

}

public static class MySqlSink extends RichSinkFunction<WaterSensor> {

    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://hadoop101:3306/test?useSSL=false", "root", "123456");
    }

    @Override
    public void close() throws Exception {
        if (connection!=null){
            connection.close();
        }
    }

    // 调用:每来一条元素,这个方法执行一次
    @Override
    public void invoke(WaterSensor value, Context context) throws Exception {
        // jdbc的方式想MySQL写数据
//            String sql = "insert into sensor(id,ts,vc)values(?,?,?)";
        //如果主键不重复就新增,主键重复就更新
//            String sql = "insert into sensor(id,ts,vc)values(?,?,?) duplicate key update vc=?";
        String sql = "replace into sensor(id,ts,vc)values(?,?,?)";
        // 1. 得到预处理语句
        PreparedStatement ps = connection.prepareStatement(sql);
        // 2. 给sql中的占位符进行赋值
        ps.setString(1,value.getId());
        ps.setLong(2,value.getTs());
        ps.setInt(3,value.getVc());
//            ps.setInt(4,value.getVc());
        // 3. 执行
        ps.execute();
        // 4. 提交
//            connection.commit();  MySQL默认自动提交,所以这个地方不用调用
        // 5. 关闭预处理
        ps.close();
    }
}

运行结果:
在这里插入图片描述

10. Jdbc_Sink

addSink(JdbcSink.sink(sql,JdbcStatementBuilder,执行参数,连接参数)

  对于jdbc数据库,我们其实没必要自定义,因为官方给我们了一个JDBC Sink -> 官方JDBC Sink 传送门

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));


    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    result.addSink(JdbcSink.sink(
            "replace into sensor(id,ts,vc)values(?,?,?)",
            new JdbcStatementBuilder<WaterSensor>() {
                @Override
                public void accept(
                        PreparedStatement ps,
                        WaterSensor waterSensor) throws SQLException {
                    // 只做一件事:给占位符赋值
                    ps.setString(1,waterSensor.getId());
                    ps.setLong(2,waterSensor.getTs());
                    ps.setInt(3,waterSensor.getVc());
                }
            },
            new JdbcExecutionOptions.Builder()  //设置执行参数
                .withBatchSize(1024)   // 刷新大小上限
                .withBatchIntervalMs(2000) //刷新间隔
                .withMaxRetries(3)  // 重试次数
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://hadoop101:3306/test?useSSL=false")
                .withUsername("root")
                .withPassword("123456")
                .build()
    ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/825175.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nodejs环境部署

1.前言 前端的打包必须要使用nodejs环境 2.部署 下载安装包&#xff0c;在下面的路径中找到适合自己项目合适的版本在下载&#xff0c;可以问一下前端开发人员 路径&#xff1a;Index of /dist/ 我这里是下载了14.21.3版本的nodejs环境,上传到需要部署的主机中 mkdir /opt…

制砖机系统比例控制阀放大器

制砖机系统是一种生产砖块的机器设备系统。该系统由多个部分组成&#xff0c;包括压力系统、模具和振动系统、烘干和烧制系统等。压力系统是制砖机的主要组成部分之一&#xff0c;它通过压力将原料压缩成一定形状和尺寸的块状&#xff0c;然后经过烘干和烧制等步骤&#xff0c;…

【嵌入式系统开发实训】学生实验报告

一、实验内容 1、过程考核60分&#xff1a; &#xff08;1&#xff09;顺序点亮3个LED灯&#xff08;分数&#xff1a;10分&#xff09;&#xff1b; &#xff08;2&#xff09;按键顺序点亮3个LED灯&#xff08;分数&#xff1a;10分&#xff09;&#xff1b; &#xff08;3&a…

恒运资本:炒股知识有用吗?

炒股是指通过购买和出售股票来赚取差价的一种出资行为。在现代社会&#xff0c;炒股已经成为许多人重视的话题。然而&#xff0c;有些人以为炒股常识是非常有用的&#xff0c;而另一些人则以为炒股常识并不有用。那么&#xff0c;炒股常识终究有多大的用途呢&#xff1f;本文将…

【NLP概念源和流】 06-编码器-解码器模型(6/20 部分)

一、说明 在机器翻译等任务中,我们必须从一系列输入词映射到一系列输出词。读者必须注意,这与“序列标记”不同,在“序列标记”中,该任务是将序列中的每个单词映射到预定义的类,如词性或命名实体任务。 作者生成 在上面的

实现本地上传、FTP上传、阿里云OSS上传三者合一处理

1、选项模式【Options】的处理 文件上传处理应该由程序进行配置&#xff0c;决定使用那种方式&#xff0c;那么这里面我们为了弹性化处理&#xff0c; 在文件上传模块中采用选项模式【Options】处理常规上传和FTP文件上传的配置参数信息。 微软引入选项模式,它是用于配置框架…

宇凡微2.4g遥控船开发方案,采用合封芯片

2.4GHz遥控船的开发方案是一个有趣且具有挑战性的项目。这样的遥控船可以通过无线2.4GHz频率进行远程控制&#xff0c;让用户在池塘或湖泊上畅游。以下是一个简要的2.4GHz遥控船开发方案&#xff1a; 基本构想如下 mcu驱动两个小电机&#xff0c;小电机上安装两个螺旋桨&#…

三、JVM-如何判断对象已死问题

内存模型以及如何判定对象已死问题 体验与验证 2.4.5.1 使用visualvm visualgc插件下载链接 &#xff1a;https://visualvm.github.io/pluginscenters.html 选择对应JDK版本链接—>Tools—>Visual GC 若上述链接找不到合适的&#xff0c;大家也可以自己在网上下载对应…

聊聊虚拟定位工具新宠儿:AnyGo的原理与识别

市面上已经有很多基于位置服务的应用场景&#xff0c;如运动品类应用基于地理位置生成运动轨迹&#xff0c;企业办公应用基于定位信息进行打卡&#xff0c;游戏品类应用基于位置信息开发区域排名&#xff0c;电商品类应用基于位置发放区域性优惠券等等。在黑灰产嗅探到背后的商…

自然语言处理学习笔记(二)————语料库与开源工具

目录 1.语料库 2.语料库建设 &#xff08;1&#xff09;规范制定 &#xff08;2&#xff09;人员培训 &#xff08;3&#xff09;人工标注 3.中文处理中的常见语料库 &#xff08;1&#xff09;中文分词语料库 &#xff08;2&#xff09;词性标注语料库 &#xff08;3…

Elasticsearch:语义搜索 - Semantic Search in python

当 OpenAI 于 2022 年 11 月发布 ChatGPT 时&#xff0c;引发了人们对人工智能和机器学习的新一波兴趣。 尽管必要的技术创新已经出现了近十年&#xff0c;而且基本原理的历史甚至更早&#xff0c;但这种巨大的转变引发了各种发展的“寒武纪大爆炸”&#xff0c;特别是在大型语…

电容笔和触控笔的区别是什么?好用的苹果平替电容笔

如今&#xff0c;随着无纸化学习以及办公的发展&#xff0c;电容笔更是倍受关注。但是&#xff0c;很多人都对于电容笔和触控笔之间的区别都存在着疑惑。其实&#xff0c;这两种产品很好区分&#xff0c;第一种是电容笔&#xff0c;它是适用于我们最常用的电容屏&#xff0c;例…

视频两侧有黑边怎么处理?教你裁切视频黑边方法

现在的大多数电视是16:9的宽屏&#xff0c;而大多数视频都是4:3的标清或是16:9的高清。当你看一个标清或高清视频时&#xff0c;如果它的比例与你的电视屏幕比例不同&#xff0c;视频两侧就会出现黑边。这些黑边会对视频的质量或观看体验产生影响&#xff0c;那么怎么处理呢&am…

双系统安装后开机时没有GRUB界面,直接进入windows系统

电脑配置&#xff1a;512固态1T机械&#xff0c;安装了win10Ubuntu22.04双系统&#xff0c;ubuntu安装在机械硬盘上。安装完成后没有出现GRUB启动管理供选择进入哪一个系统&#xff0c;直接进入windows系统。 解决办法&#xff1a; 进入Bios&#xff08;惠普电脑的快捷键是F1…

Python入门指南:从零开始学习Python编程

文章目录 前言安装Python变量以及数据类型总结 前言 Python是一种简单而又强大的编程语言&#xff0c;它在全球范围内广受欢迎&#xff0c;适用于各种应用场景&#xff0c;包括Web开发、数据分析、人工智能和科学计算等。本篇博客将为初学者提供一份Python入门指南&#xff0c;…

甄知科技新一代AIGC产品发布

文/玉娇龙 本文约3200字&#xff0c;阅读全文需要大约3分钟左右 演讲嘉宾&#xff1a;张礼军 甄知科技联合创始人&#xff0c;CTO 首席产品官 一、甄知科技简介 2023年7月29日&#xff0c;甄知科技联合创始人兼 CTO 张礼军作为演讲嘉宾&#xff0c;在2023汉得新一代产品体系发布…

webgpu之旅05

看一下粒子 这次看官方的demo吧分析 这次看官方的demo吧 演示了一个粒子如何用cs动起来 分析 这里是着色部分&#xff0c;看起来没什么特别的&#xff0c;接下来看cs部分 binding(0) group(0) var<uniform> sim_params : SimulationParams; binding(1) group(0) var&l…

《向量数据库指南》——当前向量数据库的赛道有哪些?

当前&#xff0c;向量数据库赛道主要分为四个类别&#xff1a; 基于PG、Clickhouse 等进行魔改或者插件化实现的向量数据库&#xff1a;这类解决方案以现有的关系数据库或列存数据库作为基础&#xff0c;通过修改或插件扩展的方式添加向量搜索功能。PG Vector 是这类解决方案的…

领航ADR新赛道 边界无限靖云甲究竟新在哪儿?

随着5G、物联网、大数据、人工智能和云计算等新技术的加速落地应用&#xff0c;全球已经迈入数字化时代&#xff0c;传统的网络边界持续瓦解&#xff0c;物联网安全、云安全、移动安全、数据安全、安全智能运维等全新的挑战越发受到业界关注&#xff0c;产业技术的发展尤其是云…

Django框架之路由用法

简介 路由简单的来说就是根据用户请求的 URL 链接来判断对应的处理程序&#xff0c;并返回处理结果&#xff0c;也就是 URL 与 Django 的视图建立映射关系。 Django 路由在 urls.py 配置&#xff0c;urls.py 中的每一条配置对应相应的处理方法。 Django 不同版本 urls.py 配…