kafka入门(一):kafka消息消费

news2025/1/18 10:49:44

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka代码示例(一):

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemo1 {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        consumerRecord();
    }

    public static void consumerRecord() {
        //属性配置
        Properties properties = getProperties(BROKER_LIST, GROUP_ID);
        //消费者初始化
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //消息者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));
        //循环
        while (true) {
            //每次拉取 1千条消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("=============> 消费kafka消息:"+ record.value());
            }
        }
    }

    public static Properties getProperties(String brokerList, String groupId) {
        Properties properties = new Properties();
        //序列化
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        //broker服务器的ip和端口,多个用逗号隔开
        properties.put("bootstrap.servers", brokerList);
        //消费者群组id
        properties.put("group.id", groupId);
        return properties;
    }

}


使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
在这里插入图片描述

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1224905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python的文件目录操作 1

我们在实际开发中&#xff0c;经常需要对文件进行读取、遍历、修改等操作&#xff0c;通过 python 的标准内置os模块&#xff0c;能够以简洁高效的方式完成这些操作。常见的操作整理如下&#xff1a; 文件夹操作&#xff1a;包括文件夹的创建、修改&#xff08;改名/移动&…

redis cluster搭建

k8s部署 Redis Insight k8s部署redis集群_mob6454cc6c6291的技术博客_51CTO博客 占用的内存竟然这么小&#xff0c;才200M左右 随便选个节点进去&#xff0c;看能否连接上其他节点 redis-cli -h redis-cluster-v1-0.redis-cluster.project-gulimall.svc.cluster.local 再创建个…

Typecho用宝塔面板建站(保姆级教程)

提前准备&#xff1a; 1 已备案域名 注意:在腾讯云备案的域名部署阿里云服务器的话还需要在阿里云备案&#xff0c;反之亦然 2 服务器 服务器操作系统设置为windows 服务器实例设置&#xff1a;依次开放8888/888/443/3000-4000/21/22端口 个人用的阿里云&#xff0c;到安全组配…

Python与ArcGIS系列(九)自定义python地理处理工具

目录 0 简述1 创建自定义地理处理工具2 创建python工具箱0 简述 在arcgis中可以进行自定义工具箱,将脚本嵌入到自定义的可交互窗口工具中。本篇将介绍如何利用arcpy实现创建自定义地理处理工具以及创建python工具箱。 1 创建自定义地理处理工具 在arctoolbox中的自定义工具箱…

人充当LLM Agent的工具(Human-In-The-Loop ),提升复杂问题解决成功率

原文&#xff1a;人充当LLM Agent的工具&#xff08;Human-In-The-Loop &#xff09;&#xff0c;提升复杂问题解决成功率 在Agent开发过程中&#xff0c;LLM充当Agent的大脑&#xff0c;对问题进行规划、分解、推理&#xff0c;在执行过程中合理选择利用工具&#xff08;Tool&…

软件工程--软件过程学习笔记

本篇内容是对学校软件工程课堂内容的记录总结&#xff0c;部分也来源于网上查找的资料 软件过程基础 软件过程是指在软件开发过程中&#xff0c;经过一系列有序的步骤和活动&#xff0c;从问题定义到最终软件产品交付和维护的全过程。这个过程旨在确保软件项目能够按时、按预…

jvm 内存模型概述

一、类加载子系统 1、类加载的过程&#xff1a;装载、链接、初始化&#xff0c;其中&#xff0c;链接又分为验证、准备和解析 装载&#xff1a;加载class文件 验证&#xff1a;确保字节流中包含信息符合当前虚拟机要求 准备&#xff1a;分配内存&#xff0c;设置初始值 解析&a…

cesium雷达扫描(雷达扫描线)

cesium雷达扫描(雷达扫描线) 下面富有源码 实现思路 使用ellipse方法加载圆型,修改ellipse中‘material’方法重写glsl来实现当前效果 示例代码 index.html <!DOCTYPE html> <html lang="en"><head>

2.2 调用星火大模型的API

调用星火大模型的API 1 申请API调用权限&#xff1a;2 调用原生星火 API3 统一API调用方式 项目仓库地址&#xff1a;https://github.com/datawhalechina/llm-universe 讯飞星火认知大模型&#xff0c;由科大讯飞于2023年5月推出的中文大模型&#xff0c;也是国内大模型的代表…

