Flink链接Kafka

news2025/1/16 17:39:12

一、基于 Flink 的 Kafka 消息生产者

  • Kafka 生产者的创建与配置
    • 代码通过 FlinkKafkaProducer 创建 Kafka 生产者,用于向 Kafka 主题发送消息。
  • Flink 执行环境的配置
    • 配置了 Flink 的检查点机制,确保消息的可靠性,支持"精确一次"的消息交付语义。
  • 模拟数据源
    • 通过 env.fromElements() 方法创建了简单的消息流,发送了三条消息 "a", "b", 和 "c"
package com.example.kafka_flink.service;

import com.example.kafka_flink.util.MyNoParalleSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Properties;
@Service
public class SimpleKafkaProducer {

    public static void main(String[] args) throws Exception {
        // 创建 SimpleKafkaProducer 的实例
        SimpleKafkaProducer kafkaProducer = new SimpleKafkaProducer();
        // 调用 producer 方法
        kafkaProducer.producer();
    }

    public void producer() throws Exception {
        // 设置 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)
        env.enableCheckpointing(5000);

        // 配置 Kafka 属性,包括身份验证信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxxx");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

        // 创建 Kafka 生产者实例,并设置目标主题和序列化模式
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "WJ-TEST",
                // 使用 SimpleStringSchema 进行字符串序列化
                new SimpleStringSchema(),
                properties
        );

        // 模拟数据源,生产一些简单的消息,并将消息写入 Kafka
        env.fromElements("a", "b", "c")
                .addSink(producer);

        // 启动 Flink 作业
        env.execute("Kafka Producer Job");
    }
}

二、基于 Flink 的 Kafka 消息消费者

2.1 消费一个Topic

  • 设置 Flink 执行环境

    • 使用 StreamExecutionEnvironment.getExecutionEnvironment() 创建执行环境。
  • 启用检查点机制

    • 调用 env.enableCheckpointing(5000),设置检查点时间间隔为 5 秒。
    • 配置检查点模式为 EXACTLY_ONCE,确保数据一致性。
  • 配置 Kafka 属性

    • 设置 Kafka 服务器地址(bootstrap.servers)。
    • 指定消费组 ID(group.id)。
    • 配置安全协议和认证机制(SASL_PLAINTEXTSCRAM-SHA-512)。
  • 创建 Kafka 消费者

    • 使用 FlinkKafkaConsumer<String> 指定单个 Kafka Topic(如 "WJ-TEST")。
    • 设置消息反序列化方式为 SimpleStringSchema
    • 配置消费者从最早偏移量开始消费(setStartFromEarliest())。
  • 将 Kafka 消费者添加到 Flink 数据流

    • 调用 env.addSource(consumer) 添加 Kafka 消费者作为数据源。
    • 使用 FlatMapFunction 处理消息,将其打印或进一步处理。
  • 启动 Flink 作业

    • 使用 env.execute("start consumer...") 启动 Flink 作业,开始消费 Kafka 的消息流。
 //消费单个topic
    public static void consumerOneTopic() throws Exception {
        // 设置 Flink 执行环境
        // 创建一个流处理的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)
        env.enableCheckpointing(5000);

        // 配置检查点模式为 "Exactly Once"(精确一次)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 配置 Kafka 属性,包括身份验证信息
        Properties properties = new Properties();
        // 设置 Kafka 集群地址
        properties.setProperty("bootstrap.servers", "xxxx");
        // 设置消费组 ID,用于管理消费偏移量
        properties.setProperty("group.id", "group_test");
        // 设置安全协议为 SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // 设置 SASL 认证机制为 SCRAM-SHA-512
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // 配置 SASL 登录模块,包含用户名和密码
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

        // 创建一个 Kafka 消费者实例
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                // 设置要消费的 Kafka 主题名称
                "WJ-TEST",
                // 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串
                new SimpleStringSchema(),
                // 传入 Kafka 的配置属性
                properties
        );

        // 设置消费者从 Kafka 的最早偏移量开始消费消息
        consumer.setStartFromEarliest();

        // 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // 打印消费到的消息内容到控制台
                System.out.println(s);
                // 收集消费到的消息,供后续处理
                collector.collect(s);
            }
        });

        // 启动并执行 Flink 作业,作业名称为 "start consumer..."
        env.execute("start consumer...");
    }

