轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

news2025/1/23 10:36:33

在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐低延迟的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费 Kafka 中的数据方式和源码实现。

Flink 如何消费 Kafka

Flink 在和 Kafka 对接的过程中,跟 Kafka 的版本是强相关的。上一课时也提到了,我们在使用 Kafka 连接器时需要引用相对应的 Jar 包依赖,对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。

我们本地的 Kafka 版本是 2.1.0,所以需要对应的类是 FlinkKafkaConsumer。首先需要在 pom.xml 中引入 jar 包依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

下面将对 Flink 消费 Kafka 数据的方式进行分类讲解。

消费单个 Topic

上一课时我们在本地搭建了 Kafka 环境,并且手动创建了名为 test 的 Topic,然后向名为 test 的 Topic 中写入了数据。

那么现在我们要消费这个 Topic 中的数据,该怎么做呢?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.enableCheckpointing(5000);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    // 如果你是0.8版本的Kafka,需要配置
    //properties.setProperty("zookeeper.connect", "localhost:2181");
    //设置消费组
    properties.setProperty("group.id", "group_test");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
    //设置从最早的ffset消费
    consumer.setStartFromEarliest();
    //还可以手动指定相应的 topic, partition,offset,然后从指定好的位置开始消费
    //HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
    //map.put(new KafkaTopicPartition("test", 1), 10240L);
    //假如partition有多个,可以指定每个partition的消费位置
    //map.put(new KafkaTopicPartition("test", 2), 10560L);
    //然后各个partition从指定位置消费
    //consumer.setStartFromSpecificOffsets(map);
    env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            System.out.println(value);
        }
    });
    env.execute("start consumer...");
}

在设置消费 Kafka 中的数据时,可以显示地指定从某个 Topic 的每一个 Partition 中进行消费。

消费多个 Topic

我们的业务中会有这样的情况,同样的数据根据类型不同发送到了不同的 Topic 中,比如线上的订单数据根据来源不同分别发往移动端和 PC 端两个 Topic 中。但是我们不想把同样的代码复制一份,需重新指定一个 Topic 进行消费,这时候应该怎么办呢?

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
ArrayList<String> topics = new ArrayList<>();
        topics.add("test_A");
        topics.add("test_B");
       // 传入一个 list,完美解决了这个问题
        FlinkKafkaConsumer<Tuple2<String, String>> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
...

我们可以传入一个 list 来解决消费多个 Topic 的问题,如果用户需要区分两个 Topic 中的数据,那么需要在发往 Kafka 中数据新增一个字段,用来区分来源。

消息序列化

我们在上述消费 Kafka 消息时,都默认指定了消息的序列化方式,即 SimpleStringSchema。这里需要注意的是,在我们使用 SimpleStringSchema 的时候,返回的结果中只有原数据,没有 topic、parition 等信息,这时候可以自定义序列化的方式来实现自定义返回数据的结构。

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    //是否表示流的最后一条元素,设置为false,表示数据会源源不断地到来
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    //这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                new String(record.key()),
                new String(record.value())
        );
    }
    //指定数据的输入类型
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}

这里自定义了 CustomDeSerializationSchema 信息,就可以直接使用了。

Parition 和 Topic 动态发现

在很多场景下,随着业务的扩展,我们需要对 Kafka 的分区进行扩展,为了防止新增的分区没有被及时发现导致数据丢失,消费者必须要感知 Partition 的动态变化,可以使用 FlinkKafkaConsumer 的动态分区发现实现。

我们只需要指定下面的配置,即可打开动态分区发现功能:每隔 10ms 会动态获取 Topic 的元数据,对于新增的 Partition 会自动从最早的位点开始消费数据。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");

如果业务场景需要我们动态地发现 Topic,可以指定 Topic 的正则表达式:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
Flink 消费 Kafka 设置 offset 的方法

Flink 消费 Kafka 需要指定消费的 offset,也就是偏移量。Flink 读取 Kafka 的消息有五种消费方式:

  • 指定 Topic 和 Partition

  • 从最早位点开始消费

  • 从指定时间点开始消费

  • 从最新的数据开始消费

  • 从上次消费位点开始消费

