SpringBoot集成kafka-指定topic(主题)-partition(分区)-offset(偏移量)消费信息

news2024/9/20 18:40:31

SpringBoot集成kafka-指定topic-partition-offset消费信息

  • 1、消费者
  • 2、生产者
  • 3、配置类
  • 4、配置文件
  • 5、实体类
  • 6、工具类
  • 7、测试类
  • 8、第一次测试(读取到19条信息)
  • 9、第二次测试(读取到3条信息)

在这里插入图片描述

1、消费者

指定消费者读取配置文件中
topic = " k a f k a . t o p i c . n a m e " , g r o u p I d = " {kafka.topic.name}", groupId=" kafka.topic.name",groupId="{kafka.consumer.group}"下的数据。
在这里插入图片描述

package com.power.consumer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class EventConsumer { 	

    @KafkaListener(groupId="${kafka.consumer.group}",
            topicPartitions = {
                @TopicPartition(
                        topic = "${kafka.topic.name}",
                        partitions={"0","1","2"},
                        partitionOffsets = {
                                @PartitionOffset(partition="3",initialOffset = "3"),
                                @PartitionOffset(partition="4",initialOffset = "3")
                        })
            })
    public void onEvent5(String userJson,
                         @Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                         ConsumerRecord<String,String> record,
                         Acknowledgment ack){
        try {
            User user =JSONUtils.toBean(userJson,User.class);
            System.out.println("读取到的事件5:"+user+",topic:"+topic+",partition:"+partition);

            //业务确认完成,给kafka服务器反馈确认
            ack.acknowledge();//手动确认消息,就是告诉kafka服务器,该消息我已经接收到了,默认情况下是自动确认
            //手动确认后,下次启动消费者,偏移量会从新的位置开始;没有手动确认,下次启动消费者,偏移量还是从老位置开始
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent3(){
        for (int i = 0; i < 25; i++) {
            User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();
            String userJson = JSONUtils.toJSON(user);
            kafkaTemplate.send("helloTopic","k"+i,userJson);
        }
    }

}

3、配置类

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic(){
        return new NewTopic("helloTopic",5,(short)1);
    }
}

4、配置文件

spring:
  application:
    #应用名称
    name: spring-boot-02-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的服务器IP>:9092

    #消费者
    consumer:
      auto-offset-reset: earliest


    #配置消息监听器
    listener:
      ack-mode: manual

#自定义配置
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup

5、实体类

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

6、工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

7、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot02KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent5(){
        eventProducer.sendEvent5();
    }

}

8、第一次测试(读取到19条信息)

在这里插入图片描述

总共读取1、2、3分区全部数据和4分区从4偏移量开始读,5分区从4开始读取。
6+4+6+3+0=19条数据。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

9、第二次测试(读取到3条信息)

因为第一次读取已经记下来偏移量offset,即便在配置文件中指定了消费者从最开始读,也依然读取不到的。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

此时可以通过修改组id来从头开始读取
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2073205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[pytorch] --- pytorch环境配置

本教程环境搭建基于windows 1 安装miniconda 1.1 miniconda与anaconda的区别 包含的包: Anaconda: 是一个较大的发行版&#xff0c;预装了大量的科学计算和数据分析相关的 Python 包。Miniconda: 更轻量级&#xff0c;只包含 Conda、Python 和它们的依赖&#xff0c;以及少…

微信小程序获取用户openId并通过服务端向用户发送模板消息

1.引言 注意&#xff1a; 1.标题中的服务端是自己研发的服务端&#xff0c;不是腾讯公司的服务端。 2.小程序的模板消息分为一次性订阅消息与长期订阅&#xff0c;一次性订阅就是每次在给用户发送消息之前都需要获得用户的同意&#xff08;即用户订阅&#xff09;&#xff0…

数据结构(树、平衡树、红黑树)

目录 树 树的遍历方式 平衡二叉树 旋转机制 左旋 右旋 旋转实例 左左 左右 右右 右左 总结 红黑树 树 相关概念 节点的内部结构如下 二叉树与二叉搜索树的定义 树的遍历方式 前序遍历&#xff1a;当前节点&#xff0c;左子节点&#xff0c;右子结点 中序遍历&a…

React学习day01-React-开发环境配置、JSX基础-本质、JSX中js表达式的用法、JSX的条件渲染

1、React &#xff08;1&#xff09;概念&#xff1a;由Meta公司研发&#xff0c;是一个用于构建Web和原生交互页面的库 &#xff08;2&#xff09;优点&#xff1a; 1&#xff09;相较于传统基于DOM开发的优势&#xff1a;组件化的开发方式、不错的性能 2&#xff09;相较于…

软件设计原则之单一职责原则

目录 单一职责原则单一职责原则的主要特点应用范围Demo用户信息日志记录 单一职责原则 单一职责原则&#xff08;Single Responsibility Principle&#xff0c;简称SRP&#xff09;是面向对象设计中的一个重要原则&#xff0c;其核心思想是&#xff1a;一个类应该仅有一个引起…

ollma 本地部署大模型

