互联网全景消息(10)之Kafka深度剖析(中)

news2025/1/13 11:38:18

一、深入应用

1.1 SpringBoot集成Kafka

        引入对应的依赖。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <!--swagger2增强,官方ui太low , 访问地址: /doc.html  -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.8.8</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

    </dependencies>

        添加配置文件:

spring:
  application:
    name: demo

  kafka:
    bootstrap-servers: 这里换成自己的kafka信息
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer: # consumer消费者
      group-id: javagroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)

      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

        启动服务:

@SpringBootApplication
@RestController
public class Demo {
    public static void main(String[] args) {
        new SpringApplicationBuilder(Demo.class).run(args);
    }

}

        启动信息如下:

1.2 消息发送 

 1.2.1 异步发送

        KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。 

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }

        效果如下:

        同步发送代码如下:

    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());
    }

1.2.2 序列化 

        序列化详解:

  • 前面我们用到的是Kafka自带的字符串序列化器,
    org.apache.kafka.common.serialization.StringDeserializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
  • 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer 

        自定义序列化,自己实现序列化对应的接口即可,如下:

public class MySerializer implements Serializer {

    @Override
    public byte[] serialize(String s, Object o) {
        String json = JSON.toJSONString(o);
        return json.getBytes();
    }

}

        然后在yaml配置自己的编辑器:

value-serializer: com.itheima.demo.config.MySerializer

         对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:

package com.itheima.demo.config;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;

public class MyDeserializer implements Deserializer {
    private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);

    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            String json = new String(bytes,"utf-8");
            return JSON.parse(json);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

}

 1.2.3 分区策略

         分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定分区里面去;
  • 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
  • 既没有给定分区号,也没有给定key值,直接轮询进行分区;
  • 自定义分区策略,按照自定义需求选择分区号;

        生产者代码如下:

//测试分区发送
@RestController
public class PartitionProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

//    指定分区发送
//    不管你key是什么,到同一个分区
    @GetMapping("/kafka/partitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");
    }

//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区
    @GetMapping("/kafka/keysend/{key}")
    public void setKey(@PathVariable("key") String key) {
        kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");
    }

    // 什么也不指定
    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

        消费者代码如下:

//指定消费组消费
@Component
public class PartitionConsumer {
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);

    //分区消费
    @KafkaListener(topics = {"test"},topicPattern = "0")
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=0,message:[{}]", msg);
        }
    }
    @KafkaListener(topics = {"test"},topicPattern = "1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=1,message:[{}]", msg);
        }
    }
}

        1)测试默认分区策略,发送什么也不指定的消息

        可以发现发送的是同一条的数据,他是跟partition轮询发送的;

         2)测试指定分区

         3)按照key的hashcode来分区

1.3 消息消费 

 1.3.1 消费者分组

public class GroupConsumer {
    private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);

    //组1,消费者1
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-1 , message:{}", msg);
        }
    }

    //组1,消费者2
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-2 , message:{}", msg);
        }
    }

    //组2,只有一个消费者
    @KafkaListener(topics = {"test"},groupId = "group2")
    public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group2 , message:{}", msg);
        }
    }
}

         启动:

        需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!

1.3.2 位移提交

        1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

        2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:

@Configuration
public class MyOffsetConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 注意这里!!!设置手动提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));

        // ack模式:
        //          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
        //
        //          RECORD
        //          每处理一条commit一次
        //
        //          BATCH(默认)
        //          每次poll的时候批量提交一次,频率取决于每次poll的调用频率
        //
        //          TIME
        //          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
        //
        //          COUNT
        //          累积达到ackCount次的ack去commit
        //
        //          COUNT_TIME
        //          ackTime或ackCount哪个条件先满足,就commit
        //
        //          MANUAL
        //          listener负责ack,但是背后也是批量上去
        //
        //          MANUAL_IMMEDIATE
        //          listner负责ack,每调用一次,就立即commit

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React Fiber框架中的Render渲染阶段——workLoop(performUnitOfWork【beginWork与completeWork】)

触发渲染过程——renderRoot renderRoot 是一个函数&#xff0c;用于触发渲染工作。它通常会调用并递归地执行一系列的渲染任务&#xff0c;直到完成整个更新过程。这个过程包括执行 Fiber 树中的 beginWork 和 completeWork&#xff0c;以及渲染新状态或 DOM。 function ren…

STM32F1学习——ADC模数转换器

一、ADC模数转换器 ADC的全称 Analog-Digital Converter 模拟-数字转换器&#xff0c;他可以用来将引脚上连续变换的模拟电压转换为内存中存储的数字变量。 ADC有两个重要指标&#xff0c;分辨率和频率。 STM32的ADC是 12位 逐次逼近型&#xff0c;1us转换时间&#xff0c;也就…

[每周一更]-(第131期):Go并发协程总结篇

Go语言的并发是通过协程&#xff08;goroutine&#xff09;实现的。Go协程是轻量级的线程&#xff0c;允许多个任务同时执行&#xff0c;且Go运行时会高效地管理它们。在Go中使用并发协程的方式非常简便&#xff0c;也很强大。以下是一些关于Go协程的基础用法和并发控制方法&am…

Ecdsa密钥在线生成工具

具体前往&#xff1a;ECC公钥私钥对在线生成器

llama.cpp 模型可视化工具 GGUF Visualizer

llama.cpp 模型可视化工具 GGUF Visualizer 1. GGUF Visualizer for VS Code (gguf-viz)1.1. Features1.2. Extension Settings References GGUF Visualizer https://marketplace.visualstudio.com/items?itemNameAgainstEntropy.gguf-viz 1. GGUF Visualizer for VS Code (g…