/**
* Flink从指定的topic和parition中指定的offset开始
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink从topic中最早的offset消费
*/
consumer.setStartFromEarliest();
/**
* Flink从topic中指定的时间点开始消费
*/
consumer.setStartFromTimestamp(1559801580000l);
/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();
/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();

源码解析

Drawing 0.png

从上面的类图可以看出,FlinkKafkaConsumer 继承了 FlinkKafkaConsumerBase,而 FlinkKafkaConsumerBase 最终是对 SourceFunction 进行了实现。

整体的流程:FlinkKafkaConsumer 首先创建了 KafkaFetcher 对象,然后 KafkaFetcher 创建了 KafkaConsumerThread 和 Handover,KafkaConsumerThread 负责直接从 Kafka 中读取 msg,并交给 Handover,然后 Handover 将 msg 传递给 KafkaFetcher.emitRecord 将消息发出。

因为 FlinkKafkaConsumerBase 实现了 RichFunction 接口,所以当程序启动的时候,会首先调用 FlinkKafkaConsumerBase.open 方法:

public void open(Configuration configuration) throws Exception {
   // 指定offset的提交方式
   this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
         getIsAutoCommitEnabled(),
         enableCommitOnCheckpoints,
         ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
   // 创建分区发现器
   this.partitionDiscoverer = createPartitionDiscoverer(
         topicsDescriptor,
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks());
   this.partitionDiscoverer.open();
   subscribedPartitionsToStartOffsets = new HashMap<>();
   final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
   if (restoredState != null) {
      for (KafkaTopicPartition partition : allPartitions) {
         if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
         }
      }
      for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
         if (!restoredFromOldState) {
        <span class="hljs-keyword">if</span> (KafkaTopicPartitionAssigner.assign(
           restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
              == getRuntimeContext().getIndexOfThisSubtask()){
           subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
        }
     } <span class="hljs-keyword">else</span> {
       subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
     }
  }
  <span class="hljs-keyword">if</span> (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
     subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -&gt; {
        <span class="hljs-keyword">if</span> (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
           LOG.warn(
              <span class="hljs-string">"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution."</span>,
              entry.getKey());
           <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;
        }
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;
     });
  }
  LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading {} partitions with offsets in restored state: {}"</span>,
     getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);

} else {

  <span class="hljs-keyword">switch</span> (startupMode) {
     <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
        <span class="hljs-keyword">if</span> (specificStartupOffsets == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.SPECIFIC_OFFSETS +
                 <span class="hljs-string">", but no specific offsets were specified."</span>);
        }
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           Long specificOffset = specificStartupOffsets.get(seedPartition);
           <span class="hljs-keyword">if</span> (specificOffset != <span class="hljs-keyword">null</span>) {
                             subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - <span class="hljs-number">1</span>);
           } <span class="hljs-keyword">else</span> {
           subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
           }
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">case</span> TIMESTAMP:
        <span class="hljs-keyword">if</span> (startupOffsetsTimestamp == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.TIMESTAMP +
                 <span class="hljs-string">", but no startup timestamp was specified."</span>);
        }
        <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; partitionToOffset
              : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
           subscribedPartitionsToStartOffsets.put(
              partitionToOffset.getKey(),
              (partitionToOffset.getValue() == <span class="hljs-keyword">null</span>)
                  KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                    : partitionToOffset.getValue() - <span class="hljs-number">1</span>);
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">default</span>:
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
        }
  }
  <span class="hljs-keyword">if</span> (!subscribedPartitionsToStartOffsets.isEmpty()) {
     <span class="hljs-keyword">switch</span> (startupMode) {
        <span class="hljs-keyword">case</span> EARLIEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> LATEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> TIMESTAMP:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              startupOffsetsTimestamp,
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              specificStartupOffsets,
              subscribedPartitionsToStartOffsets.keySet());
           List&lt;KafkaTopicPartition&gt; partitionsDefaultedToGroupOffsets = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;(subscribedPartitionsToStartOffsets.size());
           <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
              <span class="hljs-keyword">if</span> (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                 partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
              }
           }
           <span class="hljs-keyword">if</span> (partitionsDefaultedToGroupOffsets.size() &gt; <span class="hljs-number">0</span>) {
              LOG.warn(<span class="hljs-string">"Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"</span> +
                    <span class="hljs-string">"; their startup offsets will be defaulted to their committed group offsets in Kafka."</span>,
                 getRuntimeContext().getIndexOfThisSubtask(),
                 partitionsDefaultedToGroupOffsets.size(),
                 partitionsDefaultedToGroupOffsets);
           }
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> GROUP_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
     }
  } <span class="hljs-keyword">else</span> {
     LOG.info(<span class="hljs-string">"Consumer subtask {} initially has no partitions to read from."</span>,
        getRuntimeContext().getIndexOfThisSubtask());
  }

}
}