Golang环境搭建Win10(简洁版)

Golang环境搭建Win10 Golang环境搭建(Win10)一、前言二、Golang下载三、配置环境变量3.1、配置GOROOT3.2、配置GOPATH3.3、配置GOPROXY代理 Golang环境搭建(Win10) 一、前言 Go&#xff08;又称 Golang&#xff09;是 Google 的 Robert Griesemer&#xff0c;Rob Pike 及 Ken…

Spring Framework 6.1 正式 GA

Spring Framework 6.1在运行时方面针对 JDK 21 和 Jakarta EE 10 上提供了一级支持&#xff0c;同时保留了 JDK 17 和 Jakarta EE 9 基线。Spring 还通过精细的元数据推理跟踪 GraalVM for JDK 21 的演变&#xff0c;同时暂时保持与 GraalVM 22.3 的兼容性。 主要变化 支持 JD…

运行ps软件提示由于找不到vcruntime140.dll无法继续执行代码怎么修复

今天我在打开ps时候突然电脑出现找不到vcruntime140.dll无法继续执行代码&#xff0c;我很困扰不知道什么原因&#xff0c;于是我花了一天时间在网上找了5个可以解决这个问题的方案分享给大家&#xff0c;同时我自己也解决了问题。分享给大家就是为了大家以后遇到这个问题不用像…

基于Java+SpringBoot+Vue3+Uniapp+TypeScript(有视频教程)前后端分离健身预约系统设计与实现

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

Vite - 配置 - 文件路径别名的配置

为什么要配置别名 别名的配置&#xff0c;主要作用是为了缩短代码中的导入路径。例如有如下的项目目录&#xff1a; project-name| -- src| -- a| --b| --c| --d| --e| -- abc.png| -- index.html| -- main.js如果想在 main.js 文件中使用 abc.png ,则使用的路径是 &#xff1…

八股文-TCP的四次挥手

TCP&#xff08;Transmission Control Protocol&#xff09;是一种面向连接的、可靠的传输协议&#xff0c;它的连接的建立和关闭过程都是经过精心设计的。在TCP连接关闭时&#xff0c;使用四次挥手来保证数据的完整传输和连接的正常终止。 漫画TCP的四次挥手 第一次挥手&#…

Azure Machine Learning - Azure AI 搜索中的矢量搜索

矢量搜索是一种信息检索方法&#xff0c;它使用内容的数字表示形式来执行搜索方案。 由于内容是数字而不是纯文本&#xff0c;因此搜索引擎会匹配与查询最相似的矢量&#xff0c;而不需要匹配确切的字词。本文简要介绍了 Azure AI 搜索中的矢量支持。 其中还解释了与其他 Azure…

pdf如何让多张图片在一页

pdf保存为一页六张图片的方法是&#xff1a; 1、打开pdf查看器,打开文档。 2、点击【打印】图标进入打印程序&#xff0c;选择打印范围。 3、在【打印处理】选项,选择【每张张上放置多页】。 4、自定义每页放置的图片张数为六张&#xff0c;并对打印排版预览设置。 5、设置打印…

大师学SwiftUI第16章 - UIKit框架集成

其它相关内容请见​​虚拟现实(VR)/增强现实(AR)&visionOS开发学习笔记​​ SwiftUI是一套新框架&#xff0c;因此并没有包含我们构建专业应用所需的所有工具。这意味着我们会需要求助于UIKit&#xff08;移动设备&#xff09;和AppKit&#xff08;Mac电脑&#xff09;等原…

管理类联考——逻辑——知识+记忆篇——综合推理——考点+记忆

文章目录 整体目录大纲法汇总分类法记忆宫殿法绘图记忆法 考点记忆/考点汇总——按大纲 局部数字编码法归类记忆法重点记忆法歌决记忆法谐音记忆法理解记忆法比较记忆法 本篇思路&#xff1a;根据各方的资料&#xff0c;比如名师的资料&#xff0c;按大纲或者其他方式&#xff…

SQL注入1

对sql进行一个小结 还有其他的注入 其他注入:堆叠注入&#xff0c;宽字节注入&#xff0c;二次注入 首先是数值和字符 id1 and 11和id1 and 12 如果这两个语句返回的页面不一样就说明是数字型 id1 and 11#和id1 and 12# 如果这两个语句返回的页面不一样就说明是字符型 常…