Python 版分布式消息队列 Kafka 实现图片数据传输

news2025/2/7 12:43:31

1、Kafka 介绍

在使用 Kafka 之前,通常需要先安装和配置 ZooKeeper。ZooKeeper 是 Kafka 的依赖项之一,它用于协调和管理 Kafka 集群的状态。

ZooKeeper 是一个开源的分布式协调服务,它提供了可靠的数据存储和协调机制,用于协调分布式系统中的各个节点。Kafka 使用 ZooKeeper 来存储和管理集群的元数据、配置信息和状态。

2、Kafka 环境搭建

环境:

  • Windows11
  • Java 1.8 及以上
  • Anaconda
  • Python10
  • Kafka 2.0.2 (kafka-python)
2.1、安装 Python 版本 Kafka
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

至此,Windows 环境下还不能运行 Kafka,一般情况下,程序会提示超时(60ms)等报错。原因是,还需要启动 Kafka 服务。

2.2、启动 Kafka 服务

从 Kafka 官网下下载对应的文件:Apache Kafka 官网下载地址

在这里插入图片描述

下载红色箭头所指向的文件到本地并解压。

在这里插入图片描述

注意:

从 Kafka 官网上下载的 kafka_2.12-3.2.1 文件需要放置在路径较浅文件夹下解压,一旦放置的路径较深,会报错:

在这里插入图片描述

输入行太长。
命令语法不正确。

本案例放在 E 盘下。

2.2.1、启动 Zookeeper 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

在这里插入图片描述

出现如下信息,表示 Zookeeper 服务启动成功:

在这里插入图片描述

2.2.2、启动 Kafka 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

出现如下信息,表示 Kafka 服务启动成功:

在这里插入图片描述

3、构建图片传输队列

在这里插入图片描述

3.1、配置文件

Properties/config.yaml:

kafka:
 host: "127.0.0.1"
 port: 9092
 parameter:
   bootstrap_servers: '127.0.0.1:9092'
   api_version: "2.5.0"
   log_path: "KafkaLog/log.txt"
workspace:
  path: "E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00"
input:
  images_path: "DataSource/Images"
output:
  output_path: "DataSource/Output"
3.2、Kafka 创建分区

KafkaModule/ProducerConsumer/KafkaClient.py:

from kafka.admin import KafkaAdminClient, NewPartitions


client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")

# 在已有的 topic 中创建分区
new_partitions = NewPartitions(3)
client.create_partitions({"kafka_demo": new_partitions})
3.3、生产者、消费者(单线程版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducer.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducer

def producer_demo(cfg):
    """
    :param cfg:
    :return:
    """
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             key_serializer=lambda k: json.dumps(k).encode(),
                             value_serializer=lambda v: json.dumps(v).encode())
    logging.info("Kafka Producer Starting")
    images_path = cfg['input']['images_path']
    workspace_path = cfg['workspace']['path']
    for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):
        print(f"img: {img}")
        workspace_path = cfg['workspace']['path']
        image_path = os.path.join(workspace_path, images_path, img)
        with open(image_path, "rb") as image_file:
            image_data = image_file.read()
        encode_image = base64.b64encode(image_data)
        json_data = encode_image.decode("utf-8")
        json_string = json.dumps(json_data)

        future = producer.send('kafka_demo',
                               key=str(i),  # 同一个key值,会被送至同一个分区
                               value=json_string,
                               partition=random.randint(0, 2))  # 向分区1发送消息
        producer.flush()
        logging.info("Send {}".format(str(i)))
        try:
            future.get(timeout=10)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出 kafka_errors
            traceback.format_exc()

def process():
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    producer_demo(cfg)

if __name__ == '__main__':
    process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumer.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumer

def consumer_demo0(cfg):
    """
    :param cfg:
    :return:
    """
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=cfg['kafka']['parameter']['api_version'],
                             group_id='test')
    logging.info("consumer_demo0 starting")
    for message in consumer:
        key_json_string = json.loads(message.key.decode())
        value_json_string = json.loads(message.value.decode())
        name_data = "test0" + key_json_string + ".jpg"
        image_data = base64.b64decode(value_json_string)
        logging.info(f"Receiving {name_data} data.")
        workspace_path = cfg['workspace']['path']
        output_path = cfg['output']['output_path']
        image_path = os.path.join(workspace_path, output_path, name_data)
        with open(image_path, 'wb') as jpg_file:
            jpg_file.write(image_data)
            logging.info(f"Save {name_data} data finished.")

def process():
    """
    :return:
    """
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    consumer_demo0(cfg)

if __name__ == '__main__':
    process()
