centos 上安装 kafka 与 python 调用

news2025/1/12 21:39:39

step0: 环境准备

1、 安装jdk 1.8 以上版本

yum -y install java-1.8.0-openjdk.x86_64

2、 安装配置ZooKeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz --no-check-certificate
tar -zxf apache-zookeeper-3.8.2-bin.tar.gz 


# 存放数据的路径 根据需要修改
mkdir /你存放数据的路径/data
mkdir /你存放日志的路径/logs
cd apache-zookeeper-3.8.2-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 增加如下内容
# dataDir = /你存放数据的路径/data(自定义目录)
# dataLogDir = /你存放日志的路径/logs(自定义目录)
cd ../bin
./zkServer.sh start
#./zkServer.sh stop
#./zkServer.sh restart
#./zkServer.sh status
# 如果启动失败 Starting zookeeper ... FAILED TO START 可能是server 的端口号被占用 需要指定下端口号
# vim zoo.cfg 
# admin.serverPort=9091

./zkCli.sh 
./zkServer.sh stop

step1: 下载kafka

wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz --no-check-certificate
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
# 新建一个会话
# yum install tmux -y
tmux 
# Start the ZooKeeper service
./bin/zookeeper-server-start.sh config/zookeeper.properties 
#  Start the Kafka broker service
bin/kafka-server-start.sh config/server.properties

step3: 验证安装

KafkaProducer

pip install kafka-python
import logging
import time
from kafka import KafkaProducer


def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    logging.error('I am an errback', exc_info=excp)
    # handle exception

# 将yoursIp替换为启动kafka 服务的ip
producer = KafkaProducer(bootstrap_servers="yoursIp:9092")
i = 0
while True:
    ts = int(time.time() * 1000)
    future = producer.send(topic="test", value=bytes(str(i), encoding="utf-8"), key=bytes(str(i), encoding="utf-8"),
                           timestamp_ms=ts).add_callback(on_send_success).add_errback(on_send_error)
    record_metadata = future.get(timeout=60)
    producer.flush()
    print(i)
    print(record_metadata)

    i += 1
    time.sleep(1)


在这里插入图片描述

KafkaConsummer

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import time

tp = TopicPartition("test", 0)
consummer = KafkaConsumer(bootstrap_servers=["yoursIp:9092"], auto_offset_reset='earliest',
                          enable_auto_commit=False, group_id="my-group")
consummer.assign([tp])
print("starting offset is {}".format(consummer.position(tp)))
for message in consummer:
    print(message, message.offset)

    consummer.commit({tp: OffsetAndMetadata(message.offset + 1, message)})
    time.sleep(0.15)

在这里插入图片描述
重新启动后,consumer从上一次记录的offset开始继续消费消息。之后每一次consumer重新启动,consumer都会从上一次停止的地方继续开始消费。

参考链接

  • https://kafka.apache.org/quickstart
  • https://blog.csdn.net/qq_54780911/article/details/124293670
  • https://cloud.tencent.com/developer/article/1700375
  • https://juejin.cn/post/6924584866327560199
  • https://kafka-python.readthedocs.io/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1032213.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

首购2元起!CDN与加速特惠专场来啦~

还在为内容分发、加速成本发愁吗?看过来!火山引擎边缘云CDN与加速特惠专场来啦! 限时活动:首购2元起,新老低至7折! 限时优惠!错过后悔!这波折扣实实在在! 首购专区 新…

用户体验测试:确保产品满足用户期望的关键步骤

在当今竞争激烈的市场中,为了确保产品的成功和用户的满意度,用户体验测试变得至关重要。通过系统化地评估产品在用户使用过程中的可用性、便利性和满意度,用户体验测试有助于发现潜在问题并改进产品设计。本文将介绍用户体验测试的关键步骤和…

有什么推荐使用的企业上网行为管理软件?

在当今信息化社会,企业的上网行为管理越来越重要。企业上网行为软件是一种能够监控和管理企业员工上网行为的工具,它可以帮助企业更好地管理网络资源,提高工作效率,保护企业信息安全,并符合相关的法律法规。本文将深入…

【煤矿虚拟仿真体验】VR采煤机技能培训有效提高训练效果

在我们的社会中,能源是至关重要的。它是推动我们日常生活和工作的主要动力。然而,我们在获取这种能源的过程中,也带来了许多环境问题。煤矿开采是其中的一个重要部分,因此我们需要寻找更环保、更安全的方式来进行煤矿开采。VR&…

【开发篇】三、web下单元测试与mock数据

文章目录 1、加载测试专用属性2、加载测试专用配置3、测试类中启动web环境4、发送虚拟请求5、匹配(断言)响应的执行状态6、匹配响应的结果7、匹配响应头8、业务层测试事务回滚9、UT数据设置随机数据 1、加载测试专用属性 写单元测试时,如果需…

PyTorch实战:卷积神经网络详解+Python实现卷积神经网络Cifar10彩色图片分类

目录 前言 一、卷积神经网络概述 二、卷积神经网络特点 卷积运算 单通道,二维卷积运算示例 单通道,二维,带偏置的卷积示例 带填充的单通道,二维卷积运算示例 Valid卷积 Same卷积 多通道卷积计算 1.局部感知域 2.参数共…

常见弱口令汇编