因为我本地是 windows 的系统&#xff0c;所以这里直接写的是通过 docker 来实现本地大模型的部署。 windows 下 WSl 的安装这里就不做重复&#xff0c;详见 windows 部署 mindspore GPU 开发环境&#xff08;WSL&#xff09; 一、Docker 部署 ollma 1. 拉取镜像&#xff08;…

Ubuntu系统设置Java项目开机自启

1、创建自启动脚 sudo vi /etc/systemd/system/java-service.service 2、编辑自启脚本 [Unit]部分包含了service的描述和依赖关系。在这个示例中&#xff0c;我们将其设置为在系统启动后执行。 [Service]部分定义了service的执行方式。在这个示例中&#xff0c;我们指定了Java…

shell工具箱集合!!

shell工具箱集合 1.shell工具箱集合 2.Chrony 时间同步 3.Get_host_Info 设备信息收集 4.Init_host 系统初始化 5.Iperf 带宽测试套件 6.Lagscope_test 时延测试套件 7.Mtr_test 双向路由探测套件 下载地址&#xff1a; https://pan.quark.cn/s/6936cc13bc04

学习笔记——Redis基础

文章目录 Redis五种常用数据类型Redis常用命令Spring Data Redis使用方式操作步骤 Redis五种常用数据类型 Redis存储的是key-values结构的数据&#xff0c;其中key是字符串类型&#xff0c;value有五种常用的数据类型&#xff1a; 字符串&#xff08;string&#xff09;&…

C++入门基础知识32——【关于C++ 存储类之auto存储类】

成长路上不孤单&#x1f60a;【14后&#xff0c;C爱好者&#xff0c;持续分享所学&#xff0c;如有需要欢迎收藏转发&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#xff01;&#xff01;&#xff01;&#xff01;&#xff…

Flex的基本使用+综合案例

组成 弹性盒子没有设置高&#xff0c;就会自动拉伸 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport&q…

高并发下阻塞队列的选择

高并发下阻塞队列的选择 一、队列 队列&#xff1a;queue。简称队&#xff0c;它和堆栈一样&#xff0c;也是一种运算受限的线性表&#xff0c;其限制是仅允许在表的一端进行插入&#xff0c;而在表的另一端进行删除。 简单的说&#xff0c;采用该结构的集合&#xff0c;对元素…

洛谷 P2569 [SCOI2010] 股票交易

题目来源于&#xff1a;洛谷 题目本质&#xff1a;动态规划&#xff0c;单调队列 解题思路&#xff1a; 方程f[i][j]表示第 i 天结束后&#xff0c;手里剩下 j 股的最大利润&#xff0c;则不买不卖&#xff1a;f[i][j]f[i-1][j]。 买入&#xff1a;f[i][j]max{f[i-w-1][k]k*…

Spring DI 数据类型——构造注入

首先新建项目&#xff0c;可参考 初识 IDEA 、模拟三层--控制层、业务层和数据访问层 一、spring 环境搭建 &#xff08;一&#xff09;pom.xml 导相关坐标 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.…

【Kubernetes】K8s 持久化存储方式

K8s 持久化存储方式 1.使用节点数据卷2.使用网络数据卷3.使用临时数据卷 由于容器是一种无状态的服务&#xff0c;所以容器中的文件在宿主机上表现出来的都是临时存放&#xff08;当容器崩溃或者重启时&#xff0c;容器中的文件会丢失&#xff09;。另外&#xff0c;Kubernetes…

C++领进门(第一讲)

目录 1. C关键字&#xff08;C98&#xff09; 2. 命名空间 ​编辑 2.1命名空间的定义 2.2命名空间的使用 3.C的输入&输出 3.1cout与printf的区别 4.缺省参数 4.1缺省函数的概念 4.2缺省参数分类 5.函数重载 C的语法就是在C的基础上弥补了C的缺陷与不足 1. C关键…

Java集合框架(三)---Map

接口Map<K,V> Map集合&#xff1a;该集合存储键值对&#xff0c;一对一对往里存&#xff0c;而且要保证键的唯一性。 1&#xff0c;添加 put(K key, V value) putAll(Map<? extends K, ? extends V> m) 2&#xff0c;删除 clear() remove(Object key) 3&#xff…

【鸿蒙学习】HarmonyOS应用开发者高级认证 - 应用DFX能力介绍(含闯关习题)

学完时间&#xff1a;2024年8月24日 学完排名&#xff1a;第1698名 一、Performance Analysis Kit简介 Performance Analysis Kit&#xff08;性能分析服务&#xff09;为开发者提供应用事件、日志、跟踪分析工具&#xff0c;可观测应用运行时状态&#xff0c;用于行为分析、…

游戏分享网站|基于SprinBoot+vue的游戏分享网站系统(源码+数据库+文档)

游戏分享网站 目录 基于SprinBootvue的游戏分享网站 一、前言 二、系统设计 三、系统功能设计 5.1系统功能模块 5.2后台登录 5.2.1管理员功能模块 5.2.2用户功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#x…

kaggle竞赛宝典 | 量化竞赛第一名的网络模型

本文来源公众号“kaggle竞赛宝典”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;量化竞赛第一名的网络模型 1 简介 今天我们重温Jane Street 大赛第一名的网络模型。该次赛事数据集包含了一组匿名的特征&#xff0c;feature_{0…