4.3、Flink任务怎样读取Kafka中的数据

news2024/11/24 4:24:09

目录

1、添加pom依赖

2、API使用说明

3、这是一个完整的入门案例

4、Kafka消息应该如何解析

4.1、只获取Kafka消息的value部分

​4.2、获取完整Kafka消息(key、value、Metadata)

4.3、自定义Kafka消息解析器

5、起始消费位点应该如何设置

​5.1、earliest()

5.2、latest()

5.3、timestamp()

6、Kafka分区扩容了,该怎么办 —— 动态分区检查

7、在加载KafkaSource时提取事件时间&添加水位线

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间


1、添加pom依赖

我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka

该工具实现了一个消费者FlinkKafkaConsumer,可以用它来读取kafka的数据

如果想使用这个通用的Kafka连接工具,需要引入jar依赖

<!-- 引入 kafka连接器依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>

2、API使用说明

官网链接:Apache Kafka 连接器

语法说明: 

// 1.初始化 KafkaSource 实例
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)                           // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)                     
    .setTopics("input-topic")                               // 必填:指定要消费的topic
    .setGroupId("my-group")                                 // 必填:指定消费者的groupid(不存在时会自动创建)
    .setValueOnlyDeserializer(new SimpleStringSchema())     // 必填:指定反序列化器(用来解析kafka消息数据,转换为flink数据类型)
    .setStartingOffsets(OffsetsInitializer.earliest())      // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
    .build(); 

// 2.通过 fromSource + KafkaSource 获取 DataStreamSource
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

3、这是一个完整的入门案例

开发语言:java1.8

flink版本:flink1.17.0

public class ReadKafka {
    public static void main(String[] args) throws Exception {
        newAPI();
    }

    public static void newAPI() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
                .setTopics("20230810")                              // 必填:指定要消费的topic
                .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 必填:指定反序列化器(用来解析kafka消息数据)
                .setStartingOffsets(OffsetsInitializer.earliest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
                .build();

        env.fromSource(source,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source")
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }
}

4、Kafka消息应该如何解析

代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析

反序列化器的功能:

                将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象)

反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定

下面介绍两种常用Kafka消息解析器:

        KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) :

                 1、返回完整的Kafka消息,将JSON字符串反序列化为ObjectNode对象

                 2、可以选择是否返回Kafak消息的Metadata信息,true-返回,false-不返回

        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) :

                1、只返回Kafka消息中的value部分 

4.1、只获取Kafka消息的value部分

4.2、获取完整Kafka消息(key、value、Metadata)

kafak消息格式:

                key =  {"nation":"蜀国"}

                value = {"ID":整数}

    public static void ParseMessageJSONKeyValue() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
                .setTopics("9527")                                  // 必填:指定要消费的topic
                .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
                // 必填:指定反序列化器(将kafak消息解析为ObjectNode,json对象)
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        // includeMetadata = (true:返回Kafak元数据信息 false:不返回)
                        new JSONKeyValueDeserializationSchema(true)
                ))
                .setStartingOffsets(OffsetsInitializer.latest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
                .build();

        env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print()
        ;

        // 3.触发程序执行
        env.execute();

    }

运行结果:    

常见报错: 

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = 9527, partition = 0, leaderEpoch = 0, offset = 1064, CreateTime = 1691668775938, serialized key size = 4, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5e9eaab8, value = [B@67390400).
	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
	... 14 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxx': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"xxxx"; line: 1, column: 5]

报错原因:

          出现这个报错,一般是使用flink读取fafka时,使用JSONKeyValueDeserializationSchema

来解析消息时,kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误

例如下面这个格式,就会造成解析错误  key=1000,value=你好

那应该怎么解决呢?

        1、如果有权限修改Kafka消息格式,可以将Kafka消息key&value内容修改为Json格式

        2、如果没有权限修改Kafka消息格式(比如线上环境,修改比较困难),可以重新实现

       JSONKeyValueDeserializationSchema类,根据所需格式来解析Kafka消息(可以参考源码)

4.3、自定义Kafka消息解析器

        生产中对Kafka消息及解析的格式总是各种各样的,当flink预定义的解析器满足不了业务需求时,可以通过自定义kafka消息解析器来完成业务的支持

例如,当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时,只返回了 offset、topic、partition 三个字段信息,现在需要`kafka生产者写入数据时的timestamp`,就可以通过自定义kafka消息解析器来完成

代码示例:

