Java技术栈 —— Spark入门(三)之实时视频流

news2024/11/24 18:22:09

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2
import kafka
import numpy as np

# 设置 Kafka Producer
# 注意修改你的kafka地址
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')

# 打开摄像头(0 为默认摄像头)
cap = cv2.VideoCapture(0)

while True:
    # 从摄像头捕获帧
    ret, frame = cap.read()
    if not ret:
        break
    
    # 将图像编码为 JPEG 格式
    _, buffer = cv2.imencode('.jpg', frame)

    # 将图像作为字节数组发送到 Kafka
    producer.send('camera-images', buffer.tobytes())

    # 显示当前捕获的帧
    cv2.imshow('Video', frame)

    # 按 'q' 键退出
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
cv2.destroyAllWindows()
producer.close()

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容
listeners=PLAINTEXT://0.0.0.0:9092
# 改成你的kafka所在服务器ip
advertised.listeners=PLAINTEXT://{your_ip}:9092

如果你之前创建过topic,那就清空这些topic中的数据

# 设置保留时间为0,相当于立即清空数据
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=0
# 恢复原始保留设置,立即清空数据后,将数据的保留时间恢复至原有状态
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=604800000


开始正式创建topic

# 创建输入图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1
# 创建输出的gray灰度图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1

# 准备好后查看下topic list进行验证
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看某topic中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {your_topic_name} --from-beginning

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless

准备脚本文件

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as np

bootstrapServers = "localhost:9092"

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Kafka-Spark-OpenCV") \
    .getOrCreate()

# 初始化 Kafka Producer,用于发送处理后的图像
# 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。
# 因此,在每个执行器内部,创建 KafkaProducer 实例
producer = None

# 从 Kafka 读取数据流
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "camera-images") \
  .load()

# UDF 用于将图像转换为灰度
def convert_to_gray(image_bytes):
    global producer

    # 创建 KafkaProducer 实例(在每个执行器上只初始化一次)
    if producer is None:
        producer = KafkaProducer(bootstrap_servers = bootstrapServers)

    # 将字节数组转换为 numpy 数组
    nparr = np.frombuffer(image_bytes, np.uint8)
    # 将 numpy 数组解码为图像
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    # 将图像转换为灰度
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # 将灰度图像编码为 JPEG
    _, buffer = cv2.imencode('.jpg', gray)
    
    # 将处理后的图像发送到 Kafka 'result-gray-images' 主题
    producer.send('result-gray-images', buffer.tobytes())
    
    return buffer.tobytes()

# 注册 UDF
convert_to_gray_udf = udf(convert_to_gray, BinaryType())

# 应用 UDF 对数据进行灰度化处理
gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))

# 将处理后的数据写入文件或其他输出
query = gray_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
    
# query = gray_df\
#     .writeStream \
#     .format('kafka') \
#     .outputMode('update') \
#     .option("kafka.bootstrap.servers", bootstrapServers) \
#     .option('checkpointLocation', '/spark/job-checkpoint') \
#     .option("topic", "result-gray-images") \
#     .start()

query.awaitTermination()

spark-submit提交脚本文件:

# 1.提高内存
# 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑)
/opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--conf "spark.kafka.maxOffsetsPerTrigger=1000" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2
import numpy as np
from kafka import KafkaConsumer

# 设置 Kafka Consumer
consumer = KafkaConsumer(
    'result-gray-images',
    bootstrap_servers='{your_kafka_ip}:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    # group_id='image-display-group'
)

# 从 Kafka 主题读取灰度图像并显示
for message in consumer:
    # print("reading gray image.... ")
    # 将消息转换为 numpy 数组
    nparr = np.frombuffer(message.value, np.uint8)
    # 解码为图像
    gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
    # 显示灰度图像
    cv2.imshow('Gray Video', gray_img)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cv2.destroyAllWindows()
consumer.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2091304.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RocketMQ学习(二)

文章目录 1. 案例介绍1.1 业务分析1)下单2)支付 1.2 问题分析问题1如何保证数据的完整性?使用MQ保证在下单失败后系统数据的完整性 问题2如何处理第三方支付平台的异步通知通过MQ进行数据分发,提高系统处理性能 2. 技术分析2.1 技…

Visual Studio 快速跳转至特定程序行的快捷键

Visual Studio 快速跳转至特定程序行的快捷键 linuxWindows在Visual Studio中在Visual Code中在Notepad中Win11中的普通记事本 总结 linux :numWindows Ctrl G在Visual Studio中 在Visual Code中 Ctrl G也是可以的 在Notepad中 Ctrl G也是可以的 Win11中的普通记事本…

模型 生产微笑曲线

系列文章 分享 模型,了解更多👉 模型_思维模型目录。产业链中,研发设计和品牌营销环节附加值高,制造环节附加值低。 1 生产微笑曲线的应用 1.1 大杨集团的“微笑曲线”到“武藏曲线”转型 武藏曲线简介说明:在制造业…

JavaScript的对象详解

作为程序员,我们常常会听见一种说法,那就是面向对象编程。那到底什么是对象呢?有改如何面向对象编程呢?今天我们就来详细讲讲 什么是对象呢? 对象是JavaScript中一个非常重要的概念,这是因为对象可以将多个…

Java项目怎么从零部署到Linux服务器上?

目录 一.Java环境(JDK)安装 二.数据库(MySQL)安装 三.部署上线 ▐ 部署Jar包 ▐ 运行程序 ▐ 开放端口 一个Java项目首先需要一个支持它编译的Java环境,因此首先要保证服务器上安装的有相应的JDK 一.Java环境&a…

