springboot整合kafka入门

news2024/11/19 1:42:01

kafka基本概念

producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。

consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,要指定消费者接受的消息的topic,该消费者只会接受该topic的消息。

topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。

broker: kafka集群包含一个或多个服务器,这些服务器就叫做broker。

本机安装kafka测试

安装kafka(mac下)

kafka下载: 从官网下载 kafka_2.13-2.7.0.tgz,直接解压即可。

本机测试kafka

1、进入到kafka的解压目录,输入命令启动zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

复制

打开另一个终端输入命令启动kafka:

./bin/kafka-server-start.sh config/server.properties 

复制

2、服务启起来后,可以创建生产者和消费者了。 再打开另一个终端输入命令创建生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

复制

broker-list: 参数指定生产者所使用的broker localhost: 9092 参数表示broker,这个broker为本机(127.0.0.1),且使用的端口是kafka的默认端口号是9092 topic: 参数表示生产者生产的消息的topic 为 “test_topic”

最后再打开另一个终端创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

复制

bootstrap-server: 是指定consumer从哪里(broker)取出消息 topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。 from-beginning: Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的,因为只能订阅最新发送的数据。通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可

3、都创建完了可以通过生产者输入消息,消费者来接收并显示消息,效果图如下:

springboot整合kafka(IDEA)

注意: kafka要是部署在服务器的话,本机就 要和服务器之间能ping通。

1、创建springboot项目:

2、创建两个类,分别为生产者和消费者 项目目录结构:

配置文件application.yml:(一般项目自动生成的是applicaiton.properties,但为了书写简便,改成yml)

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
    producer: #生产者配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: #消费者配置
      group-id: test #设置消费者的组id
      enable-auto-commit: true
# auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

复制

springboot启动类入口,KafkaStudyApplication.java:

package com.study.kafka.kafka_study;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaStudyApplication { 
   

    public static void main(String[] args) { 
   
        SpringApplication.run(KafkaStudyApplication.class, args);
    }

}

复制

TestKafkaProducerController.java:(生产者)

package com.study.kafka.kafka_study;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController     //定义这是一个控制器,可以通过浏览器访问
@RequestMapping("/kafka")
public class TestKafkaProducerController { 

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//当在浏览器上输入http://localhost:8080/kafka/send?msg=abc,就会发送abc到服务器上去让消费者接收,msg对应下面的String msg
@RequestMapping("/producerSend")
public String send(String msg){ 

kafkaTemplate.send("test_topic", msg); //使用kafka模板发送信息
return "success";
}
}

复制

TestConsumer.java:(消费者)