// TODO 自定义Kafka消息解析器,在 metadata 中增加 timestamp字段
public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode>{

        private static final long serialVersionUID = 1509391548173891955L;

        private final boolean includeMetadata;
        private ObjectMapper mapper;

        public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {
            this.includeMetadata = includeMetadata;
        }

        @Override
        public void open(DeserializationSchema.InitializationContext context) throws Exception {
            mapper = JacksonMapperFactory.createObjectMapper();
        }

        @Override
        public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            ObjectNode node = mapper.createObjectNode();
            if (record.key() != null) {
                node.set("key", mapper.readValue(record.key(), JsonNode.class));
            }
            if (record.value() != null) {
                node.set("value", mapper.readValue(record.value(), JsonNode.class));
            }
            if (includeMetadata) {
                node.putObject("metadata")
                        .put("offset", record.offset())
                        .put("topic", record.topic())
                        .put("partition", record.partition())
                        // 添加 timestamp 字段
                        .put("timestamp",record.timestamp())
                ;
            }
            return node;
        }

        @Override
        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        @Override
        public TypeInformation<ObjectNode> getProducedType() {
            return getForClass(ObjectNode.class);
        }
    }

运行结果:


5、起始消费位点应该如何设置

起始消费位点说明:

        起始消费位点是指 启动flink任务时,应该从哪个位置开始读取Kafka的消息   

        下面介绍下常用的三个设置:    

                OffsetsInitializer.earliest()  :

                        从最早位点开始消

                        这里的最早指的是Kafka消息保存的时长(默认为7天,生成环境各公司略有不同)

                        该这设置为默认设置,当不指定OffsetsInitializer.xxx时,默认为earliest() 

                OffsetsInitializer.latest()   :

                        从最末尾位点开始消费

                        这里的最末尾指的是flink任务启动时间点之后生产的消息

                OffsetsInitializer.timestamp(时间戳) :

                        从时间戳大于等于指定时间戳(毫秒)的数据开始消费

下面用案例说明下,三种设置的效果,kafak生成10条数据,如下:

5.1、earliest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最早位置开始消费(该设置为默认设置)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

运行结果:

5.2、latest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最末尾位点开始消费
        .setStartingOffsets(OffsetsInitializer.latest())
        .build();

运行结果:

5.3、timestamp()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new MyJSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从指定时间戳后开始消费
        .setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L))
        .build();

运行结果:


6、Kafka分区扩容了,该怎么办 —— 动态分区检查

        在flink1.13的时候,如果Kafka分区扩容了,只有通过重启flink任务,才能消费到新增分区的数据,小编就曾遇到过上游业务部门的kafka分区扩容了,并没有通知下游使用方,导致实时指标异常,甚至丢失了数据。

        在flink1.17的时候,可以通过`开启动态分区检查`,来实现不用重启flink任务,就能消费到新增分区的数据

开启分区检查:(默认不开启)

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("9527")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最末尾位点开始消费
        .setStartingOffsets(OffsetsInitializer.latest())
        // 开启动态分区检查(默认不开启)
        .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
        .build();

7、在加载KafkaSource时提取事件时间&添加水位线

可以在 fromSource(source,WatermarkStrategy,sourceName) 时,提取事件时间和制定水位线生成策略

注意:当不指定事件时间提取器时,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线
    public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")
                .setTopics("9527")
                .setGroupId("FlinkConsumer")
                // 将kafka消息解析为Json对象,并返回元数据
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new MyJSONKeyValueDeserializationSchema(true)
                ))
                // 设置起始消费位点:从最末尾位点开始消费
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        env.fromSource(source,
                        // 使用内置的单调递增的水位线生成器(默认使用 kafka的timestamp作为事件时间)
                        WatermarkStrategy.forMonotonousTimestamps(),
                        "Kafka Source")
                // 通过 ProcessFunction 查看提取的事件时间和水位线信息
                .process(
                        new ProcessFunction<ObjectNode, String>() {
                            @Override
                            public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
                                // 当前处理时间
                                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                                // 当前水位线
                                long currentWatermark = ctx.timerService().currentWatermark();
                                StringBuffer record = new StringBuffer();
                                record.append("========================================\n");
                                record.append(kafkaJson + "\n");
                                record.append("currentProcessingTime:" + currentProcessingTime + "\n");
                                record.append("currentWatermark:" + currentWatermark + "\n");
                                record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
                                record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
                                out.collect(record.toString());

                            }
                        }
                ).print();

        // 3.触发程序执行
        env.execute();
    }