生产消息结果:

2.2 消费多个Topic

  • 设置 Flink 执行环境

    • 使用 StreamExecutionEnvironment.getExecutionEnvironment() 创建执行环境。
  • 启用检查点机制

    • 配置检查点模式为 EXACTLY_ONCE,确保数据一致性。
    • 调用 env.enableCheckpointing(5000) 设置检查点时间间隔为 5 秒。
  • 配置 Kafka 属性

    • 设置 Kafka 服务器地址(bootstrap.servers)。
    • 指定消费组 ID(group.id)。
    • 配置安全协议和认证机制(SASL_PLAINTEXTSCRAM-SHA-512)。
  • 定义 Kafka Topic 列表

    • 创建一个 List<String>,添加多个 Kafka Topic 名称(如 "WJ-TEST""KAFKA_TEST_001")。
  • 创建 Kafka 消费者

    • 使用 FlinkKafkaConsumer,传入 Kafka Topic 列表和自定义反序列化器(CustomDeSerializationSchema)。
    • 配置消费者从最早偏移量开始消费(setStartFromEarliest())。
  • 将 Kafka 消费者添加到 Flink 数据流

    • 调用 env.addSource(consumer) 添加 Kafka 消费者作为数据源。
    • 使用 FlatMapFunction 处理消息,打印消息的 Topic、分区、偏移量、键和值,并收集消息值进行进一步处理。
  • 启动 Flink 作业

    • 使用 env.execute("start consumer...") 启动 Flink 作业,开始消费 Kafka 的多个主题消息流。
//消费多个topic
    public static void consumerTopics() throws Exception {
        // 设置 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)
        env.enableCheckpointing(5000);

        // 配置 Kafka 属性,包括身份验证信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxxx");

        properties.setProperty("group.id", "group_test");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

        // 定义需要消费的 Kafka 主题列表
        List<String> topics = new ArrayList<>();
        topics.add("WJ-TEST");
        topics.add("KAFKA_TEST_001");

        // 使用自定义反序列化器创建 Kafka 消费者实例
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(
                topics,
                new CustomDeSerializationSchema(),
                properties
        );

        // 设置消费者从 Kafka 的最早偏移量开始消费消息
        consumer.setStartFromEarliest();

        // 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中
        env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {
                // 打印消费到的消息内容到控制台
                System.out.println("Topic: " + record.topic() +
                        ", Partition: " + record.partition() +
                        ", Offset: " + record.offset() +
                        ", Key: " + record.key() +
                        ", Value: " + record.value());
                // 收集消费到的消息,供后续处理
                collector.collect(record.value());
            }
        });

        // 启动并执行 Flink 作业
        env.execute("start consumer...");
    }

2.3 消费Topic的总体代码

package com.example.kafka_flink.service;

import com.example.kafka_flink.util.CustomDeSerializationSchema;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author wangjian
 */
@Service
public class SimpleKafkaConsumer {

    public static void main(String[] args) throws Exception {
//         SimpleKafkaConsumer.consumerOneTopic();
        SimpleKafkaConsumer.consumerTopics();

    }

 //消费单个topic
    public static void consumerOneTopic() throws Exception {
        // 设置 Flink 执行环境
        // 创建一个流处理的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)
        env.enableCheckpointing(5000);

        // 配置检查点模式为 "Exactly Once"(精确一次)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 配置 Kafka 属性,包括身份验证信息
        Properties properties = new Properties();
        // 设置 Kafka 集群地址
        properties.setProperty("bootstrap.servers", "xxxx");
        // 设置消费组 ID,用于管理消费偏移量
        properties.setProperty("group.id", "group_test");
        // 设置安全协议为 SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // 设置 SASL 认证机制为 SCRAM-SHA-512
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // 配置 SASL 登录模块,包含用户名和密码
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

        // 创建一个 Kafka 消费者实例
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                // 设置要消费的 Kafka 主题名称
                "WJ-TEST",
                // 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串
                new SimpleStringSchema(),
                // 传入 Kafka 的配置属性
                properties
        );