package com.study.kafka.kafka_study;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TestConsumer { 

/** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * @param record record变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 * */
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception { 

System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

复制

测试

1、运行KafkaStudyApplication.java之后,终端上输入消息时,不仅终端上(服务器)运行的测试消费者能收到,IDEA上的程序也能收到。

2、在浏览器上输入http://localhost:8080/kafka/producerSend?msg=web world31231,不仅IDEA上的消费者能收到,在终端(服务器)上运行的测试消费者也能收到:(其中8080是tomcat服务器的端口,springboot默认下带的是tomcat)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/604126.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git提交代码报错 Push failed unable to access

目录 场景 环境&#xff1a; Git配置 场景 Push failed unable to access https://github.com/1790753131/remotRepository3.git/: Failed to connect to github.com port 443 after 21114 ms: Couldnt connect to server Push failed unable to ac…

A JavaScript error occurred in the main processUncaught Exception

A JavaScript error occurred in the main processUncaught Exception: Error: getaddrinfo ENOTFOUND rfw.jnsii.com at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26) &#x1f4a7; 记录一下今天遇到的 b u g \color{#FF1493}{记录一下今天遇到的bug} 记录一…

开放接口签名(Signature)实现

开放接口签名(Signature)实现方案 既然是对外开放&#xff0c;那么调用者一定没有我们系统的Token&#xff0c;就需要对调用者进行签名验证&#xff0c;签名验证采用主流的验证方式&#xff0c;采用Signature 的方式。 字段 类型 必传 说明 appid String 是 应用id tim…

windows server安全设置

Windows服务器安全策略设置 1. Windows服务器安全策略设置 操作系统关闭不必要的服务如smartd&#xff08;一个守护进程&#xff08;帮助程序&#xff09;、Print Spoole&#xff08;管理所有本地和网络打印队列及控制所有打印工作&#xff09;&#xff0c;操作系统关闭默认盘…

名著《MySQL必知必会》讲了个啥

文章目录 第一章 了解SQL第二章 检索数据第三章 排序第四章 过滤数据第五章 高级数据过滤第六章 多表查询内连接&#xff08;交集&#xff09;外连接多表连接UNIONUNION ALL 第七章 单行函数日期和时间函数获取日期、时间日期与时间戳的转换获取月份、星期、星期数、天数等函数…

chatgpt赋能python:Python中创建空列表的两种方法

Python中创建空列表的两种方法 在Python编程中&#xff0c;创建空列表是一项非常常见的任务。Python提供了两种主要的方法来创建一个空列表&#xff0c;分别是“中括号法”和“list()函数法”。本文将介绍这两种方法&#xff0c;它们的优缺点以及如何在你的代码中使用它们。 …

2023年5月Web3行业月度发展报告区块链篇 | 陀螺科技会员专享

5月&#xff0c;市场大部熊市与局部牛市并存&#xff0c;一方面&#xff0c;BRC-20与Meme币引领热点涨势&#xff0c;代表项目Ordinals与Pepe涨幅上千倍&#xff0c;相继引发比特币与以太坊拥堵&#xff0c;市场情绪高涨&#xff1b;另一方面&#xff0c;主流币种持续震荡&…

【终结】Plsql 监听失败+链接Oracle ORA-12545:因目标主机不存在

折腾我好久&#xff01;翻阅千山万水也没有命中问题&#xff01;今天我要公布于世&#xff01;如何解决&#xff01; 起因&#xff1a;服务器那边重启了我运行良好的主机导致上述问题&#xff01; 感谢各位CSDN默默贡献的大神&#xff0c;是你们促进互联网的发展&#xff01;…

无需魔法、无需账户!可免费直接使用微软新必应

大家好&#xff0c;我是校长。 如果你不会魔法上网&#xff0c;如果你没有 ChatGPT 账号&#xff0c;无法体验大语言模型生成式 AI &#xff0c;别怕&#xff0c;我给大家推荐一个好的开源的应用站点&#xff0c;它可以让你体验一番。 前几天&#xff0c;我刷 GitHub 的时候&am…

Transformer工业部署落地!超越ResNet、CSWin(附源码)

关注并星标 从此不迷路 计算机视觉研究院 公众号ID&#xff5c;ComputerVisionGzq 学习群&#xff5c;扫码在主页获取加入方式 论文地址&#xff1a;https://arxiv.org/pdf/2207.05501.pdf 计算机视觉研究院专栏 作者&#xff1a;Edison_G 一种用于在现实工业场景中高效部署的下…

读改变未来的九大算法笔记02_数据库

1. 基础思想 1.1. 预写日志记录 1.2. 两阶段提交 1.3. 关系数据库 2. 两个事实 2.1. 计算机程序会崩溃 2.1.1. 当一个程序崩溃时&#xff0c;它会丢掉所有正在处理的东西 2.1.2. 只有安放在计算机文件系统中的信息会得到保存 2.1.3. 崩溃相当宽泛&#xff1a;包括任何可…

助力智能制造数字化转型 | 5.31 IoTDB 中航机载制造行业客户分享会回顾

5 月 31 日&#xff0c;IoTDB & 中航机载智能制造实践分享会在线上举办。IoTDB 与中航机载的两位产品、技术专家&#xff0c;结合 EMQ 与深南电路的两位技术大拿&#xff0c;针对制造行业智能化痛点带来了一场方案实践分享&#xff0c;与线上直播中上千人次的智能制造关注者…

专访瑞声科技应用软件开发总监陆其明:当一名老兵决定重新上路

编者按&#xff1a;从互联网公司到智能终端解决方案公司&#xff0c;陆其明的这次转变可能难以被人理解。但经济大环境的影响和个人的技术困境还是让他义无反顾地走向一个未知的世界。正如黄仁勋日前所言&#xff0c;“撤退”对聪明人来说并不容易。然而&#xff0c;战略性的撤…

刷题记录:一维前缀和 | leetcode-2559. 统计范围内的元音字符串数 2023/6/2

leetcode-2559. 统计范围内的元音字符串数 这道题的思路并不难找&#xff0c;一开始我有点看出是一维前缀和问题&#xff0c;但没有很确定&#xff0c;因此也就没有直接从这个思路走下去。还是想着先做暴力版本的吧&#xff01; 这是暴力版本的代码&#xff1a; class Solut…

零基础搭建私有云盘并内网穿透远程访问

文章目录 摘要视频教程1. 环境搭建2. 测试局域网访问3. 内网穿透3.1 ubuntu本地安装cpolar3.2 创建隧道3.3 测试公网访问 4 配置固定http公网地址4.1 保留一个二级子域名4.1 配置固定二级子域名4.3 测试访问公网固定二级子域名 转载自cpolar极点云的文章&#xff1a;使用Nextcl…

小白入门C#初探Web简易页面显示信息小案例

1、创建新项目 选择ASP.NET Core Web应用&#xff08;模型-视图-控制器&#xff09;&#xff0c;然后点击下一步。 然后在项目名称里面填写CSharpDemo&#xff0c;点击下一步&#xff0c;直至创建即可。 目录结构&#xff1a; Connected Services&#xff1a;是Visual S…

计算机网络-网络层1.2

IPv6 IP地址耗尽&#xff0c;CIDR和NAT只是延长了IPv4地址分配结束的时间 IPv6从根本上解决了IP地址的耗尽问题 与IPv4的比较 扩大了地址空间移除校验和字段&#xff0c;减少了每跳的处理时间将IPv4的可选字段移出首部&#xff0c;变成拓展首部&#xff0c;路由器不对拓展首…

NIO vs BIO模型解读

目录 stream vs channel IO模型 零拷贝 传统IO NIO优化 stream vs channel stream 不会自动缓冲数据&#xff0c;channel 会利用系统提供的发送缓冲区、接收缓冲区&#xff08;更为底层&#xff09;stream 仅支持阻塞 API&#xff0c;channel 同时支持阻塞、非阻塞 API&a…

计算机网络-网络层1.1

IPv4 网络层打包传输数据时&#xff0c;数据量小则称IP数据报&#xff0c;数据量大则分片&#xff0c;每一片称为IPv4分组 分组格式 固定部分长20B&#xff0c;可变部分用于提供错误检测和安全等机制 版本&#xff1a;指IP版本首部长度&#xff1a;以4B为单位&#xff0c…

Java --- springboot3之web的自动配置原理

目录 一、自动配置 二、默认效果 三、WebMvcAutoConfiguration原理 3.1、生效条件 3.2、效果 3.3、WebMvcConfigurer接口 一、自动配置 1、导入web的pom依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-s…