Kafka消息队列python开发环境搭建

news2025/1/14 0:47:44

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io 

# intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \
          -o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
d
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka-dev
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt update

sudo apt install zip

unzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer

# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')

# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式
  message =f'message{_}'.encode('utf-8')
  # 发送消息到 Kafka 的 'demo-topic' 主题
  producer.send('demo-topic', value=message)
  # 打印已发送的消息print(f'Sent message: message{_}')

# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer

# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer(
    'demo-topic',  # 指定要消费的主题
    bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口
    auto_offset_reset='earliest',  # 从最早的消息开始消费
    enable_auto_commit=True,  # 自动提交偏移量
    group_id='demo-group'  # 消费者组标识
)

# 循环消费并打印收到的消息
for message in consumer:
    # 解码并打印消息内容
    print(f"Received message: {message.value.decode()}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1933890.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux--网络基础

计算机网络背景 计算机网络背景是一个复杂而丰富的领域,涵盖了从计算机单机模式到网络互联的演变过程,以及网络技术的不断发展和创新。 计算机单机模式和独立发展 在早期,计算机主要以单机模式存在,即每台计算机都是独立的&…

Jupyter Notebook安装及基本使用

Jupyter Notebook安装及基本使用 目录 Jupyter Notebook安装及基本使用方式一:Anaconda直接安装方式二:pip命令安装Jupyter使用虚拟环境 方式一:Anaconda直接安装 安装Anaconda 下载地址,输入邮箱,Windows下载 开始安…

探索Puppeteer的强大功能:抓取隐藏内容

背景/引言 在现代网页设计中,动态内容和隐藏元素的使用越来越普遍,这些内容往往只有在特定的用户交互或条件下才会显示出来。为了有效地获取这些隐藏内容,传统的静态爬虫技术往往力不从心。Puppeteer,作为一个强大的无头浏览器工…

成都亚恒丰创教育科技有限公司 【插画猴子:笔尖下的灵动世界】

在浩瀚的艺术海洋中,每一种创作形式都是人类情感与想象力的独特表达。而插画,作为这一广阔领域中的璀璨明珠,以其独特的视觉语言和丰富的叙事能力,构建了一个又一个令人遐想连篇的梦幻空间。成都亚恒丰创教育科技有限公司 在众多插…

【深度学习】InternVL2-8B,图转文,docker部署

文章目录 基础fastapi服务请求fastapi接口 基础 https://huggingface.co/OpenGVLab/InternVL2-8B#%E7%AE%80%E4%BB%8B InternVL2-26B应该更好,但显存没那么大,只能跑InternVL2-8B了。 下载: cd /ssd/xiedong/InternVL2-26B git clone htt…

Xline 异步运行时IO问题分析

Table of Contents 1. Xline运行时性能问题 2. 异步运行时和阻塞操作 3. Runtime调度问题 4. 性能测试 4.1 测试结果分析 5. 如何正确实现? 6. 何时能够在Runtime上阻塞 7. 总结 在异步运行时上进行编程经常是很困难的,在本篇文章中,我…

万界星空科技电线电缆MES系统实现线缆全流程追溯

MES系统通过高度集成的数据平台,对电线电缆的生产全过程进行实时监控与记录,从原材料入库开始,到生产过程中的各个关键控制点,再到成品出库,每一步操作都被详细记录并可追溯。这种全流程追溯能力主要体现在以下几个方面…

React学习笔记02-----React基本使用

一、React简介 想实现页面的局部刷新,而不是整个网页的刷新。AJAXDOM可以实现局部刷新 1.特点 (1)虚拟DOM 开发者通过React来操作原生DOM,从而构建页面。 React通过虚拟DOM来实现,可以解决DOM的兼容性问题&#x…

Android10.0 锁屏分析-KeyguardPatternView图案锁分析

首先一起看看下面这张图: 通过前面锁屏加载流程可以知道在KeyguardSecurityContainer中使用getSecurityView()根据不同的securityMode inflate出来,并添加到界面上的。 我们知道,Pattern锁所使用的layout是 R.layout.keyguard_pattern_view&a…

【ESP32】打造全网最强esp-idf基础教程——18.ESP32连接MQTT Broker

ESP32连接MQTT Broker 一、MQTT Broker 在开始ESP32编程之前,我们要先来看看公共主流的MQTT服务器可供使用,所谓的公共MQTT服务器就是一些网站给我们提供了在线的MQTT Broker,我可以直接利用其进行 MQTT 学习、测试甚至是小规模使用&…

表格竖向展示

最近在做手机端web页面,页面中需要有个表格来显示数据,但是由于数据太多页面太窄,table展示横向滑动的话感觉很丑。所以让表格竖向显示了 具体页面如下: 实现代码:当然代码里面绑定的数据啊什么的你都可以修改为自己的内容&#…

【软件建模与设计】-05-软件建模和设计方法概览

目录 1、COMET基于用例的软件生命周期 1.1、需求建模 1.2、分析建模 1.3、设计建模 1.4、增量软件构建 1.5、增量软件集成 1.6、系统测试 2、COMET与其他软件过程比较 2.1、与RUP对比 2.2、与螺旋模型对比 3、需求、分析和设计建模 3.1、需求建模活动 3.2、分析建…

机器学习入门【经典的CIFAR10分类】

模型 神经网络采用下图 我使用之后发现迭代多了之后一直最高是正确率65%左右,然后我自己添加了一些Relu激活函数和正则化,现在正确率可以有80%左右。 模型代码 import torch from torch import nnclass YmModel(nn.Module):def __init__(self):super(…

【香橙派】Orange pi AIpro开发板评测,与树莓派的横向对比以及实机性能测试

一、前言 在人工智能领域飞速发展的时代,国产厂商们也是紧随时代的步伐,迅龙公司联合华为推出了一款全新的开发板 Orange pi AIpro 作为一款建设人工智能新生态的开发板,它可广泛适用于AI边缘计算、深度视觉学习及视频流AI分析、视频图像分析…

ssh远程登录另一台linux电脑

大部分的博客内容所说的安装好ssh服务后,terminal输入 ssh -p port_number clientnameserver_ip 之后输入密码等等就可以登上别人的电脑 但是这是有一个前提的,就是这两台电脑要在同一个局域网下面。 如果很远呢? 远到不在同一个网下面怎么办…

【智能算法应用】粒子群算法求解带出入点车间布局设计问题

目录 1.算法原理2.数学模型3.结果展示4.参考文献5.代码获取 1.算法原理 【智能算法】粒子群算法(PSO)原理及实现 设施布局问题(Facility Layout Problem, FLP),主要目的是在给定的区域内有效地放置不同设备或部件&am…

大模型学习笔记十一:视觉大模型

一、判别式模型和生成式模型 1)判别式模型Discriminative ①给某一个样本,判断属于某个类别的概率,擅长分类任务,计算量少。(学习策略函数Y f(X)或者条件概率P(YIX)) ②不能反映训练数据本身的特性 ③学习…

JavaScript学习笔记(九)

56、JavaScript 类 56.1 JavaScript 类的语法 请使用关键字 class 创建一个类。 请始终添加一个名为 constructor() 的方法。 JavaScript 类不是对象。 它是 JavaScript 对象的模板。 语法: class ClassName {constructor() { ... } }示例:例子创…

【无人值守】对数据中心电力分配系统发展的影响

数据中心在现代信息发展中承载着巨量数据的计算、存储、挖掘、分析和应用等多个方面的功能,是国计民生各行业的多样化的信息化的资产。对稳定的运行与安全运维是基本需求也是重要的保障。 数据中心属于高能耗产业,对用电负荷大且要求极度稳定。除了对电力…

一文-深入了解Ansible常见模块、安装和部署

1 Ansible 介绍 Ansible是一个配置管理系统configuration management system, python 语言是运维人员必须会的语言, ansible 是一个基于python 开发的(集合了众多运维工具 puppet、cfengine、chef、func、fabric的优点)自动化运维工具, 其功能实现基于ss…