微服务同时接入多个Kafka

news2024/9/21 10:49:02

准备工作

自己搭建一个Kafka
从官方下载Kafka,选择对应Spring Boot 的版本,好在Kafka支持的版本范围比较广,当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本,覆盖了Spring Boot 2.0x-Spring Boot 3.0.x。
Apache Kafka

解压安装

进入bin目录,执行如下命令,按照如下顺序启动
Linux

# 配置文件选择自己对应的目录
zookeeper-server-start.sh ../config/zookeeper.properties

Windows

windows/zookeeper-server-start.bat ../config/zookeeper.properties

打开另外一个终端,启动KafkaServer
Linux

kafka-server-start.sh ../config/server.properties

Windows

windows/kafka-server-start.bat ../config/server.properties

最小化配置Kafka
如下是最小化配置Kafka
pom.xml 引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=single-kafka-server

#kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
#消费者分组,配置后,自动创建
spring.kafka.consumer.group-id=default_group

KafkaProducer 生产者

@Slf4j
@Component
@EnableScheduling
public class KafkaProducer {

    @Resource
    private KafkaTemplate kafkaTemplate;

    private void sendTest() {
        //topic 会自动创建
        kafkaTemplate.send("topic1", "hello kafka");
    }

    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}

KafkaConsumer 消费者

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"})
    public void processMessage(String spuId) {
        log.warn("process spuId ={}", spuId);
    }

}

运行效果:

多Kafka配置
配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖
pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=kafka-server

#kafka1
#服务器地址
spring.kafka.one.bootstrap-servers=localhost:9092
spring.kafka.one.consumer.group-id=default_group


#kafka2
spring.kafka.two.bootstrap-servers=localhost:9092
spring.kafka.two.consumer.group-id=default_group2

第一个Kafka配置,需要区分各Bean的名称
KafkaOneConfig

@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "kafkaOneContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息
kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,
producerFactory 生产者工厂
consumerFactory 消费者工厂
producerConfigs 生产者配置
consumerConfigs 消费者配置

同样创建第二个Kafka,配置含义,同第一个Kafka
KafkaTwoConfig

@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "kafkaTwoContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

创建一个测试的消费者,注意配置不同的监听容器containerFactory
KafkaConsumer

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
    public void oneProcessItemcenterSpuMessage(String spuId) {
        log.warn("one process spuId ={}", spuId);
    }

    @KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")
    public void twoProcessItemcenterSpuMessage(String spuId) {
        log.warn("two process spuId ={}", spuId);
    }
}

创建一个测试的生产者,定时往两个topic中发送消息
KafkaProducer

@Slf4j
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate kafkaOneTemplate;
    @Resource
    private KafkaTemplate kafkaTwoTemplate;

    private void sendTest() {
        kafkaOneTemplate.send("topic1", "hello kafka one");
        kafkaTwoTemplate.send("topic2", "hello kafka two");
    }

    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}

最后运行效果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/80133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CMake中target_compile_definitions的使用

CMake中的target_compile_definitions命令用于向target添加编译定义&#xff0c;其格式如下&#xff1a; target_compile_definitions(<target><INTERFACE|PUBLIC|PRIVATE> [items1...][<INTERFACE|PUBLIC|PRIVATE> [items2...] ...]) 指定在编译给定的<…

网络原理初识

网络原理初识 文章目录网络原理初识网络发展历程独立模式网络互联IP地址端口号网络协议OSI七层TCP/ IP协议封装与分用封装一.应用层二.传输层三.网络层四.数据链路层五.物理层分用六.物理层七.数据链路层八.网络层九.传输层十.应用层网络发展历程 独立模式 一开始电脑之间是相…

【InnoDB ClusterSet】快速部署

快速部署 InnoDB ClusterSet 文章目录快速部署 InnoDB ClusterSet前言前期准备架构设计部署过程1. 使用配置账号通过 MySQL Shell 连接到 InnoDB Cluster 任一成员2. 为主 InnoDB Cluster 实例设置变量3. 创建以当前集群作为主集群的 ClusterSet4. 为每个独立服务器实例添加配置…

382. 链表随机节点-哈希表法

382. 链表随机节点-哈希表法 给你一个单链表&#xff0c;随机选择链表的一个节点&#xff0c;并返回相应的节点值。每个节点 被选中的概率一样 。 实现 Solution 类&#xff1a; Solution(ListNode head) 使用整数数组初始化对象。 int getRandom() 从链表中随机选择一个节点…

Vue Element动态生成的表单如何用 el-form 校验

<el-form :model"dynamicValidateForm" ref"dynamicValidateForm" label-width"100px" class"demo-dynamic"><el-form-item prop"email" label"邮箱" :rules"[{ required: true, message: 请输入…

研究良久,终于发现了他代码写的快且bug少的原因

前言 读者诸君&#xff0c;今日我们适当放松一下&#xff0c;不钻研枯燥的知识和源码&#xff0c;分享一套高效的摸鱼绝活。 我有一位程序员朋友&#xff0c;当时在一个团队中开发Android应用&#xff0c;历经多次考核后发现&#xff1a; 在组内以及与iOS团队的对比中: 他的任…