        // 设置消费者从 Kafka 的最早偏移量开始消费消息
        consumer.setStartFromEarliest();

        // 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // 打印消费到的消息内容到控制台
                System.out.println(s);
                // 收集消费到的消息,供后续处理
                collector.collect(s);
            }
        });

        // 启动并执行 Flink 作业,作业名称为 "start consumer..."
        env.execute("start consumer...");
    }


//消费多个topic
    public static void consumerTopics() throws Exception {
        // 设置 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)
        env.enableCheckpointing(5000);

        // 配置 Kafka 属性,包括身份验证信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxxx");

        properties.setProperty("group.id", "group_test");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

        // 定义需要消费的 Kafka 主题列表
        List<String> topics = new ArrayList<>();
        topics.add("WJ-TEST");
        topics.add("KAFKA_TEST_001");

        // 使用自定义反序列化器创建 Kafka 消费者实例
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(
                topics,
                new CustomDeSerializationSchema(),
                properties
        );

        // 设置消费者从 Kafka 的最早偏移量开始消费消息
        consumer.setStartFromEarliest();

        // 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中
        env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {
                // 打印消费到的消息内容到控制台
                System.out.println("Topic: " + record.topic() +
                        ", Partition: " + record.partition() +
                        ", Offset: " + record.offset() +
                        ", Key: " + record.key() +
                        ", Value: " + record.value());
                // 收集消费到的消息,供后续处理
                collector.collect(record.value());
            }
        });

        // 启动并执行 Flink 作业
        env.execute("start consumer...");
    }


}

2.4 自定义的 Kafka 反序列化器 (CustomDeSerializationSchema)

实现了一个自定义的 Kafka 反序列化器 (CustomDeSerializationSchema),主要功能是将从 Kafka 中消费到的消息(字节数组格式)解析为包含更多元数据信息的 ConsumerRecord<String, String> 对象。以下是其作用的具体说明:

  • 解析 Kafka 消息

    • 消息的 keyvalue 由字节数组转换为字符串格式,便于后续业务逻辑处理。
    • 同时保留 Kafka 消息的元数据信息(如主题名称 topic、分区号 partition、偏移量 offset)。
  • 扩展 Flink 的 Kafka 数据处理能力

    • 默认的反序列化器只处理消息内容(keyvalue),而该自定义类将消息的元数据(如 topicpartition)也作为输出的一部分,为复杂业务需求提供了更多上下文信息。
  • 控制流数据的结束逻辑

    • 实现了 isEndOfStream 方法,返回 false,表示 Kafka 的数据流是持续的,Flink 不会主动终止数据消费。
  • 定义 Flink 数据类型

    • 使用 getProducedType 方法,明确告诉 Flink 输出的数据类型是 ConsumerRecord<String, String>,便于 Flink 在运行时正确处理流数据。
package com.example.kafka_flink.util;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

/**
 * @author wangjian
 */
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    // 是否表示流的最后一条元素
    // 返回 false,表示数据流会源源不断地到来,Flink 不会主动停止消费
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
        return false;
    }

    // 反序列化方法
    // 将 Kafka 消息从字节数组转换为 ConsumerRecord<String, String> 类型的数据
    // 返回的数据不仅包括消息内容(key 和 value),还包括 topic、offset 和 partition 等元数据信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // 检查 key 和 value 是否为 null,避免空指针异常
        String key = consumerRecord.key() == null ? null : new String(consumerRecord.key(), StandardCharsets.UTF_8);
        String value = consumerRecord.value() == null ? null : new String(consumerRecord.value(), StandardCharsets.UTF_8);

        // 构造并返回一个 ConsumerRecord 对象,其中包含反序列化后的 key 和 value,以及其他元数据信息
        return new ConsumerRecord<>(
                // Kafka 主题名称
                consumerRecord.topic(),
                // 分区号
                consumerRecord.partition(),
                // 消息偏移量
                consumerRecord.offset(),
                // 消息的 key
                key,
                // 消息的 value
                value
        );
    }

    // 指定数据的输出类型
    // 告诉 Flink 消费的 Kafka 数据类型是 ConsumerRecord<String, String>
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}