运行结果:

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线
    public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")
                .setTopics("9527")
                .setGroupId("FlinkConsumer")
                // 将kafka消息解析为Json对象,并返回元数据
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new MyJSONKeyValueDeserializationSchema(true)
                ))
                // 设置起始消费位点:从最末尾位点开始消费
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        env.fromSource(source,
                        // 使用内置的单调递增的水位线生成器(使用 kafka消息中的ID字段作为事件时间)
                        WatermarkStrategy.<ObjectNode>forMonotonousTimestamps()
                                // 提取 Kafka消息中的 ID字段作为 事件时间
                                .withTimestampAssigner(
                                        (json, timestamp) -> Long.parseLong(json.get("value").get("ID").toString())
                                ),

                        "Kafka Source")
                // 通过 ProcessFunction 查看提取的事件时间和水位线信息
                .process(
                        new ProcessFunction<ObjectNode, String>() {
                            @Override
                            public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
                                // 当前处理时间
                                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                                // 当前水位线
                                long currentWatermark = ctx.timerService().currentWatermark();
                                StringBuffer record = new StringBuffer();
                                record.append("========================================\n");
                                record.append(kafkaJson + "\n");
                                record.append("currentProcessingTime:" + currentProcessingTime + "\n");
                                record.append("currentWatermark:" + currentWatermark + "\n");
                                record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
                                record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
                                out.collect(record.toString());

                            }
                        }
                ).print();

        // 3.触发程序执行
        env.execute();
    }

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/866064.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wsl2安装mysql环境

安装完mysql后通过如下命令启动mysql service mysql start 会显示如下错误&#xff1a; mysql: unrecognized service 实际上上面显示的错误是由于mysql没有启动成功造成的 我们要想办法成功启动mysql才可以 1.通过如下操作就可以跳过密码直接进入mysql环境 2.如果想找到my…

cesium学习记录07-实体(Entity)

在学习记录05中&#xff0c;我们将了如何在 Cesium 中加载各种数据&#xff0c;包括矢量数据、影像图层、地形和 3D 模型。这些数据为我们构建了一个基础的场景和背景。特别是在加载 3D 模型时&#xff0c;我们采用了 viewer.scene.primitives.add 方法将模型作为一个原始对象添…

凯迪正大—微机继电保护校验仪

一、继电保护测试仪产品概述 KDJB-802继电保护测试仪是在参照电力部颁发的《微机型继电保护试验装置技术条件&#xff08;讨论稿&#xff09;》的基础上&#xff0c;听取用户意见&#xff0c;总结目前国内同类产品优缺点&#xff0c;充分使用现代的微电子技术和器件实现的一种新…

msvcp120.dll丢失的解决方法,Win11系统报错处理方法

在使用Windows11系统的时候&#xff0c;出现报错msvcp120.dll丢失我们需要怎么去修复它呢&#xff1f;msvcp120.dll是Windows操作系统中的一个重要的动态链接库文件&#xff0c;它包含了许多用于C程序的函数和类。然而&#xff0c;有时候我们可能会遇到msvcp120.dll丢失或损坏的…

AMD高保真超分算法1.0解密

FSR 1.0是空间滤波算法&#xff0c;分成EASU和RCAS两部分。EASU是边缘适配的空间上采样(Edge Adaptive Spatial Upsampling)&#xff0c;RCAS是健壮对比度适配锐化(Robust Contrast Adaptive Sharpening)&#xff0c;从CAS发展而来。 Lanczos 采样及多项式拟合 FSR 1.0 使用了 …

​ATF(TF-A)安全通告 TFV-7 (CVE-2018-3639)​

ATF(TF-A)安全通告汇总 目录 一、ATF(TF-A)安全通告 TFV-7 (CVE-2018-3639) 二、静态缓解&#xff08;Static mitigation&#xff09; 三、动态缓解&#xff08;Dynamic mitigation&#xff09; 一、ATF(TF-A)安全通告 TFV-7 (CVE-2018-3639) Title TF-A披露基于cache前瞻…

pc端网页用vue并且实现响应式 vue+bootstrap-vue

1、hbuiler内新建vue项目 在项目文件夹下用npm加载依赖&#xff08;或者用hbuilder内打开命令&#xff09; 2、配置路由 src内新建router文件夹&#xff0c;router内新建index.js index.js内配置重定向到首页 main.js内配置路由 import router from /router/index.js new…

08-1_Qt 5.9 C++开发指南_QPainter绘图

文章目录 前言1. QPainter 绘图系统1.1 QPainter 与QPaintDevice1.2 paintEvent事件和绘图区1.3 QPainter 绘图的主要属性 2. QPen的主要功能3. QBrush的主要功能4. 渐变填充5. QPainter 绘制基本图形元件5.1 基本图像元件5.2 QpainterPath的使用 前言 本章所介绍内容基本在《…

