springboot集成kafka-生产者发送消息

news2024/11/23 11:38:25

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendMessage(){
        //通过构建器模式创建Message对象
        Message message = MessageBuilder.withPayload("hello-message")
                .setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字
                .build();
        kafkaTemplate.send(message);
    }
}

2、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;
    
    @Test
    void testMessage(){
        eventProducer.sendMessage();
    }
    
	@Test
	void testMessage(){
	    eventProducer.sendMessage();
	}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendMessage(){
        //通过构建器模式创建Message对象
        Message message = MessageBuilder.withPayload("hello-message")
                .setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字
                .build();
        kafkaTemplate.send(message);
    }

    public void sendProducerRecord(){
        //Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息
        Headers headers = new RecordHeaders();
        headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));
        headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));
        //String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
        ProducerRecord<String,String> record =
                new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);
        kafkaTemplate.send(record);
    }
}

测试类:

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void testProducerRecord(){
        eventProducer.sendProducerRecord();
    }

}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendEvent4(){
        //String topic, Integer partition, Long timestamp, K key, V data
        kafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");
    }
}

测试类:

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent4(){
        eventProducer.sendEvent4();
    }
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;
    
    public void sendDefault(){
        //Integer partition, Long timestamp, K key, V data
        kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");
    }
}

测试类:

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendDefault(){
        eventProducer.sendDefault();
    }
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2064048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

案例-异常

题目: (如果一开始不知道如何用异常的语法写,可先用如if语句代替try...catch,最后再把if优化为try...catch) 代码: javabean类: 测试类:

Java CompletableFuture:你真的了解它吗?

文章目录 1 什么是 CompletableFuture&#xff1f;2 如何正确使用 CompletableFuture 对象&#xff1f;3 如何结合回调函数处理异步任务结果&#xff1f;4 如何组合并处理多个 CompletableFuture&#xff1f; 1 什么是 CompletableFuture&#xff1f; CompletableFuture 是 Ja…

springboot静态资源访问问题归纳

以下内容基于springboot 2.3.4.RELEASE 1、默认配置的springboot项目&#xff0c;有四个静态资源文件夹&#xff0c;它们是有优先级的&#xff0c;如下&#xff1a; "classpath:/META-INF/resources/", &#xff08;优先级最高&#xff09; "classpath:/reso…

【精选】基于Spark的国漫推荐系统(精选设计产品)

目录&#xff1a; 系统开发技术 Python可视化技术 Django框架 Hadoop介绍 Scrapy介绍 IDEA介绍 B/S架构 MySQL数据库介绍 系统流程分析 操作流程 添加信息流程 删除信息流程 系统系统介绍&#xff1a; 可以查看我的B站&#xff1a; 系统测试 运行环境 软件平台 硬…

docker-compose安装NebulaGraph 3.8.0

文章目录 一. 安装NebulaGraph1.1 通过 Git 克隆nebula-docker-compose仓库的3.8.0分支到主机1.2 部署1.3 卸载1.4 查看 二. 安装NebulaGraph Studio2.1 下载 Studio 的部署配置文件2.2 创建nebula-graph-studio-3.10.0目录&#xff0c;并将安装包解压至目录中2.3 解压后进入 n…

shaushaushau1

CVE-2023-7130 靶标介绍&#xff1a; College Notes Gallery 2.0 允许通过“/notes/login.php”中的参数‘user’进行 SQL 注入。利用这个问题可能会使攻击者有机会破坏应用程序&#xff0c;访问或修改数据. 已经告诉你在哪里存在sql注入了&#xff0c;一般上来应该先目录扫…

【补充篇】AUTOSAR多核OS介绍(下)

文章目录 前文回顾1 AUTOSAR OS1.1 AUTSOAR OS元素1.1.1 操作系统对象1.1.2 操作系统应用程序1.1.3 AUTOSAR OS裁剪类型1.1.4 AUTOSAR OS软件分区1.2 AUTOSAR OS自旋锁1.3 AUTOSAR OS核间通信1.4 AUTOSAR OS多核调度前文回顾 在上篇文章【补充篇】AUTOSAR多核OS介绍(上)中,…

对于一个36岁的人来说,现在转行AI大模型还来得及吗?

前言 在职场生涯中&#xff0c;33岁似乎是一个尴尬的年龄。许多人在这个阶段已经定型&#xff0c;难以寻求新的突破。然而&#xff0c;随着科技行业的飞速发展&#xff0c;人工智能成为了新时代的宠儿。那么&#xff0c;对于一个33岁的人来说&#xff0c;现在转行AI大模型还来…

