SpringBoot集成kafka-消息转发@sendTo()注解

news2024/9/20 16:29:13

SpringBoot集成kafka-消息转发@sendTo

  • 1、消费者
  • 2、生产者
  • 3、实体类对象
  • 4、JSON工具类
  • 5、配置文件application.yml
  • 6、测试类
  • 7、测试

在这里插入图片描述在这里插入图片描述

1、消费者

  • 启动消费者进行消息监听,消费者A监听到生产者发送的消息
  • 使用@sendTo()注解将消息转发给消费者B
package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    @KafkaListener(topics = {"topicA"}, groupId = "aGroup")
    @SendTo("topicB")
    public String onEventA(ConsumerRecord<String, String> record) {
        System.out.println("消费者A接收消息,转发到消费者B record = " + record);
        return record.value() + "--forward message";
    }

    @KafkaListener(topics = {"topicB"}, groupId = "bGroup")
    public void onEventB(ConsumerRecord<String, String> record) {
        System.out.println("消费B消息接收到消费者A转发的消息 record = " + record);
    }

}

2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        User user = User.builder().id(1).phone("15676767673").birthday(new Date()).build();
        String userJson = JSONUtils.toJSON(user);
        kafkaTemplate.send("topicA","k", userJson);

    }

}

3、实体类对象

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

4、JSON工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

5、配置文件application.yml

spring:
  application:
    #应用名称
    name: spring-boot-05-kafka-MsgRedirect

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafak服务器IP>:9092

    #配置消费者的反序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

6、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot05KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.sendEvent();
    }

}

7、测试

  • 先启动消费者进行消息监听
  • 再启动生产者发送消息

消费者A先接收到消息:
在这里插入图片描述

消费者B后接收到消费者A转发的消息:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2072565.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在树莓派5上使用pytroch进行模型训练—全流程笔记

在树莓派上运行pytroch模型&#x1f680; 在完成了树莓派的一系列基础配置学习之后&#xff0c;按照规划&#xff0c;下一步要做的就是在树莓派上安装一个pytorch&#xff0c;尝试运行一下深度学习的模型&#xff0c;如果可以实现且准速度有一定保证的话&#xff0c;就可以作为…

代码随想录算法训练营第50天|卡码网 98. 所有可达路径

1.卡码网 98. 所有可达路径 题目链接&#xff1a;https://kamacoder.com/problempage.php?pid1170 文章链接&#xff1a;https://www.programmercarl.com/kamacoder/0098.所有可达路径.html#总结 1.图的存储 本题我们使用邻接表 或者 邻接矩阵都可以&#xff0c;因为后台数据…

掌握路演艺术:创新大赛路演稿撰写指南

如何在创新大赛中脱颖而出&#xff1f;一份精心准备的路演稿是关键 前言开场白&#xff1a;抓住注意力的第一步项目概述&#xff1a;清晰传达核心价值市场分析&#xff1a;展示项目的市场潜力商业模式与盈利计划&#xff1a;务实的策略团队介绍&#xff1a;展现团队的实力与文化…

vue3的nginx配置文件配置(nginx只配置前端vue3的nginx.conf文件)

1、本地的访问网址的链接 http://localhost:5173/official-website/ 2、程序的配置 3、nginx.conf配置 #user nobody; worker_processes 1;events {worker_connections 1024; }http {include mime.types;default_type application/octet-stream;log_format main…

全网最细springboot学习笔记—SpringBoot配置SSL(https)

文章主要内容大致为&#xff1a; SpringBoot配置SSL&#xff08;https&#xff09; SpringBoot全局异常处理 SpringBoot 404页面处理 接下来让我们一起探讨三大框架之一的springboot框架&#xff1a; SpringBoot配置SSL&#xff08;https&#xff09; SpringBoot可以通过在appl…

如何使用ssm实现基于java的电脑硬件库存管理系统

TOC ssm145基于java的电脑硬件库存管理系统jsp 第1章 绪论 1.1 课题背景 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。所…

第四十六篇,PID心法解读

猛回头&#xff0c;有近半年的时光没输出了&#xff0c;看着昨天加班调的PID数据&#xff0c;灵光闪现了一下&#xff0c;赶紧记录。 参数整定找最佳&#xff0c;从小到大顺序查   先是比例后积分&#xff0c;最后再把微分加 曲线振荡很频繁&#xff0c;比例度盘要放大   曲…

基于Open Cv的数字图像手势识别系统,Python编程实现,可以识别以下6种手势,含代码和报告

该项目的目标是设计并实现一个能够实时识别特定手势的系统。系统将使用OpenCV库来捕获视频流&#xff0c;并通过图像处理技术来识别特定的手势。具体来说&#xff0c;系统将识别以下六种手势&#xff1a; 挥手 - 手掌水平移动。握拳 - 手指全部弯曲成拳头。坐起 - 模拟做仰卧起…

