kafka 安装快速入门

news2025/1/9 1:22:36

直接上干货,我们公司最近要进行消息推送指定软件kafka,直接走起。

1.下载

        kafka 是apache的项目。下载地址:kafka.apache.org/

点击download kafka 进入查看相关版本进行下载。

我这里用的版本比窘旧一点,公司技术一切求稳。

下载好安装包就已经实现了。

2.安装

 说安装就是免安装版本,简单配置一下就可以使用了

直接解压就可以,但是需要我们修改一些配置文件。

kafka 集成了 zookeeper 的软件包,不需要安装,配置就好。

1):配置zookeeper.properties

进入到config文件夹里面,找到zookeeper.properties文件,进行编辑,找到dataDir,修改这个参数的路径为:

dataDir=D:\kafka\kafka_2.13-3.2.1\zookeeper-data (安装目录\zookeeper-data)

2):配置server.properties

进入到config文件夹里面,找到server.properties文件,进行编辑,找到log.dirs,修改这个参数的路径为:

log.dirs=D:\kafka\kafka_2.13-3.2.1\kafka-logs (安装目录\kefke-log)

完成了配置,现在就可以启动了

3.启动

1):启动zookeeper

 打开cmd,进入kafka安装目录,执行:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2):启动kafka

再打开一个cmd,进入kafka安装目录,执行:

.\bin\windows\kafka-server-start.bat .\config\server.properties

完成启动就可以访问了,但是我们访问有端口限制,所以我又进行了端口修改

kafka默认端口号:9092  我这里给它改为8081 方便测试访问。

4.配置端口(可以不修改)

config下

1、service.properties (添加)                               port = 8081    不指定的话,按照默认9092

2、connect-distributed.properties (集群)            bootstrap.servers=localhost:8081

3、producer.properties (生产者)                         bootstrap.servers=localhost:8081

4、connect-standalone.properties (单机)            bootstrap.servers=localhost:8081

5、consumer.properties (消费者)                        bootstrap.servers=localhost:8081

根据情况配置端口

5.编码

maven坐标

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>

1)生产者代码

package com.java;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MyKafKaProducer extends Thread{

    KafkaProducer<Integer, String> producer;
    String topic; // 主题

    public MyKafKaProducer(String topic) {
        // 构建连接配置
        Properties properties = new Properties();
       
        // 若要配多个服务器,用逗号隔开
        // 注:服务器要开放端口,若云服务器还要在server.properties配置内网IP和外网IP
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.32:8081");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class.getName());

        // 构造Client无非是:new 或 工厂模式
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }

    public void run() {
        int num = 0;
        String msg = "kafka practice msg: " + num;
        while (num < 20) {
            try {
                // 发送消息send()!!! 同步调用
                // Future.get()会阻塞,等待返回结果......
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get();
      			// 等上面get到结果了,才能执行这里
                System.out.println(recordMetadata.offset() + "->" + recordMetadata.partition() + 
                                   "->" + recordMetadata.topic());
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 传入test主题
        new MyKafKaProducer("test").start();
    }
} 

2)消费者代码

package com.java;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MykafkaConsumer extends Thread{

    KafkaConsumer<Integer, String> consumer;
    String topic;

    public MykafkaConsumer(String topic) {
        // 构建连接配置,这里是ConsumerConfig
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.32:8081");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");
        // 反序列化,这里是Deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                       IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                       StringDeserializer.class.getName());
                       
        // 以下是Producer没有的配置
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-gid"); // 要加入的group
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 超时,心跳
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 自动提交(批量)
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新group消费为位置

        consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
    }

    public void run() {
        // 死循环不断消费消息
        while (true) {
            // 绑定订阅主题
            // 注:Collections.singleton返回一个单元素&不可修改Set集合,
            // 同样的还有singletonList,singletonMap
            consumer.subscribe(Collections.singleton(this.topic));
            // 接收消息 POLL()!!!
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 注:一行的lamada表达式可以不用{}
            consumerRecords.forEach(record -> System.out.println(record.key() + "->" + 
                                                 record.value() + "->" + record.offset()));
        }
    }

    public static void main(String[] args) {
        // 拉取test主题的消息
        new MykafkaConsumer("test").start();
    }
}

 topic 主题必须一致才可以实现生产者和消费者的互通

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/611202.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文笔记:Normalizing Flows for Probabilistic Modeling and Inference