【DAPM杂谈之三】DAPM的初始化流程

本文主要分析DAPM的设计与实现 内核的版本是&#xff1a;linux-5.15.164&#xff0c;下载链接&#xff1a;Linux内核下载 主要讲解有关于DAPM相关的知识&#xff0c;会给出一些例程并分析内核如何去实现的 /**************************************************************…

HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系

一、前言 当开发者使用Builder做引用数据传递时&#xff0c;会考虑组件的父子关系&#xff0c;使用了bind(this)之后&#xff0c;组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题&#xff0c;引入LocalBuilder装饰器。…

Pytorch导出onnx模型并在C++环境中调用(含python和C++工程)

Pytorch导出onnx模型并在C环境中调用&#xff08;含python和C工程&#xff09; 工程下载链接&#xff1a;Pytorch导出onnx模型并在C环境中调用&#xff08;python和C工程&#xff09; 机器学习多层感知机MLP的Pytorch实现-以表格数据为例-含数据集和PyCharm工程中简单介绍了在…

git打补丁

1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞&#xff0c;作者进行了修复&#xff0c;我们可以通过使用git补丁的方式&#xff0c;将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量&#xff0c;生成…

计算机网络 (34)可靠传输的工作原理

前言 计算机网络可靠传输的工作原理主要依赖于一系列协议和机制&#xff0c;以确保数据在传输过程中能够准确无误地到达目的地。 一、基本概念 可靠传输指的是数据链路层的发送端发送什么&#xff0c;在接收端就收到什么&#xff0c;即保证数据的完整性、正确性和顺序性。由于网…

基于ADAS 与关键点特征金字塔网络融合的3D LiDAR目标检测原理与算法实现

一、概述 3D LiDAR目标检测是一种在三维空间中识别和定位感兴趣目标的技术。在自动驾驶系统和先进的空间分析中&#xff0c;目标检测方法的不断演进至关重要。3D LiDAR目标检测作为一种变革性的技术&#xff0c;在环境感知方面提供了前所未有的准确性和深度信息. 在这里&…

Vue3初学之常用的指令

v-bind&#xff1a;动态绑定属性 v-bind 用于动态绑定一个或多个属性&#xff0c;或一个组件 prop 到表达式的值。 v-model&#xff1a;双向数据绑定 见上篇 https://editor.csdn.net/md/?articleId145022994 v-if、v-else-if、v-else&#xff1a;条件渲染 v-show&…

docker中jenkins流水线式部署GitLab中springboot项目

本质就是将java项目拉取下来&#xff0c;并自动打包成docker镜像&#xff0c;运行 首先启动一个docker的jenkins 如果没有镜像使用我的镜像 通过网盘分享的文件&#xff1a;jenkins.tar 链接: https://pan.baidu.com/s/1VJOMf6RSIQbvW_V1zFD7eQ?pwd6666 提取码: 6666 放入服…

在ubuntu下对NFS做性能测试

安装NFS 首先&#xff0c;安装服务 sudo apt update sudo apt install nfs-kernel-server然后创建共享文件夹 # 请自定义你自己的共享目录 sudo mkdir -p /exports/nfs4/homes sudo chmod -R 777 /exports/nfs4/homes# 这个可以根据no_root_squash标致选择设置。 # 如果不设…

Open FPV VTX开源之默认MAVLink设置

Open FPV VTX开源之默认MAVLink设置 1. 源由2. 准备3. 连接4. 安装5. 配置6. 测试6.1 启动wfb-ng服务6.2 启动wfb-ng监测6.3 启动QGroundControl6.4 观察测试结果 7. 总结8. 参考资料9. 补充9.1 telemetry_tx异常9.2 DEBUG串口部分乱码9.3 PixelPilot软件问题 1. 源由 飞控图传…

26个开源Agent开发框架调研总结(2)

根据Markets & Markets的预测&#xff0c;到2030年&#xff0c;AI Agent的市场规模将从2024年的50亿美元激增至470亿美元&#xff0c;年均复合增长率为44.8%。 Gartner预计到2028年&#xff0c;至少15%的日常工作决策将由AI Agent自主完成&#xff0c;AI Agent在企业应用中…

mark 一下conductor github

Netflix 关闭conductor 后&#xff0c;后续https://orkes.io/content/ 继续在维护&#xff0c;github地址如下 https://github.com/conductor-oss/conductor 最新release为3.21.11

PyCharm文档管理

背景&#xff1a;使用PyCharmgit做文档管理 需求&#xff1a;需要PyCharm自动识别docx/xslx/vsdx等文件类型&#xff0c;并在PyCharm内点击文档时唤起系统内关联应用(如word、excel、visio) 设置步骤&#xff1a; 1、file -》 settings -》file types 2、在Files opened i…

嘉立创画原理图和PCB

一、环境 进入立创EDA官网 注册登录的环节就不介绍了。 登录账号后&#xff0c;选择专业版 二、原理图 工程中&#xff0c;有原理图和PCB&#xff0c;这里选择原理图 那么接下来就是进行绘制 元器件在如下区域搜索使用。 双击进行放置&#xff0c;也可以左键提前预览。 网…

科创驱动 | 华望系统科技荣膺西湖区年度前沿创新新锐企业

2025年1月3日&#xff0c;由中共西湖区党委、西湖区人民政府主办的“新年第一会”—西湖区科技创新大会在杭州隆重举行。大会现场揭晓了西湖区年度科技创新团队与项目&#xff0c;并发布了“2024西湖区科技十大事件”与“西湖区五大年度科技榜单”。杭州华望系统科技有限公司榜…