为什么使用雪花算法,有什么优缺点,如何解决?为什么不使用UUID的方法,如何解决系统回拨的问题?

为什么使用雪花算法,有什么优缺点,如何解决?为什么不使用UUID的方法,如何解决系统回拨的问题? 生成的id应该满足下面的条件: 首先是全局唯一,不能出现重复的ID之后是总体应该是递增的&#xf…

8个平面设计必备素材网站,免费下载。

平面设计师应该去哪里找免费可商用素材网站?我推荐这8个,赶紧收藏好。 1、菜鸟图库 菜鸟图库-免费设计素材下载 菜鸟图库是一个非常大的素材库,站内包含设计、办公、自媒体、图片、电商等各行业素材。网站还为新手设计师提供免费的素材&…

基于django的失物招领系统的设计与实现/ 基于Python的失物招领系统的设计与实现/失物招领管理系统

失物招领系统的设计与实现 摘要:伴随着我国全面推动信息化的趋势,我国的很多行业都在朝着互联网的方向进发。结合计算机技术的失物招领系统能够很好地解决传统失物招领存在的问题,能够提高管理员管理的效率,改善服务质量。优秀的失物招领系统…

感染了后缀为.Wormhole勒索病毒如何应对?数据能够恢复吗?

引言: 在当今日益复杂的网络安全环境中,勒索病毒成为了企业和个人面临的一大威胁。其中,.Wormhole勒索病毒以其独特的传播机制和强大的加密能力,尤为引人注目。本文将深入探讨.Wormhole勒索病毒的特点、感染途径、危害以及相应的…

XSS LABS 靶场初识

关注这个靶场的其他相关笔记:XSS - LABS —— 靶场笔记合集-CSDN博客 0x01:XSS LABS 靶场简介 XSS LABS 靶场是一个专注于跨站脚本攻击(Cross-Site Scripting,XSS)教育和训练的平台。平台中有一系列精心设计的关于 XS…

若依框架 MyBatis 改为 MyBatis-Plus 的实现步骤

本文只做了简单的实现,具体的细节需根据自己的需求进一步实现。如果实现中遇到问题欢迎留言讨论。 引入 MyBatis-Plus 引入相关依赖(pom.xml) 推荐先直接在顶级 pom.xml 中直接依赖,等调试通过之后,在去按需依赖&…

【hot100篇-python刷题记录】【三数之和】

R7-双指针篇 思路: 三个元素,代表需要3个指针,利用双指针收缩的思想,我们可以设置k,i,j三个元素指针。 k代表最外层的循环,循环一遍数组。(为了降低时间复杂度以及搜索难度,我们先将nums sort…

移动硬盘文件夹变成白色无法正常访问,怎么恢复?

在使用移动硬盘时,有时会遇到文件夹变白的情况。这通常意味着文件夹已经损坏或无法正常访问。本文将分析移动硬盘文件夹变白的原因,并提供相应的恢复方法。 一、原因分析 文件系统损坏:移动硬盘的文件系统可能因多种原因而损坏,如…

001集——CAD—C#二次开发入门——开发环境基本设置

CAD C#二次开发首先需要搭建一个舒服的开发环境,软件安装后,需要修改相关设置。本文为保姆级入门搭建开发环境教程,默认已成功安装vs和cad 。 第一步:创建类库 第二步:进行相关设置,如图: 下一…

milvus资源限制 benchmarker压测 qps优化

根据milvus 资源限制的官网,我们得出百万数据资源限制。 1.dev 环境 对接不同的配置最大的qps 如下(dev的机器内存很小) 2.于是认为当前的性能是匹配的,然后加上资源限制,配置 压测结果如下 {"run_id": "13292982fee74f64…

基于springboot+vue的民族文化推广系统设计与实现---附源码92323

摘 要 在全球化和信息化日益加深的当下,保护和推广民族文化显得尤为重要。民族文化不仅是一个国家或地区的独特标识,更是其历史、传统和智慧的结晶。然而,随着现代社会的快速发展,许多传统文化和习俗面临着被遗忘和消失的风险。因…

ssh---配置密钥对验证

1.在客户端创建密钥对 ssh-keygen -t ecdsa秘钥存放位置(生成密钥时的用户的工作目录下) 2.将公钥文件上传至服务器 3.在服务器中导入公钥文本 4.在客户机设置ssh代理功能,实现免交互登录 5.测试

Python自适应光学模态星形小波分析和像差算法

🎯要点 🎯星形小波分析像差测量 | 🎯对比傅里叶和小波分析 | 🎯定义多尺度图像质量度量,矩阵数据 | 🎯像差校正算法 | 🎯受激发射损耗显微镜布局 | 🎯干涉仪分支校准,求…

Java 虚方法表(虚函数)

虚方法表 Java 中的虚方法表(Virtual Method Table, VMT)是实现动态方法分派和多态的重要机制。它帮助 Java 运行时系统(JVM)决定在继承体系中调用哪一个方法的具体实现。 什么是虚方法表? 虚方法表是一个类的内部数…

Linux学习笔记(4)----通过网口灯判断网速是千兆还是百兆

网卡PHY 移植注意事项 注意RTL8211F的LED0,LED1,LED2,软件是可以自定义的,比如百兆,千兆,是亮哪个灯,黄灯或者绿灯,还有传输时是闪烁哪个灯,要注意硬件上是怎么驱动灯的…