一、OA办公系统常见弱口令 (一)致远OA system用户(默认密码:system,对应A8的系统管理员、A6的单位管理员) group-admin(默认密码:123456,对应A8集团版的集团管理员&am…

百度知道本地搭建环境无限制采集聚合【最新版】

本工具是本地php环境搭建,根据关键词进行采集聚合某度知道,不限制ip,最新版新添加了违规词过滤,样式处理,自动匹配优质标题等功能,只需要导入关键词可以无限采集,是养站的好帮手! 功…

BOM与DOM--记录

BOM基础(BOM简介、常见事件、定时器、this指向) BOM和DOM的区别和联系 JavaScript的DOM与BOM的区别与用法详解 DOM和BOM是什么?有什么作用? 图解BOM与DOM的区别与联系 BOM和DOM详解 JavaScript 中的 BOM(浏览器对…

怎么将几张图片做成pdf合在一起

怎么将几张图片做成pdf合在一起?在我们平时的工作中,图片和pdf都是非常重要的电脑文件,使用也非常频繁,图片能够更为直观的展示内容,而pdf则更加的正规,很多重要文件大多会做成pdf格式的。在职场人的日常工…

ThePASS 研究院|探索 Aragon:开创性的 DAO 基础设施实现全面治理

这篇研究文章由 ThePASS 团队呈献,同时感谢 Aragon Growth Guild 的校对。The PASS 是一个开创性的 DAO 聚合器和搜索引擎,目前是最大的DAO数据来源,在为 DAO 提供洞见和分析方面起着关键作用。 介绍 DAO的诞生源于一个简单的理念&#xff…

BLE Mesh蓝牙mesh传输大数据包传输文件照片等大数据量通讯

1、BLE Mesh数据传输现状 BLE Mesh网络技术是低功耗蓝牙的一个进阶版,Mesh扩大了蓝牙在应用中的规模和范围,因为它同时支持超过三万个网络节点,可以跨越大型建筑物,不仅可以使得医疗健康应用更加方便快捷,还能监测像学…

前端三件套速成

一、HTML 1、基本的文档结构 <!doctype html> <html lang"en-US"><head><meta charset"utf-8" /><meta name"viewport" content"widthdevice-width" /><title>My test page</title></…

API文档搜索引擎

导航小助手 一、认识搜索引擎 二、项目目标 三、模块划分 四、创建项目 五、关于分词 六、实现索引模块 6.1 实现 Parser类 6.2 实现 Index类 6.2.1 创建 Index类 6.2.2 创建DocInfo类 6.2.3 创建 Weight类 6.2.4 实现 getDocInfo 和 getInverted方法 6.2.5 实现 …

解析数据库的“四世同堂”,畅聊数据前沿技术!

引言 数据库与大数据一直是技术圈的两个常青领域。PC 时代诞生了最早的关系型数据库&#xff0c;之后数据类型越来越多&#xff0c;出现了各种非关系型数据库。云时代拉开序幕的同时&#xff0c;“大数据”一词也被广泛使用&#xff0c;涵盖海量数据的采集、处理、存储、分析和…

【数模研赛思路】2023华为杯研究生数学建模竞赛选题建议及CDEF题思路

大家好呀&#xff0c;全国研究生数学建模竞赛今天早上开赛啦&#xff0c;在这里先带来初步的选题建议及思路。 目前团队正在写E题完整论文&#xff0c;此外C已经完成了第一问代码及结果&#xff0c;本文章只是一个比较粗略的文字版思路&#xff0c;更加详细的半小时视频讲解版…

React 全栈体系(十二)

第六章 React UI 一、流行的开源 React UI 组件库 1. material-ui(国外) 官网: http://www.material-ui.com/#/github: https://github.com/callemall/material-ui 2. ant-design(国内蚂蚁金服) 官网: https://ant.design/index-cnGithub: https://github.com/ant-design/…

企业虚拟化KVM的三种安装方式(1、完全文本2、模板镜像+配置文件3、gustos图形方式部署安装虚拟机)

一、安装完虚拟机后的操作 第一步: 第二步&#xff1a;分配的内存大一下&#xff0c;处理器多些 第三步&#xff1a;打开虚拟化 打开虚拟机、安装KVM 一般企业如果使用kvm虚拟化平台&#xff0c;都会把物理服务器装成Centos的操作系统&#xff0c;然后装上kvm&#xff0c;创建…

创龙TL6678F开发板: 实现FPGA与DSP之间 SRIO(3.125Gbps, 4x)通信

创龙TL6678F开发板官方Demo:SRIO_AD9613 实现了FPGA和DSP之间的SRIO通信, SRIO的速率为5Gbps. 在FPGA端, srio_gen_2 模块的参考时钟为 125MHz. 而Demo: udp_10g_echo 实现了10G以太网通信, ten_gig_eth_pcs_pma模块的参考时钟为156.25 MHz. 两者共用一个 cdcm61002, 且两个参考…

【100天精通Python】Day67:Python可视化_Matplotlib 绘动画,2D、3D 动画 示例+代码

1 绘制2D动画&#xff08;animation&#xff09; Matplotlib是一个Python绘图库&#xff0c;它提供了丰富的绘图功能&#xff0c;包括绘制动画。要绘制动画&#xff0c;Matplotlib提供了FuncAnimation类&#xff0c;允许您创建基于函数的动画。下面是一个详细的Matplotlib动画示…