springboot整合kafka多数据源

news2024/12/29 8:47:03

整合kafka多数据源

  • 项目背景
  • 依赖
  • 配置
  • 生产者
  • 消费者
  • 消息体

项目背景

在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。

依赖

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092

在使用的时候直接注入,然后就可以使用里面的方法了

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

在这里插入图片描述

多数据源情况下

本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改
在这里插入图片描述

package com.ddb.zggz.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.io.IOException;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties properties;

    private final KafkaSecondProperties kafkaSecondProperties;



    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaSecondTemplate")
    public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                                   @Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
                                                   ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaProducerListener")
    @Primary
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }


    @Bean("kafkaSecondProducerListener")
    public ProducerListener<Object, Object> kafkaSecondProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean("kafkaConsumerFactory")
    @Primary
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.kafkaSecondProperties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }


    @Bean("zwKafkaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaSecondConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


    @Bean("kafkaProducerFactory")
    @Primary
    public ProducerFactory<Object, Object> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondProducerFactory")
    public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.kafkaSecondProperties.buildProducerProperties());
        String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean("kafkaAdmin")
    @Primary
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

}


生产者


package com.ddb.zggz.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
@Slf4j
public class KafkaPushEvent {


    @Resource
    private KafkaTemplate<String, String> kafkaSecondTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ApplicationConfiguration configuration;


    public void pushEvent(PushParam param) {
        ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;
        if ("zw".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if ("net".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if (sendResultListenableFuture == null){
            throw new IllegalArgumentException("kakfa发送消息失败");
        }
        sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka发送的message报错,发送数据:{}", param);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka发送的message成功,发送数据:{}", param);
            }
        });


    }


}

消费者

package com.ddb.zggz.event;

import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


@Component
@Slf4j
public class SendMessageListener {

    @Autowired
    private GzApprovalService gzApprovalService;

    @Autowired
    private GzServiceService gzServiceService;

    @KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")
    @RetryableTopic(include = {Exception.class},
            backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
    )
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        String value = (String) consumerRecord.value();
        PushParam pushParam = JSONObject.parseObject(value, PushParam.class);

        //版本提审
        if ("version-approval".equals(pushParam.getEvent())) {
            ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);
            gzApprovalService.approval(approvalDTO);
        }

        //服务下架
        if (pushParam.getEvent().equals("server-OffShelf-gzt")) {
            OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);
            gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());

        }

    }
    @DltHandler
    public void processMessage(String message) {

    }
}

消息体

package com.ddb.zggz.event;

import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * @author bbd
 */
@Data
public class PushParam implements Serializable {

    /**
     * 发送的消息数据
     */
    private Object data;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime = LocalDateTime.now();

    /**
     * 事件名称,用于消费者处理相关业务
     */
    private String event;


    /**
     * 保存版本参数
     */
    public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzH5VersionManage);
        pushParam.setEvent("save-version");
        return pushParam;
    }

    /**
     * 保存服务参数
     */
    public static PushParam toKafkaServer(GzService gzService) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzService);
        pushParam.setEvent("save-server");
        return pushParam;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/876823.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

塔斯汀的“中国汉堡”并不tasty

文 | 螳螂观察 作者 | 易不二 华莱士门徒 作为快餐消费市场的两大霸主——麦当劳、肯德基&#xff0c;自入华以来&#xff0c;从来不缺模仿者、挑战者。 从福建起家、已经成为了万店品牌的华莱士&#xff0c;无疑是最成功的一个。 得益于被餐饮界称之为“福建模式”的门店…

Arduino驱动红外二氧化碳传感器(气体传感器篇)

目录 1、传感器特性 2、驱动程序 红外激光传感器是将成熟的红外吸收气体检测技术与精密光路设计、精良电路设计紧密结合而制作出的高性能传感器,具有高灵敏度、高分辨率、低功耗,响应快、抗水汽干扰、不中毒、稳定性高、使用寿命长等特点。本篇博文使用Arduino驱动红外二氧…

C++ 字符串类 string

文章目录 前言一、string 类型概括二、字符串流总结 前言 在C中&#xff0c;字符串是一种常见的数据类型&#xff0c;用于存储和操作文本数据。C标准库中提供了std::string类&#xff0c;它是一个功能强大的字符串类&#xff0c;提供了丰富的方法和操作符&#xff0c;使我们能…

精挑细选的几个宝藏软件

是不是感觉你的电脑里面永远都缺少一款软件&#xff1f;每次想要使用某个功能的时候总是不能找到合适的&#xff0c;还要先去网上找&#xff0c;小编给大家分享几款超级实用的软件&#xff0c;建议低调收藏哦~ Proxyee-down/下载工具 proxyee-down是一款免费开源的http下载工…

Vue 安装开发者工具

1.下载开发者工具&#xff0c;下载地址&#xff1a;http://book.wiyp.top/App/Vue3开发者工具-谷歌/Vue3.crx 2.打开谷歌浏览器&#xff0c;点击扩展&#xff0c;点击管理扩展程序。 3.开启开发者模式&#xff0c;将 Vue3 开发者工具文件拖拽到浏览器中进行安装。 注&#xff…

构建一个LLM应用所需的所有信息

一、说明 您是否对大型语言模型&#xff08;LLM&#xff09;的潜力感兴趣&#xff0c;并渴望创建您的第一个基于LLM的应用程序&#xff1f;或者&#xff0c;也许您是一位经验丰富的开发人员&#xff0c;希望简化工作流程&#xff1f;看看DemoGPT就是您的最佳选择。该工具旨在简…