2.5 消费到消息的结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2277641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于国产麒麟操作系统,通过Kubeadm离线部署Kubernetes 1.28版本

文章目录 前言一、环境准备1.主机操作系统说明2.主机硬件配置3.ansible-playbook相关目录准备4.下载离线部署包4.1. 下载kubeclt、kubeam、kubelet RPM包4.2. 下载docker安装包4.3. 下载containerd安装包4.4. 镜像包下载 二、部署流程三、部署过程1.修改hosts文件2.部署单maste…

3、docker的数据卷和dockerfile

dockerfile--------------------自定义镜像 docker的数据卷&#xff1a; 容器与宿主机之间&#xff0c;或者容器和容器之间的数据共享&#xff08;目录&#xff09;。 创建容器的时间&#xff0c;通过指定目录&#xff0c;实现容器于宿主机之间&#xff0c;或者容器和容器之…

登上Nature!交叉注意力机制 发顶会流量密码!

在深度学习领域&#xff0c;交叉注意力融合技术正迅速崛起&#xff0c;并成为处理多模态数据的关键工具。这一技术通过有效地整合来自不同模态的信息&#xff0c;使得模型能够更好地理解和推理复杂的数据关系。 随着多模态数据的日益普及&#xff0c;如图像、文本和声音等&…

网安——CSS

一、CSS基础概念 CSS有两个重要的概念&#xff0c;分为样式和布局 CSS的样式分为两种&#xff0c;一种是文字的样式&#xff0c;一种是盒模型的样式 CSS的另一个重要的特质就是辅助页面布局&#xff0c;完成HTML不能完成的功能&#xff0c;比如并排显示或精确定位显示 从HT…

SOME/IP协议详解 基础解读 涵盖SOME/IP协议解析 SOME/IP通讯机制 协议特点 错误处理机制

车载以太网协议栈总共可划分为五层&#xff0c;分别为物理层&#xff0c;数据链路层&#xff0c;网络层&#xff0c;传输层&#xff0c;应用层&#xff0c;其中今天所要介绍的内容SOME/IP就是一种应用层协议。 SOME/IP协议内容按照AUTOSAR中的描述&#xff0c;我们可以更进一步…

Mysql--实战篇--SQL优化(查询优化器,常用的SQL优化方法,执行计划EXPLAIN,Mysql性能调优,慢日志开启和分析等)

一、查询优化 1、查询优化器 (Query Optimizer) MySQL查询优化器&#xff08;Query Optimizer&#xff09;是MySQL数据库管理系统中的一个关键组件&#xff0c;负责分析和选择最有效的执行计划来执行SQL查询。查询优化器的目标是尽可能减少查询的执行时间和资源消耗&#xff…

CV项目详解:基于yolo8的车辆识别系统(含源码和具体教程)

使用YOLOv8&#xff08;You Only Look Once&#xff09;和OpenCV实现车道线和车辆检测&#xff0c;目标是创建一个可以检测道路上的车道并识别车辆的系统&#xff0c;并估计它们与摄像头的距离。该项目结合了计算机视觉技术和深度学习物体检测。 使用YOLOv8和OpenCV实现车道线…

osg中实现模型的大小、颜色、透明度的动态变化

以博饼状模型为对象,实现了模型大小、颜色、透明度的动态变化。 需要注意的是一点: // 创建材质对象osg::ref_ptr<osg::Material> material = new osg::Material;material->setDiffuse(osg::Material::FRONT_AND_BACK, osg::Vec4(0.0, 1.0, 0.0, 0.5));// 获取模型的…

小米vela系统(基于开源nuttx内核)——openvela开源项目

前言 在 2024 年 12 月 27 日的小米「人车家全生态」合作伙伴大会上&#xff0c;小米宣布全面开源 Vela 操作系统。同时&#xff0c;OpenVela 项目正式上线 GitHub 和 Gitee&#xff0c;采用的是比较宽松的 Apache 2.0 协议&#xff0c;这意味着全球的开发者都可以参与到 Vela…

《数据思维》之数据可视化_读书笔记

