kafka消息系统实战

news2024/11/17 17:30:31

kafka是什么?

        是一种高吞吐量的、分布式、发布、订阅、消息系统 

1.导入maven坐标

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

2.编写提供者

public class KafkaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();

        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);
        String topic = "hello"; // 主题
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka"));

        producer.close();
    }

}

3.编写消费者

public class KafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.8.166:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("auto.offset.reset", "latest");
        //自动提交偏移量
        prop.put("auto.commit.intervals.ms", "true");
        //自动提交时间
        prop.put("auto.commit.interval.ms", "1000");
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        ArrayList<String> topics = new ArrayList<>();
        //可以订阅多个消息
        topics.add("hello");
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : poll) {
                System.out.println(consumerRecord);
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.topic());
            }
        }

    }
}

 4.下载kafka

点此去官网下载——>Apache Kafka

解压后进入config目录 

修改zookeeper.properties

dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper

修改日志存放的路径server.properties

log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs

 启动zookeeper服务

zookeeper-server-start.bat ../../config/zookeeper.properties

启动kafka服务

kafka-server-start.bat ../../config/server.properties

5.依次启动消费者和生产者,查看发布的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/946939.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python文本终端GUI框架详解

今天笔者带大家&#xff0c;梳理几个常见的基于文本终端的 UI 框架&#xff0c;一睹为快&#xff01; Curses 首先出场的是 Curses。 Curses 是一个能提供基于文本终端窗口功能的动态库&#xff0c;它可以: 使用整个屏幕 创建和管理一个窗口 使用 8 种不同的彩色 为程序提供…

web之利用延迟实现复杂动画、animation