对 Kafka 中的 Topic 和 Partition 的数据进行读取的核心逻辑都在 run 方法中:

public void run(SourceContext<T> sourceContext) throws Exception {
   if (subscribedPartitionsToStartOffsets == null) {
      throw new Exception("The partitions were not set for the consumer");
   }
   this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
   this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
   final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
   this.offsetCommitCallback = new KafkaCommitCallback() {
      @Override
      public void onSuccess() {
         successfulCommits.inc();
      }
      @Override
      public void onException(Throwable cause) {
         LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
         failedCommits.inc();
      }
   };

if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
LOG.info(“Consumer subtask {} creating fetcher with offsets {}.”,
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);

this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
kafkaFetcher.runFetchLoop();
} else {
runWithPartitionDiscovery();
}
}

Flink 消费 Kafka 数据代码

上面介绍了 Flink 消费 Kafka 的方式,以及消息序列化的方式,同时介绍了分区和 Topic 的动态发现方法,那么回到我们的项目中来,消费 Kafka 数据的完整代码如下:

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //设置消费组
        properties.setProperty("group.id", "group_test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //设置从最早的ffset消费
        consumer.setStartFromEarliest();
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(value);
            }
        });
        env.execute("start consumer...");
    }
}

我们可以直接右键运行代码,在控制台中可以看到数据的正常打印,如下图所示:

Drawing 1.png

通过代码可知,我们之前发往 Kafka 的消息被完整地打印出来了。

总结

这一课时介绍了 Flink 消费 Kafka 的方式,比如从常用的指定单个或者多个 Topic、消息的序列化、分区的动态发现等,还从源码上介绍了 Flink 消费 Kafka 的原理。通过本课时的学习,相信你可以对 Flink 消费 Kafka 有一个较为全面地了解,根据业务场景可以正确选择消费的方式和配置。


精选评论

**6513:

项目中也是用main方法作为入口吗?有集成springboot的案例吗

    讲师回复:

    在编写Flink代码时,尽量避免使用spring类的框架,因为没有必要。只要依赖Flink必要的包和一些工具类即可。

**冰:

老师,这里可以从指定offset消费,怎么在程序终止或停止时保存offset,以便启时使用了?

    讲师回复:

    如果实际情况需要保存位点,那么一般是自己管理位点,每次停止重启后从自己管理的位点消费,比如你可以存储在mysql中,自己去读取

**良:

我是用flink消费Kafka从指定的topic和partition中指定的offset处开始消费,实际结果与预期不一致,消费的分区对不上是啥原因呢?示例代码:Mapoffsets.put(new KafkaTopicPartition(topic, 0), 1L);offsets.put(new KafkaTopicPartition(topic, 1), 2L);offsets.put(new KafkaTopicPartition(topic, 2), 3L);consumer.setStartFromSpecificOffsets(offsets);返回结果:ConsumerRecord(topic=new-topic-config-test, partition=0, offset=10105849, key=, value=Python从入门到放弃!ConsumerRecord(topic=new-topic-config-test, partition=3, offset=10107121, key=, value=Python从入门到放弃!ConsumerRecord(topic=new-topic-config-test, partition=2, offset=10102806, key=, value=Java从入门到放弃!

    讲师回复:

    从两个原因查询,第一看下并行度的设置要和kafka分区设置保持一致。第二,要保证kafka4个分区都有数据。

*轩:

请问哪里可以下载项目用的数据,不是源码

    讲师回复:

    在项目中有数据,也可以自己造一些数据

**7324:

flink如何将key相同的数据写入到kafka的同一个partition呢?

    讲师回复:

    你可以自定义自己的kafka分区器,可以查一下FlinkKafkaPartitioner的用法,但是一般我们不会这么用,如果你需要自定义写入kafka的分区器,要保证数据尽量均匀,不要引起kafka端的数据倾斜

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1301859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

张驰咨询:掌握流程改进的关键,深入了解六西格玛绿带培训

尊敬的读者&#xff0c;当您寻求提升个人能力&#xff0c;加强企业流程管理时&#xff0c;六西格玛绿带培训无疑是您的不二选择。本文将带您深入了解六西格玛绿带培训的核心内容、必备工具和实际案例&#xff0c;以助您在职业生涯中一帆风顺。 六西格玛绿带培训主要针对中层管…

【SQL开发实战技巧】系列(四十八):Oracle12C常用新特性☞多分区操作和管理

系列文章目录 【SQL开发实战技巧】系列&#xff08;一&#xff09;:关于SQL不得不说的那些事 【SQL开发实战技巧】系列&#xff08;二&#xff09;&#xff1a;简单单表查询 【SQL开发实战技巧】系列&#xff08;三&#xff09;&#xff1a;SQL排序的那些事 【SQL开发实战技巧…

风险评估是什么,为什么被称为保护网络安全的重要一环!

随着互联网的普及和信息技术的快速发展&#xff0c;网络已经成为人们生活和工作中不可或缺的一部分。然而&#xff0c;网络在为我们带来便利的同时&#xff0c;也存在着各种安全风险。因此&#xff0c;进行网络风险评估是保护网络安全的重要一环。而为什么说风险评估是保护网络…

Gti GUI添加标签

通过Git Gui打开项目&#xff0c;通过菜单打开分支历史&#xff0c;我这里是名为"develop"的分支 选中需要打标签的commit&#xff0c;右键-Create tag即可 但貌似无法删除标签&#xff0c;只能通过git bash&#xff0c;本地标签通过git tag -d tagname&#xff0c;…

《使用ThinkPHP6开发项目》 - 创建应用

《使用ThinkPHP6开发项目》 - 安装ThinkPHP框架-CSDN博客 《使用ThinkPHP6开发项目》 - 设置项目环境变量-CSDN博客 《使用ThinkPHP6开发项目》 - 项目使用多应用开发-CSDN博客 根据前面的步骤&#xff0c;我们现在就可以开发我们的项目开发了&#xff0c;根据项目开发的需要…

晶圆划片机助力LED陶瓷基板高效切割:科技提升产业新高度

博捷芯半导体划片机在LED陶瓷基板制造领域&#xff0c;晶圆划片机作为一种先进的切割工具&#xff0c;正在为提升产业效率和产品质量发挥重要作用。通过精确的切割工艺&#xff0c;晶圆划片机将LED陶瓷基板高效地切割成独立的芯片&#xff0c;为LED产业的快速发展提供了有力支持…

大数据分析与国际市场:跨境电商如何精准洞察需求

随着数字化时代的来临&#xff0c;大数据分析已经成为跨境电商的一项关键工具&#xff0c;为企业提供了更深入、更精准的市场洞察。在国际市场竞争激烈的环境中&#xff0c;了解和满足消费者需求是取得成功的关键。本文将探讨大数据分析在跨境电商中的作用&#xff0c;以及如何…

奇迹进化宝石怎么用

奇迹进化宝石的用法是: (一)先用再生宝石点在想要强化的物品上生成强化属性。 (二)点击进化石放到想要进化的具有强化属性的物品上。 进化过程存在一定概率的成功/失败&#xff0c;高级进化石的成功概率比低级进化石的成功概率低。 进化成功时&#xff0c;强化的属性会提高。…

配置本地端口镜像示例

目录 实验拓扑 组网需求 配置思路 配置步骤 1.配置观察端口 2.配置镜像端口 实验拓扑 组网需求 如实验拓扑所示 某公司行政部通过Switch与外部Internet通信&#xff0c;监控设备Server&#xff08;Router&#xff09;与Switch直连。 现在希望通过Server对行政部访…

微信小程序:模态框(弹窗)的实现

效果 wxml <!--新增&#xff08;点击按钮&#xff09;--> <image classimg src"{{add}}" bindtapadd_mode></image> <!-- 弹窗 --> <view class"modal" wx:if"{{showModal}}"><view class"modal-conten…

用keepalived做mysql高可用

两台机器(centos7系统)安装mysql [rootmysql-keep-master ~]# wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm [rootmysql-keep-master ~]# rpm -ivh mysql80-community-release-el7-3.noarch.rpm [rootmysql-keep-master ~]# yum -y install yu…

2024年网络安全竞赛-网站渗透

网站渗透 (一)拓扑图 1.使用渗透机对服务器信息收集,并将服务器中网站服务端口号作为flag提交; 使用nmap工具对靶机进行信息收集 2.使用渗透机对服务器信息收集,将网站的名称作为flag提交; 访问页面即可 3.使用渗透机对服务器渗透,将可渗透页面的名称作为flag提交…

RT-DETR优化:Backbone改进 | UniRepLKNet,通用感知大内核卷积网络,RepLK改进版本 | 2023.11

🚀🚀🚀本文改进: UniRepLKNet,通用感知大内核卷积网络,ImageNet-22K预训练,精度和速度SOTA,ImageNet达到88%, COCO达到56.4 box AP,ADE20K达到55.6 mIoU 🚀🚀🚀RT-DETR改进创新专栏:http://t.csdnimg.cn/vuQTz 学姐带你学习YOLOv8,从入门到创新,轻轻松松…

创投课程第四期 | Web3一级市场投资框架的演变及投资人能力框架的构成

协会邀请了来自Zonff Partners的合伙人——Colin&#xff0c;作为VC创投课程第4期的嘉宾&#xff0c;在北京时间12月9日(周六)下午14:00 PM-15:00 PM于蚂蚁链科技产业创新中心进行线下分享&#xff0c;届时将与所有对Web3投资、创业心怀热忱的朋友们共同探讨《WEB3一级市场投资…

【机器学习实训项目】黑色星期五画像分析

目录 前言 一、项目概述 1.1 项目简介 1.2 项目背景 1.3 项目目标 二、数据分析 2.1 导入库 2.2 数据基本信息 三、画像分析 3.1 画像1&#xff1a;消费金额Top10 3.2 画像2&#xff1a;高频消费Top10 3.3 画像3&#xff1a;人均消费金额Top10 3.4 画像4&#xff1a;男女消费对…

Java代码审计之SpEL表达式注入漏洞分析

文章目录 前言SpEL表达式基础基础用法安全风险案例演示 CVE-2022-22963漏洞简述环境搭建反弹shell CVE漏洞调试分析本地搭建调试分析补丁分析 总结 前言 表达式注入是 Java 安全中一类常见的能够注入命令并形成 RCE 的漏洞&#xff0c;而常见的表达式注入方式有 EL 表达式注入…

Proteus仿真--射击小游戏仿真设计

本文介绍基于proteus射击小游戏仿真设计&#xff08;完整仿真源文件及代码见文末链接&#xff09; 仿真图如下 K1-K4为4个按键&#xff0c;用于上移、下移、确认等&#xff0c;模拟单机游戏 仿真运行视频 Proteus仿真--射击小游戏仿真设计 附完整Proteus仿真资料代码资料 …

个人博客搭建保姆级教程-发布篇

发布方式 可以使用gitee或者github托管博客内容&#xff0c;然后直接在服务端nginx目录进行拉取。或者将内容压缩&#xff0c;拷贝到对应目录后再进行解压。 发布位置 前面我们已经部署了nginx服务器。这里我们需要将对应的html文件拉取或拷贝到对应的文件夹&#xff0c;即n…

销售技巧培训之如何提升门店销售技巧

销售技巧培训之如何提升门店销售技巧 在如今竞争激烈的商业环境中&#xff0c;提升门店销售技巧对于实现销售业绩的飞跃至关重要。本文将探讨如何通过了解客户需求、制定优惠策略、优化销售流程、加强顾客关系管理等手段来提升门店销售技巧&#xff0c;并结合实际案例进行分析…

为什么说AI现在还不行!

AI最近有点被妖魔化了&#xff0c;很像一个老虎在还没有橘猫大的时候&#xff0c;就已经被天天当成虎力大仙来讨论。这种普遍的高预期其实是有害的&#xff0c;尤其是当事情本身还需要耐心细致深耕且长跑的时候。资本、品牌可以匹配高预期所对应的增长倍数&#xff0c;业务则不…