文章目录 系列文章目录前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 数据之道&#xff0c;路漫漫其修远兮&#xff0c;吾将上下而求索。 一、数据可视化 最基础的数据可视化方法就是统计图。一个好的统计图应该满足四个标准&#xff1a;准确、有…

AI刷题-最大矩形面积问题、小M的数组变换

目录 一、最大矩形面积问题 问题描述 输入格式 输出格式 输入样例 输出样例 数据范围 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 最终代码&#xff1a; 运行结果&#xff1a; 二、小M的数组变换 问题描述 测试样例 解题思路&#xff1a; 问题…

数据库(MySQL)练习

数据库&#xff08;MySQL&#xff09;练习 一、练习1.15练习练习 二、注意事项2.1 第四天 一、练习 1.15练习 win11安装配置MySQL超详细教程: https://baijiahao.baidu.com/s?id1786910666566008458&wfrspider&forpc 准备工作&#xff1a; mysql -uroot -p #以管理…

C语言:-三子棋游戏代码:分支-循环-数组-函数集合

思路分析&#xff1a; 1、写菜单 2、菜单之后进入游戏的操作 3、写函数 实现游戏 3.1、初始化棋盘函数&#xff0c;使数组元素都为空格 3.2、打印棋盘 棋盘的大概样子 3.3、玩家出棋 3.3.1、限制玩家要下的坐标位置 3.3.2、判断玩家要下的位置是否由棋子 3.4、电脑出棋 3.4.1、…

知识图谱常见的主流图数据库

在知识图谱中&#xff0c;主流使用的图数据库包括以下几种&#xff1a; Neo4j&#xff1a;这是目前全球部署最广泛的图数据库之一&#xff0c;具有强大的查询性能和灵活的数据模型&#xff0c;适用于复杂关系数据的存储和查询。 JanusGraph&#xff1a;JanusGraph是一个开源的…

Nginx三种不同类型的虚拟主机(基于域名、IP 和端口)

&#x1f3e1;作者主页&#xff1a;点击&#xff01; Nginx-从零开始的服务器之旅专栏&#xff1a;点击&#xff01; &#x1f427;Linux高级管理防护和群集专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年1月15日13点14分 目录 1. 基于域名的虚拟主机 …

RabbitMQ(四)

SpringBoot整合RabbitMQ SpringBoot整合1、生产者工程①创建module②配置POM③YAML④主启动类⑤测试程序 2、消费者工程①创建module②配置POM③YAML文件内配置&#xff1a; ④主启动类⑤监听器 3、RabbitListener注解属性对比①bindings属性②queues属性 SpringBoot整合 1、生…

java_将数据存入elasticsearch进行高效搜索

使用技术简介&#xff1a; (1) 使用Nginx实现反向代理&#xff0c;使前端可以调用多个微服务 (2) 使用nacos将多个服务管理关联起来 (3) 将数据存入elasticsearch进行高效搜索 (4) 使用消息队列rabbitmq进行消息的传递 (5) 使用 openfeign 进行多个服务之间的api调用 参…

win32汇编环境,对话框程序中组合框的应用举例

;运行效果 ;win32汇编环境,对话框程序中组合框的应用举例 ;比如在对话框中生成组合框&#xff0c;增加子项&#xff0c;删除某项&#xff0c;取得指定项内容等 ;直接抄进RadAsm可编译运行。重点部分加备注。 ;以下是ASM文件 ;>>>>>>>>>>>>…

occ的开发框架

occ的开发框架 1.Introduction This manual explains how to use the Open CASCADE Application Framework (OCAF). It provides basic documentation on using OCAF. 2.Purpose of OCAF OCAF (the Open CASCADE Application Framework) is an easy-to-use platform for ra…

Linux检查磁盘占用情况

1.检查使用情况 df -h发现是/dev/vda1占用很高 2.查看/dev/vda1文件夹 cd /dev/vda1发现不是文件夹 3.继续查看使用情况 df -h *4.原因可能是文件已经删除但是进程还在&#xff0c;没有释放空间 5.查看删除操作的进程 lsof -n | grep deleted6.杀死进程 kill -9 PID