3.4、生产者、消费者(线程池版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducerMultiThread.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducer

def producer_demo(cfg):
    """
    :param cfg:
    :return:
    """
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             key_serializer=lambda k: json.dumps(k).encode(),
                             value_serializer=lambda v: json.dumps(v).encode())

    logging.info("Kafka Producer Starting")

    images_path = cfg['input']['images_path']
    workspace_path = cfg['workspace']['path']

    for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):
        print(f"img: {img}")
        workspace_path = cfg['workspace']['path']
        image_path = os.path.join(workspace_path, images_path, img)

        with open(image_path, "rb") as image_file:
            image_data = image_file.read()

        encode_image = base64.b64encode(image_data)
        json_data = encode_image.decode("utf-8")
        json_string = json.dumps(json_data)

        future = producer.send('kafka_demo',
                               key=str(i),  # 同一个key值,会被送至同一个分区
                               value=json_string,
                               partition=random.randint(0, 2))  # 向分区1发送消息
        producer.flush()
        logging.info("Send {}".format(str(i)))
        try:
            future.get(timeout=10)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出 kafka_errors
            traceback.format_exc()

def process():
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    producer_demo(cfg)

if __name__ == '__main__':
    process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumerMultiThread.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed

def consumer_demo0(cfg, thread_id):
    """ 线程池版的消费者
    :param cfg: 配置文件
    :param thread_id: 线程序号
    :return:
    """
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=cfg['kafka']['parameter']['api_version'],
                             group_id='test')

    logging.info("consumer_demo0 starting")
    for message in consumer:
        key_json_string = json.loads(message.key.decode())
        value_json_string = json.loads(message.value.decode())
        name_data = f"test_{thread_id}_" + key_json_string + ".jpg"
        image_data = base64.b64decode(value_json_string)
        logging.info(f"Receiving {name_data} data.")
        workspace_path = cfg['workspace']['path']
        output_path = cfg['output']['output_path']
        image_path = os.path.join(workspace_path, output_path, name_data)
        with open(image_path, 'wb') as jpg_file:
            jpg_file.write(image_data)
            logging.info(f"Save {name_data} data finished.")

def process():
    """
    :return:
    """
    with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
                                 "Properties/config.yaml"), "r") as config:
        cfg = yaml.safe_load(config)

    logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
                        level=logging.INFO)
    # 线程池
    thread_pool_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="thread_test_")

    all_task = [thread_pool_executor.submit(consumer_demo0, cfg, i) for i in range(10)]
    for future in as_completed(all_task):
        res = future.result()
        print("res", str(res))
    thread_pool_executor.shutdown(wait=True)

if __name__ == '__main__':
    process()

运行顺序:

  • 首先运行 KafkaDemoConsumer.py 或者 KafkaDemoConsumerMultiThread.py
  • 然后运行 KafkaDemoProducer.py 或者 KafkaDemoProducerMultiThread.py
  • DataSource/Output 中会接受生产者发送的图片数据,ProducerConsumer/KafkaLog 路径也会产生运行日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1598377.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

libcurl 简单使用

LibCurl是一个开源的免费的多协议数据传输开源库,该框架具备跨平台性,开源免费,并提供了包括HTTP、FTP、SMTP、POP3等协议的功能,使用libcurl可以方便地进行网络数据传输操作,如发送HTTP请求、下载文件、发送电子邮件等…

【计算机毕业设计】物流管理系统设计与实现——后附源码

🎉**欢迎来到琛哥的技术世界!**🎉 📘 博主小档案: 琛哥,一名来自世界500强的资深程序猿,毕业于国内知名985高校。 🔧 技术专长: 琛哥在深度学习任务中展现出卓越的能力&a…

Python(11):网络编程

文章目录 一、一些基本概念二、软件的开发架构(c/s架构和b/s架构)三、OSI模型四、socket套接字编程1.socket编程过程2.python中的socket编程 一、一些基本概念 来了解一些网络的基本概念 名词解释IP(互联网协议地址)IP用来标识网…

归并排序详解(附代码)

归并排序 数据科学家每天都在处理算法。 然而,数据科学学科作为一个整体已经发展成为一个不涉及复杂算法实现的角色。 尽管如此,从业者仍然可以从建立对算法的理解和知识库中受益。 在本文中,对排序算法归并排序进行了介绍、解释、评估和实…

hds更换电源操作

HDS更换电源 1、 查看损坏的电源 2.选中电源 3、 如下图所示,选择Execute 4、选择ok,表示为防止静电引起的严重故障,请务必在手腕上佩戴腕带,并将腕带的另一侧的接地夹连接到机柜架上。 5、 选择YES,提示你的手…

Linux安装docker(含Centos系统和Ubuntu系统)

一、Centos系统 1. 卸载旧版本依赖 sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine 2. 设置仓库 安装所需的软件包。yum-utils 提供了 yum-config-manager &…

高负压采样器

你的未来是你自己创造的,你的路是你自己选择的。走向成功,需要你的勇气和决心,成功不是得到多少,而是付出了多少。当你还在努力时,不要忘记身边的风景——鹤壁永成在你身边 一、高负压瓦斯采取器的用途: 高…