Abstract 正则流&#xff08;Normalizing flows&#xff09;提供了一种通用的机制来定义富有表达力的概率分布&#xff0c;只需要指定一个&#xff08;通常简单的&#xff09;基础分布和一系列可逆变换。 Intraduction 正则流通过将简单的密度通过一系列变换来产生更丰富、可…

怎么选择适合爬虫的代理IP,使用时需要注意什么

网络爬虫工作离不开代理服务器的支持&#xff0c;但并不是所有的代理服务器都适合爬虫工作。那么如何选择适合爬虫的代理服务器呢&#xff1f; 选择适合爬虫的代理服务器需要考虑以下几个方面&#xff1a; 1、代理服务器的稳定性&#xff1a;稳定可靠的代理服务器更能够保证爬虫…

JPEG压缩基本原理

JPEG算法的第一步是将图像分割成8X8的小块。 在计算机中&#xff0c;彩色图像最常见的表示方法是RGB格式&#xff0c;通过R(Red)、G(Green)A和(Blue)组合出各种颜色。 除此以外&#xff0c;还有一种表示彩色图像的方法&#xff0c;称为YUV格式。Y表示亮度&#xff0c;U和V表示…

【C++】一文带你吃透C++继承

&#x1f34e; 博客主页&#xff1a;&#x1f319;披星戴月的贾维斯 &#x1f34e; 欢迎关注&#xff1a;&#x1f44d;点赞&#x1f343;收藏&#x1f525;留言 &#x1f347;系列专栏&#xff1a;&#x1f319; C/C专栏 &#x1f319;那些看似波澜不惊的日复一日&#xff0c;…

Docker attach VS exec

我们知道&#xff0c;进入容器常用的两种方式为&#xff1a;docker exec ...、docker attach ...&#xff0c;那这两者有什么区别呢&#xff1f; 首先&#xff0c;运行一个测试容器&#xff0c;并在启动容器时运行相关指令&#xff0c;如下&#xff1a; docker run --name te…

JVM学习笔记一

程序计数器是一块儿较小的内存, 请你谈谈你对JVM的理解?java8虚拟机和之前的有什么变化更新?什么是OOM?什么是栈溢出(StackOverFlowError)?怎么分析JVM的常用调优参数?内存快照如何抓取?怎么分析Dump文件?谈谈JVM中类加载器你的认识?JVM的位置JVM的体系结构类加载器双…

科研热点|科研人专属身份证来了,国产ORCID ID启动!

2023年6月1日&#xff0c;国家自然科学基金委员会发布了《国家自然科学基金委员会关于推广和发布基础研究科研人员标识&#xff08;BRID&#xff09;有关工作安排的通告》&#xff0c;宣布从即日起&#xff0c;国家自然科学基金委员会&#xff08;以下简称自然科学基金委&#…

高完整性系统(4)Formal Logic (形式逻辑和 Alloy 简介)

文章目录 Story so far形式逻辑命题 proposition谓词 predicate连接词VariablesSet 集合Set operation 集合操作Set Relationship 集合关系Alloy Set alloy 的集合表示Quantification 量词Relations 关系案例Binary Relations 二元关系图Functions 函数Total v.s. Partial Func…

IO模型、select、poll、epoll

阻塞IO模型 阻塞IO是最通用的IO类型&#xff0c;使用这种模型进行数据接收的时候&#xff0c;在数据没有到之前程序会一直等待。例如&#xff0c;对于函数recvfrom(),内核会一直阻塞该请求直到有数据到来才返回。 非阻塞IO模型 当把套接字设置成非阻塞的IO,则对每次请求&…

Java网络开发(Tomcat)——遇到的 bug 汇总(持续更新)