做SSH实验下载 paramiko库

今天做SSH实验下载paramiko库文件一直出问题&#xff0c;后面库文件下好了还是报错&#xff0c;这里记录了我的解决方案。 pycharm修改默认下载路径为国内镜像&#xff08;我这里用清华大学的镜像下载快一些&#xff09; Simple Index 到这里路径就改好了&#xff0c;接下来就…

从就业出发,深度剖析大数据行业的现状与前景

以一个经典案例引入——啤酒与纸尿裤的故事。 20世纪90年代&#xff0c;沃尔玛从购物的后台信息数据中&#xff0c;发现很多买了纸尿裤的男士会同时买啤酒。后来&#xff0c;调查发现&#xff0c;此类人多是被“轰出来”买纸尿裤&#xff0c;一想到养娃压力大&#xff0c;心情…

牛客竞赛数据结构专题班树状数组、线段树练习题

牛客竞赛_ACM/NOI/CSP/CCPC/ICPC算法编程高难度练习赛_牛客竞赛OJ G 智乃酱的平方数列&#xff08;线段树&#xff0c;等差数列&#xff0c;多项式&#xff09; 题目描述 想必你一定会用线段树维护等差数列吧&#xff1f;让我们来看看它的升级版。 请你维护一个长度为510 ^5…

Mysql高级 [Linux版] 性能优化 数据库系统配置优化 和 MySQL的执行顺序 以及 Mysql执行引擎介绍

数据库系统配置优化 1、定义 数据库是基于操作系统的&#xff0c;目前大多数MySQL都是安装在linux系统之上&#xff0c;所以对于操作系统的一些参数配置也会影响到MySQL的性能&#xff0c;下面就列出一些常用的系统配置。 2、优化配置参数-操作系统 优化包括操作系统的优化及My…

集运系统:如何实现不同员工的不同操作权限?

在集运行业&#xff0c;员工的角色和职责各有不同&#xff0c;因此对系统的操作权限需求也不尽相同。为了确保数据的安全性和业务的顺利进行&#xff0c;易境通集运系统提供了灵活的权限管理功能&#xff0c;让企业可以根据员工的角色和职责&#xff0c;设置不同的操作权限。 易…

Redis (day 3)

一、通过jedis连接数据库 1.首先导入依赖 <!-- https://mvnrepository.com/artifact/redis.clients/jedis --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.1.0</version></de…

mac 微信数据直接存储到移动硬盘

在apple设备上存储都是1500块/128gb的价格收取的&#xff0c;真的是寸土寸金。在手机已经占用了一遍存储空间之后&#xff0c;微信备份还要占用一遍。 iCloud备份微信聊天记录的稳定性真的非常差劲&#xff0c;比如我微信30g&#xff0c;经常恢复到20g左右就被打断&#xff0c;…

【C++ Primer Plus习题】2.6

问题: 解答: #include <iostream> using namespace std;#define LIGHT_TO_SKY 63240double lightToSky(double value) {return value * LIGHT_TO_SKY; }int main() {double light 0;cout << "请输入光年值:";cin >> light;cout << light &…

还在返回一大堆 null 字段给前端?

在许多情况下&#xff0c;返回的 JSON 数据可能包含许多 null 值的字段&#xff0c;这会导致数据冗余&#xff0c;增加网络传输的负担&#xff0c;并使得前端处理数据变得复杂。因此&#xff0c;使用 JsonInclude(JsonInclude.Include.NON_NULL) 可以帮助我们优化 JSON 的输出&…

看看人家写的,Controller太优雅了~【送源码】

今天咱们来聊聊如何写出优雅的Controller代码。 写程序想让作品成为经典&#xff0c;不只是简单地加个try-catch就完事了。有时候&#xff0c;一个不小心&#xff0c;Controller里写的业务逻辑都能让你血压飙升&#xff01;不过别慌&#xff0c;今天我就来带大家看看怎么把Cont…

文件描述符的复制,访问测试,修改文件大小,文件锁

指定文件描述符 #include <unistd.h>int dup2(int oldfd, int newfd); ->功能:复制文件描述符表的特定条目到指定项&#xff0c; ->参数:oldfd: 源文件描述符 newfd: 目标文件描述符 ->返回值:成功返回目标文件描述符(newfd)&#xff0c;失败返回…