java项目请求url存在特殊字符 400错误

java项目请求url特殊字符 400错误 1 现象 请求路径带特殊字符&#xff0c;就会400错误&#xff0c;这就泄露了服务器版本和报错信息&#xff0c;无疑是敏感信息泄露&#xff0c;实属安全漏洞。 补充项目环境&#xff1a;springmvc、tomcat 8.5.59 2 原因 经排查和报错信息…

STM32--ADC模数转换器

学习江科大自化协stm32教程记录的笔记 ADC模数转换器 ADC&#xff08;Analog-Digital Converter&#xff09;模拟-数字转换器 ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量&#xff0c;建立模拟电路到数字电路的桥梁 DAC是数字-模拟转换器&#xff0c;但是P…

AI 助你轻松剪视频 # AutoCut

如果你还在犯愁每次剪视频都要反复听才能下手&#xff0c;不妨试试AutoCut , AI 大神李沐开源的一个剪辑神器&#xff0c;使用 Python 开发&#xff0c;它可以通过字幕来剪切视频。AutoCut 对你的视频自动生成字幕。然后你选择需要保留的句子&#xff0c;AutoCut 将对你视频中对…

C语言:变量的深入理解

文章目录一.什么是变量C语言中为什么要有类型&#xff1f;C语言中的类型为什么有这么多种呢&#xff1f;定义变量的本质为什么需要定义变量定义变量的本质定义变量时的规则二.深刻理解signed/unsigned定义的变量1.运算时的符号位2.数据的存储情况3.unsigned定义时的小细节三.大…

Android 13 VTS HIDL interface 解析

Android 13 VTS Introduction Android 13已经发布&#xff0c;VTS testcase发生很多变化&#xff0c;在此博客中对其每个测试项目进行流程介绍。 这里先对VTS 做一个介绍&#xff1a; VTS是vendor test suite简称&#xff0c;意为供应商测试套件。目的是确保Vendor层实现的兼容…

Spring Boot自动装配原理

Spring Boot自动装配原理1.Spring Boot 入口2.SpringBootApplicationSpringBootConfigurationComponentScanEnableAutoConfiguration判断自动装配开关是否打开获取EnableAutoConfiguration注解中的 exclude 和 excludeName获取需要自动装配的所有配置类最后3.总结1.Spring Boot…

Github访问量过百万!阿里内部至尊级分布式事务手册,实至名归

分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说&#xff0c;就是一次大的操作由不同的小操作组成&#xff0c;这些小的操作分布在不同的服务器上&#xff0c;且属于不同的应用&#xff0c;分布式…

[附源码]JAVA毕业设计养生药膳推荐系统(系统+LW)

[附源码]JAVA毕业设计养生药膳推荐系统&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术…

[LeetCode周赛复盘] 第 323 场周赛20221211

[LeetCode周赛复盘] 第 323 场周赛20221211 一、本周周赛总结二、 [Easy] 6257. 删除每行中的最大值1. 题目描述2. 思路分析3. 代码实现三、[Medium] 6258. 数组中最长的方波1. 题目描述2. 思路分析3. 代码实现四、[Medium] 6259. 设计内存分配器1. 题目描述2. 思路分析3. 代码…

web前端期末大作业【足球网页】学生网页设计作业源码

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

基于C++实现(控制台)单位职工管理系统(数据结构)【100010017】

1需求分析 1.1 问题描述 对单位的职工进行管理&#xff0c; 包括插入、 删除、 查找、 排序等功能。 1.2 问题要求 职工对象数不必很多&#xff0c; 便于一次读入内存&#xff0c; 所有操作不经过内外存交换。 &#xff08;1&#xff09; 由键盘输 入职工对象&#xff0c;…

1564_AURIX_TC275_电压监控寄存器整理

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 1. 如果HSM保护开启了&#xff0c;那么访问修改其他的bit就会导致一个总线错误。 2. SMU中可以配置电压监控的相关alarm是否配置 生效。 这个寄存器是几个欠压阈值的配置&#xff0c;在这…

在Ubuntu中为ROG笔记本安装驱动asusctl

我是在Kubuntu22.04上安装的&#xff0c;系统自带“省电”、“平衡”、“性能”三个电源选项&#xff0c;显卡模式切换是拿nvidia驱动切换的&#xff0c;所以目前装的这个驱动我只用到了灯光调节功能。 文章目录介绍安装安装asusctl卸载显卡模式切换驱动supergfxctl使用方法启用…

LwIP——无操作系统启动流程

目录 启动流程 虚拟网卡控制块 发送流程 接收流程 总结 启动流程 通过阅读正点原子的无操作系统移植工程的源码&#xff0c;可以总结出LwIP的无操作系统的启动流程。 前面一些都是基于其他的外设的初始化&#xff0c;我们只关心这里lwip_comm_init()&#xff0c;这个函数的…