目录 引出:bug::bug::bug:Tomcat开发的bug汇总项目启动就报错1.WebServlet()路径配置的问题2.由于之前的错误&#xff0c;Context[/day01]启动失败【困扰】3.启动过滤器异常---init方法 JSP使用相关报错1.后端传给jsp的数据&#xff0c;前端jsp不显示2.jsp的包没有导&#xff0…

6 vue

前端开发 1.前端开发 前端工程师“Front-End-Developer”源自于美国。大约从2005年开始正式的前端工程师角色被行业所认可&#xff0c;到了2010年&#xff0c;互联网开始全面进入移动时代&#xff0c;前端开发的工作越来越重要。 最初所有的开发工作都是由后端工程师完成的&…

‘jupyter‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。

目录 0.问题背景环境介绍 1.解决步骤 2.测试步骤 0.问题背景环境介绍 1&#xff09;环境&#xff1a;windows64 2&#xff09;问题背景&#xff1a;在搭建jupyter notebook的过程中&#xff0c;想用windows的任务管理器启动jupyter notebook或者使用【jupyter notebook --…

降低成本,快速搭建企业帮助文档的方法盘点

企业帮助文档是企业为了解决客户疑问和提高客户满意度而制作的一种文档&#xff0c;通常包括产品的使用指南、故障排除、常见问题解答等内容。一个好的帮助文档可以帮助企业降低客服成本、提高客户满意度&#xff0c;进而提高产品销量和企业品牌形象。但是&#xff0c;有些企业…

基于html+css的图展示108

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

MFC(十二)多个对话框

我们来制定多个对话框&#xff0c;每个对话框都有不同的功能&#xff0c;单击下一步&#xff0c;即可跳转到下一个对话框 1.新建一个启动按钮 2.在资源视图&#xff0c;Dialog里面&#xff0c;右键-->添加资源---->dialog>选择IDD PROPPAGE_SMALL新建 属性页&#…

「移动机器人行业应用分析」锂电行业

锂电池作为目前一种比较成熟和先进的电池&#xff0c;因其质量轻&#xff0c;储电量大等特点&#xff0c;受到了人们的广泛应用。中国作为全球最大的锂电生产和消费国&#xff0c;也是全球最大的电动汽车市场&#xff0c;随着“碳中和”这一目标的提出&#xff0c;锂离子电池技…

陶瓷板检测系统在工业质检领域的前景

陶瓷是一种重要的工业材料&#xff0c;广泛应用于建筑、电子、航空航天、医疗等领域。在生产过程中&#xff0c;陶瓷制品需要经过多道工序&#xff0c;其中检测环节是非常重要的一环。传统的陶瓷板检测方式主要依赖人工目视检测&#xff0c;效率低下且容易出错。随着人工智能技…

5年Java经验字节社招:15天3次面试,成功拿下Offer

背景经历 当时我工作近5年&#xff0c;明显感觉到了瓶颈期。具体来说&#xff0c;感觉自己用过很多框架、做过一些技术设计、也有过一些产出&#xff0c;但是从技术深度上感觉不足&#xff0c;到后期时做事也没有明显挑战&#xff0c;完全适应了公司节奏&#xff0c;说句不好听…

企业做直播如何选择好的直播平台?需要考虑哪些方面?

企业做直播如何选择好的直播平台&#xff1f;需要考虑哪些方面&#xff1f;我将从功能需求、可靠性与稳定性、用户体验、技术能与售后服务能力等方面进行综合考虑&#xff0c;帮助您做出明智的决策&#xff0c;或是说提供选型方面的参考。 企业在选择一家直播平台时应考虑以下因…

Vue.js 中的过滤器是什么?如何使用过滤器?

Vue.js 中的过滤器是什么&#xff1f;如何使用过滤器&#xff1f; 在 Vue.js 中&#xff0c;过滤器是一种以函数为基础的可重用代码片段&#xff0c;用于对数据进行格式化和处理。通过使用过滤器&#xff0c;我们可以在模板中对数据进行简单的转换&#xff0c;以便更好地呈现给…