车载智能座舱开发核心技术——SystemServer

SystemServer在车载开发中扮演着重要角色&#xff0c;它是Android系统的核心组件之一&#xff0c;负责管理和调度其他系统服务。我们这篇内容将对SystemServer技术进行深入解析&#xff0c;并以实战代码示例加以分析&#xff0c;帮助读者更好地理解和应用该技术。 一、SystemS…

理解软件行业职位的职责

对于职位的一些个人看法 ....... 目录 1.初级程序员&#xff08;PG &#xff09; 2.中级程序员&#xff08;SE&#xff09; 3. 高级程序员&#xff08;SSE&#xff09; 4.技术经理&#xff08;TL&#xff09; 5.技术总监&#xff08;VP&#xff09; 6. 首席技术官&#xf…

综述:计算机视觉中的图像分割

一、说明 这篇文章是关于图像分割的探索&#xff0c;这是解决计算机视觉问题&#xff08;如对象检测、对象识别、图像编辑、医学图像分析、自动驾驶汽车等&#xff09;的重要步骤之一。让我们从介绍开始。 二、图像分割介绍 图像分割是计算机视觉中的一项基本任务&#xff0c;涉…

2021年03月 C/C++(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题&#xff1a;石头剪刀布 石头剪刀布是常见的猜拳游戏。石头胜剪刀&#xff0c;剪刀胜布&#xff0c;布胜石头。如果两个人出拳一样&#xff0c;则不分胜负。 一天&#xff0c;小A和小B正好在玩石头剪刀布。已知他们的出拳都是有周期性规律的&#xff0c;比如&#xff1a;“…

OSCS开源安全周报第 55 期:JeecgBoot 远程代码执行漏洞

本周安全态势综述 OSCS 社区共收录安全漏洞 11 个&#xff0c;公开漏洞值得关注的是 JeecgBoot 远程代码执行漏洞、企业微信私有化后台API未授权访问漏洞、WPS Office 存在代码执行漏洞(MPS-3pcb-l4mv)、Microsoft Exchange Server 远程代码执行漏洞(CVE-2023-38182)、Smartbi…

SDK是什么,SDK和API有什么区别

SDK&#xff08;Software Development Kit&#xff09;是一种开发工具包&#xff0c;通常由软件开发公司或平台提供&#xff0c;用于帮助开发人员构建、测试和集成特定平台或软件的应用程序。SDK 包含一系列的库、工具、示例代码和文档&#xff0c;旨在简化开发过程并提供所需的…

老网工必备好物,分享15个网络监控神器

下午好&#xff0c;我的网工朋友。 近年来&#xff0c;随着虚拟、云和边缘网络的增加&#xff0c;网络监控工具已经显得越来越重要。 在当今大多数企业中&#xff0c;监控混合IT环境中的网络流量对于主动网络管理至关重要。 无论是检测行为异常、占用带宽、应对新威胁&#…

【学会动态规划】最大子数组和(19)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

【PDF.js】PDF.js的简单使用与CDN加速遇到的问题

PDF.js的简单使用与CDN加速遇到的问题 一、PDF.js是什么&#xff1f;二、PDF.js三、 选择PDF.js的版本下载1. Prebuilt (现代浏览器) *作者选择2. Prebuilt (历史淘汰浏览器)3. Source 来源4. 通过CDN加速5. 文件树PrebuiltSource 6. 尝试查看器 四、选择文档&#xff08;不是使…

IC人必看| 模拟IC方向面试常考问题及答案汇总(二)

有不少小伙伴说还想要更多模拟IC方向的面试题目&#xff0c;这不就来了&#xff01;&#xff08;文末可领全部面试题目&#xff09; 1. Bandgap 里有几种反馈&#xff1f;原理是&#xff1f; 正反馈和负反馈。 2. 负反馈种类&#xff1f;负反馈的优点&#xff1f; 种类&am…

mqttfx连上OneNET生成token时的一大坑,报用户名或密码错误

整个流程如下连接&#xff1a; MQTT.fx和MQTTX 链接ONENET物联网开发平台避坑细节干货。 其中在生成token时&#xff0c;搞了半天在连接后都会报用户名密码错误 最后发现是格式问题&#xff0c;输入所有字符后一定要双击看是否可以全选中&#xff0c;可以全选中说明字符的格式…

【boost网络库从青铜到王者】第二篇:asio网络编程中的socket的监听和连接

文章目录 1、网络编程基本流程2、终端节点endpoint的创建2.1、客户端终端节点endpoint的创建2.2、服务器终端节点endpoint的创建 3、服务器与客户端通信套接字socket的创建4、服务器监听套接字socket的创建5、绑定accpet监听套接字6、客户端连接指定的端点7、服务器接收连接8、…

H5 和小程序的区别

什么是小程序&#xff1f; 从“微信之父” 张小龙的定义里&#xff0c;我们可以了解到&#xff0c;小程序其实就是内嵌在微信&#xff0c;不需要安装和卸载的一种新应用形态。它具备的两个强属性&#xff1a;提高效率&#xff0c;用完即走&#xff01;因此小程序的设计以轻便、…

vue element 多图片组合预览

定义组件&#xff1a;preview-image <template><div><div class"imgbox"><divclass"preview-img":class"boxClass"v-if"Imageslist 3 ||Imageslist 5 ||Imageslist 7 ||Imageslist 8 ||Imageslist > 9"&…