sft是mean-seeking rl是mode-seeking

原文链接 KL散度是D(P||Q)&#xff0c;P和Q谁在前谁在后是有讲究的&#xff0c;P在前&#xff0c;就从P采样。 D K L ( P ∣ ∣ Q ) E x − p ( x ) ( l o g ( P ( x ) / Q ( x ) ) ) D_{KL}(P||Q)E_{x-p(x)}(log(P(x)/Q(x))) DKL​(P∣∣Q)Ex−p(x)​(log(P(x)/Q(x)))想象一…

关于Java中@Component的使用中出现@Autowired为NULL的问题

目录&#xff1a; 关于Java中Component的使用中出现Autowired为NULL的问题解决过程 关于Java中Component的使用中出现Autowired为NULL的问题 解决过程 我在写一个项目中使用Component配置了一个RedisCompent在这里插入代码片类我将在AccountController和 UserinfoController中…

如何使用ssm实现基于Vue框架的订餐系统+vue

TOC ssm157基于Vue框架的订餐系统vue 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管…

AI数字时代客户体验白皮书5G云算力网络云网终端AIGC人工智能宽带政企物联网专线 IDC智慧城市专家学者教授培训讲师分享

客户体验的时代已然来临 在过去的几十年里&#xff0c;中国企业逐步从产品驱动转向市场驱动&#xff0c;从规模竞争走向创新竞争。然而&#xff0c;随着市场竞争的白热化和产品、服务的高度同质化&#xff0c;企业之间的差异化逐渐被削弱&#xff0c;传统的价格战、渠道战已经…

一题看 无记忆化dfs、记忆化dfs和dp直接的转化

无记忆化dfs&#xff1a; class Solution { public:bool resfalse;bool wordBreak(string s, vector<string>& wordDict) {set<string> S;int ns.size();for(auto ss:wordDict){S.insert(ss);}function<void(int)> dfs[&](int t){if(restrue) retur…

深度学习--对抗生成网络(GAN)

对抗生成网络&#xff08;Generative Adversarial Network, GAN&#xff09;是一种深度学习模型&#xff0c;由伊恩古德费洛&#xff08;Ian Goodfellow&#xff09;及其同事在2014年提出。GAN通过两个神经网络的对抗过程来生成数据&#xff0c;这两个网络分别是生成器&#xf…

Chapter 03 Vue指令(下)

欢迎大家订阅【Vue2Vue3】入门到实践 专栏&#xff0c;开启你的 Vue 学习之旅&#xff01; 文章目录 前言一、v-on指令二、v-for指令三、v-bind指令 前言 在 Vue.js 中&#xff0c;指令是带有 v- 前缀的特殊属性&#xff0c;不同属性对应不同的功能。通过学习不同的指令&#…

临床医生与人工智能识别三级淋巴结成熟状态的研究对比|文献速递·24-08-24

小罗碎碎念 这期推文的主题是三级淋巴结&#xff0c;主要解决一个问题——临床上如何识别三级淋巴结&人工智能如何应用于三级淋巴结的识别。这两篇文献来源于临床和工科两位不同的老师&#xff0c;是在与他们交流的过程中推荐的&#xff0c;在这里向他们表示感谢&#xff…

在VSCode中使用REST Client插件调试HTTP接口

在 VSCode 中安装 REST Client 扩展程序。新建 test.http 文件。编写请求 请求编写格式可以查看 REST Client 扩展程序说明。点击“Send Request”发送请求 5. 等待请求完成查看响应 请求完成会自动打开响应结果。响应结果上面部分是响应头&#xff0c;下面部分是响应…

idea付费插件,SequenceDiagram比较好用

以下idea付费插件你们都用过哪些呢&#xff1f; SequenceDiagram插件是一种用于绘制时序图的工具。时序图是一种图形化的表示对象之间消息传递顺序的方法。 该插件可以在使用各种编程语言编写代码时&#xff0c;方便地绘制时序图&#xff0c;以帮助开发者更好地理解和描述系统…

【数据分享】全球含建筑高度的建筑物数据(shp格式\约15亿栋建筑物)

建筑数据是我们在各项研究中经常使用到的数据。之前我们能获取到的建筑数据大多没有建筑高度信息&#xff0c;而建筑高度是建筑数据最重要的属性。之前我们给大家分享了我国分城市的含建筑高度的建筑物数据&#xff08;可查看之前的文章获悉详情&#xff09;&#xff0c;本次我…

ST-LINK常见错误总结

伴随着走进STM32 开发 &#xff0c;烧录部分一直会出现 各种各样的问题 &#xff0c;写一篇博文记录关于烧录部分的问题&#xff0c;此文会持续更新&#xff0c;可能之后又遇到其他新的问题&#xff0c;会回来再添加的。 目录 STLINK CONNECTION ERROR 问题的解决 固件丢失 …