python编辑器安装与配置,python用哪个编辑器好用

大家好&#xff0c;给大家分享一下python编辑器pycharm安装教程&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 哪些python的编程软件值得推荐&#xff1f; 编写python源代码的软件.首推的Pycharm。 PyCharm用于bai一般IDE具备的功能&…

Kotlin Executors线程池newSingleThreadExecutor单线程

Kotlin Executors线程池newSingleThreadExecutor单线程 import java.util.concurrent.Executorsfun main() {val mExecutorService Executors.newSingleThreadExecutor()for (i in 1..5) {mExecutorService.execute {println("seq-$i tid:${Thread.currentThread().threa…

CSS3 中新增了哪些常见的特性?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 圆角&#xff08;Border Radius&#xff09;⭐ 渐变&#xff08;Gradients&#xff09;⭐ 阴影&#xff08;Box Shadow&#xff09;⭐ 文本阴影&#xff08;Text Shadow&#xff09;⭐ 透明度&#xff08;Opacity&#xff09;⭐ 过渡&…

在跨境电子商务客户支持中使用AI的几种方法

OpenAI&#xff0c;ChatGPT等&#xff0c;AI正在快速发展。今天的人工智能比以往任何时候都更智能、更细致、更准确。高性能的客户支持团队为任何电子商务业务提供了许多好处。这些优势包括更高的销售额、更好的保留率、更高的忠诚度和信任度、更高的认知度和品牌知名度、更好的…

【网络】传输层——UDP | TCP(协议格式确认应答超时重传连接管理)

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《网络》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 现在是传输层&#xff0c;在应用层中的报文(报头 有效载荷)就不能被叫做报文了&#xff0c;而是叫做数…

通过OpenTelemetry上报Python-flask应用数据(阿里云)

参考文档 https://help.aliyun.com/document_detail/611711.html?spma2c4g.90499.0.0.34a056ddTu2WWq 先按照 方法一&#xff1a;手动埋点上报Python应用数据 步骤测试上报是否正常。 flas 上报 在 手动埋点上报Python应用数据 的基础上&#xff0c;上报flask应用的数据&#…

Jmeter快捷方式和应用图标设置

很多人在安装Jmeter,安装到本机却没有icon&#xff0c;每次使用的时候&#xff0c;每次打开应用都要找目录&#xff0c;不太方便。 【解决问题】 使用bin路径下的一个.bat文件&#xff0c;创建快捷方式。 【操作步骤】 Step1、将Jmeter 安装bin路径下的jmeter.bat 发送快捷方…

【论文阅读】基于深度学习的时序预测——Crossformer

系列文章链接 论文一&#xff1a;2020 Informer&#xff1a;长时序数据预测 论文二&#xff1a;2021 Autoformer&#xff1a;长序列数据预测 论文三&#xff1a;2022 FEDformer&#xff1a;长序列数据预测 论文四&#xff1a;2022 Non-Stationary Transformers&#xff1a;非平…

【后端面经-数据库】Redis详解——Redis基本概念和特点

【后端面经-数据库】Redis详解——Redis基本概念和特点 1. Redis基本概念2. Redis特点2.1 优点2.2 缺点 3. Redis的应用场景面试模拟参考资料 声明&#xff1a;Redis的相关知识是面试的一大热门知识点&#xff0c;同时也是一个庞大的体系&#xff0c;所涉及的知识点非常多&…

日常BUG——Java使用Bigdecimal类型报错

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 直接上代码&#xff1a; Test public void test22() throws ParseException {System.out.p…

Golang通过alibabaCanal订阅MySQLbinlog

最近在做redis和MySQL的缓存一致性&#xff0c;一个方式是订阅MySQL的BinLog文件&#xff0c;我们使用阿里巴巴的Canal的中间件来做。 Canal是服务端和客户端两部分构成&#xff0c;我们需要先启动Canal的服务端&#xff0c;然后在Go程序里面连接Canal服务端&#xff0c;即可监…

初始C语言——详细讲解操作符以及操作符的易错点

系列文章目录 第一章 “C“浒传——初识C语言&#xff08;更适合初学者体质哦&#xff01;&#xff09; 第二章 详细认识分支语句和循环语句以及他们的易错点 第三章 初阶C语言——特别详细地介绍函数 第四章 初始C语言——详细地讲解数组的内容以及易错点 第五章 初始C语言—…