OpenHarmony实战开发-如何使用AKI轻松实现跨语言调用。

介绍 针对JS与C/C跨语言访问场景,NAPI使用比较繁琐。而AKI提供了极简语法糖使用方式,一行代码完成JS与C/C的无障碍跨语言互调,使用方便。本示例将介绍使用AKI编写C跨线程调用JS函数场景。通过调用C全局函数,创建子线程来调用JS函…

C++入门之类和对象

C入门之类和对象 文章目录 C入门之类和对象1. 类的6个默认对象2. 构造函数2.1 概念2.2 特性2.3 补丁 3. 析构函数3.1 概念3.2 特性3.3 总结 4. 拷贝构造函数4.1 概念4.2 特性4.3 总结 1. 类的6个默认对象 如果一个类中什么都没有,那么这个类就是一个空类。但是&…

UE5 C++ TimeLine 时间轴练习

一. Actor引入头文件 #include "Components/TimelineComponent.h" 声明CurveFloat 和 TimelineComponent UPROPERTY(EditAnywhere,BlueprintReadWrite,Category "MyCurve")UCurveFloat* MyCurveFloat;UPROPERTY(EditAnywhere, BlueprintReadWrite, Cate…

科技助力上亿用户隐私安全保护,合合信息两款产品再获CCIA PIA星级标识

随着互联网技术的飞速发展,个人信息的收集、存储、使用和传输变得日益频繁,其泄露和滥用的风险也随之增加,个人信息保护已成为社会共同关注的热点议题。近期,“中国网络安全产业联盟(CCIA)数据安全工作委员…

密码学 | 椭圆曲线数字签名方法 ECDSA(上)

目录 1 ECDSA 是什么? 2 理解基础知识 3 为什么使用 ECDSA? 4 基础数学和二进制 5 哈希 6 ECDSA 方程 7 点加法 8 点乘法 9 陷阱门函数! ⚠️ 原文:Understanding How ECDSA Protects Your Data. ⚠️ 写在前面…

OpenHarmony轻量系统开发【9】WiFi之STA模式连接热点

9.1AT指令操作WiFi 我们可以使用AT指令进行Hi3861 WiFi操作,连接热点、ping服务器等。 但是很多时候,我们需要实现开机后自动连接到某个热点,光靠AT指令不行。 Hi3861 为我们提供了WiFi操作的相关API,方便我们编写代码&#xff0…

GitLab 安全漏洞 CVE-2022-1162 如何解决?

本文来自极狐GitLab 官方公众号【极狐GitLab】,原文链接:https://mp.weixin.qq.com/s/JVpA14HHWgt58s3vM5TRcA。 GitLab 是一个全球知名的一体化 DevOps 平台,很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab :https…

文字转语音工具:GPT-SoVITS

诸神缄默不语-个人CSDN博文目录 OpenAI官方的TTS模型我在这篇博文中给出了使用教程:ChatGPT 3.5 API的调用不全指南(持续更新ing…) - 知乎 但是OpenAI的TTS对中文支持不好,有一种老外说中文的美,所以本文介绍另一个…

2024软考中项考哪个版本?应该该如何备考?

2024年1月,备受瞩目的软考中级系统集成项目管理工程师官方教程终于迎来了久违的大改版。为确保广大考生能够有充足的准备时间,软考中项的考试时间被顺延至同年11月,届时,这也将成为软考中项首次依据第3版考纲进行的考试。 新教材核…

数图智慧零售解决方案,赋能零售行业空间资源价值最大化

数图智慧零售解决方案 赋能零售行业空间资源价值最大 在激烈的市场竞争中,如何更好地提升空间资源价值,提高销售额,成为行业关注的焦点。近日,NIQ发布的《2024年中国饮料行业趋势与展望》称,“在传统零售业态店内&…

CSS特效---跳动的文字

1、演示 2、一切尽在代码中 <!--* Author: your name* Date: 2023-10-03 14:42:44* LastEditTime: 2023-10-03 14:56:26* LastEditors: DESKTOP-536UVPC* Description: In User Settings Edit* FilePath: \css-special-effects\跳动的文字.html --> <!DOCTYPE html>…

ARM看门狗定时器

作用 在S3C2440A中&#xff0c;看门狗定时器的作用是当由于噪声和系统错误引起的故障干扰时恢复控制器的工作。 也就是说&#xff0c;系统内部的看门狗定时器需要在指定时间内向一个特殊的寄存器内写入一个数值&#xff0c;俗称喂狗。 如果喂狗的时间过了&#xff0c;那么看门…

行式存储VS列式存储对比

行式存储&#xff1a; 一行代表一个记录的所有字段。 可以快速读取和写入单条记录。 如果要检索一条数据&#xff0c;数据库会读取or写入整条记录&#xff0c;包含所有相关字段。 列式存储&#xff1a; 表中每一列的数据连续存放。这种方式在需要对某一列进行大量运算或分析时…