文章目录 效果图htmlstyleJavaScript 效果图 html <div class"container"><div class"ball"></div><input type"range" min"0" max"1" step"0.01" /> </div>style body {display…

有机器视觉工程师假装在工作

没有节假日&#xff0c;没有任何业务时间&#xff0c;去充实自己&#xff0c;甚至都没有时间陪女朋友&#xff0c;甚至都没有时间找女朋友。 没有人休息的工作&#xff1a; 每天上班三个地点&#xff0c;住宿&#xff0c;现场&#xff0c;吃饭的地方。搞得和高考似的&#xff…

【算法训练-哈希】两数之和、三数之和

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是两数之和和三数之和&#xff0c;使用哈希这个基本的数据结构来实现 两数之和【EASY】 照例先从简单往难搞 题干 输入&#xff1a; [3,2,4],6返…

【LeetCode-中等题】148. 排序链表

文章目录 题目方法一&#xff1a;集合排序&#xff08;核心是内部的排序&#xff09;方法二&#xff1a; 优先队列&#xff08;核心也是内部的排序&#xff09;方法三&#xff1a;归并排序&#xff08;带递归&#xff09; 从上往下方法四&#xff1a;归并排序&#xff08;省去递…

【juc】读写锁ReentrantReadWriteLock

目录 一、说明二、读读不互斥2.1 代码示例2.2 截图示例 三、读写互斥3.1 代码示例3.2 截图示例 四、写写互斥4.1 代码示例4.2 截图示例 五、注意事项5.2.1 代码示例5.2.2 截图示例 一、说明 1.当读操作远远高于写操作时&#xff0c;使用读写锁让读读可以并发&#xff0c;来提高…

excel功能区(ribbonx)编程笔记--2 button控件与checkbox控件

我们上一章简单先了解了ribbonx的基本内容,以及使用举例实现自己修改ribbox的内容,本章紧接上一章,先讲解一下ribbonx的button控件。 在功能区的按钮中,可以使用内置图像或提供自已的图像,可以指定大按钮或者更小的形式,添加少量的代码甚至可以同时提供标签。此外,可以利…

Nginx到底是什么,他能干什么?

Ngnix是什么&#xff0c;它是用来做什么的呢&#xff1f; 一。Nginx简介 Nginx是enginex的简写&#xff0c;是一款很优秀的开源的高性能HTTP和反向代理服务器,由于它是用C语言写的&#xff0c;所以速度非常快&#xff0c;性能非常优秀&#xff0c;它主要功能就是反向代理&…

使用安全复制命令scp在Windows系统和Linux系统之间相互传输文件

现在已经有很多远程控制服务器的第三方软件平台&#xff0c;比如FinalShell&#xff0c;MobaXterm等&#xff0c;半可视化界面&#xff0c;使用起来非常方便和友好&#xff0c;两个系统之间传输文件直接拖就行&#xff0c;当然也可以使用命令方式在两个系统之间相互传递。 目录…

Java网络爬虫——jsoup快速上手,爬取京东数据。同时解决‘京东安全’防爬问题

Java网络爬虫——jsoup快速上手&#xff0c;爬取京东数据。同时解决‘京东安全’防爬问题 介绍 网络爬虫&#xff0c;就是在浏览器上&#xff0c;代替人类爬取数据&#xff0c;Java网络爬虫就是通过Java编写爬虫代码&#xff0c;代替人类从网络上爬取信息数据。程序员通过设定…

Spring Cloud Gateway的快速使用

环境前置搭建Nacos&#xff1a;点击跳转 Spring Cloud Gateway Docs 新建gateway网关模块 pom.xml导入依赖 <!-- 网关 --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifact…

部署Spring Boot项目

上传jar包 之前在新建Spring Boot项目[1]使用mvn install的方式&#xff0c;已经构建出jar包。 通过scp或rz/sz&#xff0c;将该jar包上传到服务器 执行java -jar hello-0.0.1-SNAPSHOT.jar,发生如下报错&#xff1a; Exception in thread "main" java.lang.Unsuppo…

jsp+servlet+mysql阳光网吧管理系统

项目介绍&#xff1a; 本系统使用jspservletmysql开发的阳光网吧管理系统&#xff0c;纯手工敲打&#xff0c;系统管理员和用户角色&#xff0c;功能如下&#xff1a; 管理员&#xff1a;修改个人信息、修改密码&#xff1b;机房类型管理&#xff1b;机房管理&#xff1b;机位…

如何变更小程序会员卡的上级

在小程序中&#xff0c;手动变更会员的上级是一项常见的操作。无论是为了层级调整还是因个人原因&#xff0c;支持手动变更会员的上级可以有效地管理和优化团队的组织结构。下面就具体介绍如何手动变更会员的上级。 1. 找到指定的会员卡。在管理员后台->会员管理处&#xf…

vue项目静态文件资源下载

业务场景&#xff1a;页面有一个导入功能&#xff0c;需要一个模板文件供下载&#xff0c;文件放在本地。 对于 Vue 3 Vite 项目&#xff0c;使用 require 方法来导入模块是不被支持的。require 是 CommonJS 规范中用于模块导入的方法&#xff0c;在 Webpack 等构建工具中常用…

扩散模型实战(八):微调扩散模型

推荐阅读列表&#xff1a; 扩散模型实战&#xff08;一&#xff09;&#xff1a;基本原理介绍 扩散模型实战&#xff08;二&#xff09;&#xff1a;扩散模型的发展 扩散模型实战&#xff08;三&#xff09;&#xff1a;扩散模型的应用 扩散模型实战&#xff08;四&#xf…

FPGA VR摄像机-拍摄和拼接立体 360 度视频

本文介绍的是 FPGA VR 相机的第二个版本&#xff0c;第一个版本是下面这样&#xff1a; 第一版地址&#xff1a; ❝ https://hackaday.io/project/26974-vr-camera-fpga-stereoscopic-3d-360-camera ❞ 本文主要介绍第二版本&#xff0c;第二版本的 VR 摄像机&#xff0c;能够以…

Python提取JSON文件中的指定数据并保存在CSV或Excel表格文件内

本文介绍基于Python语言&#xff0c;读取JSON格式的数据&#xff0c;提取其中的指定内容&#xff0c;并将提取到的数据保存到.csv格式或.xlsx格式的表格文件中的方法。 JSON格式的数据在数据信息交换过程中经常使用&#xff0c;但是相对而言并不直观&#xff1b;因此&#xff0…

Java学习笔记31——字符流

字符流 字符流为什么出现字符流编码表字符串中的编码解码问题字符流写数据的5中方式字符流读数据的两种方式字符流复制Java文件 字符流 为什么出现字符流 汉字的存储如果是GBK编码占用2个字节&#xff0c;如果是UTF-8占用三个字节 用字节流复制文本文件时&#xff0c;文本文…

华为---OSPF协议优先级、开销(cost)、定时器简介及示例配置

OSPF协议优先级、开销、定时器简介及示例配置 路由协议优先级&#xff1a;由于路由器上可能同时运行多种动态路由协议&#xff0c;就存在各个路由协议之间路由信息共享和选择的问题。系统为每一种路由协议设置了不同的默认优先级&#xff0c;当在不同协议中发现同一条路由时&am…