用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

news2025/1/20 15:16:08

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------
  Description : TODO:
  SourceFile : etl_stream_kafka
  Author  : zxx
  Date  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'
    spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()
    # 连接kafka
    readDF = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "bigdata01:9092") \
    .option("subscribe", "topicA") \
    .load()

    # 使用DSL语句
    etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))

    etlDF.writeStream \
    .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("topic", "etlTopic") \
        .option("checkpointLocation", "../../datas/kafka_stream") \
        .start().awaitTermination()
    # 关闭
    spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>
    readDF = spark.readStream.format("kafka") \
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242991.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法日记 26-27day 贪心算法

接下来的题目有些地方比较相似。需要注意多个条件。 题目&#xff1a;分发糖果 135. 分发糖果 - 力扣&#xff08;LeetCode&#xff09; n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每…

编写一个生成凯撒密码的程序

plain list(input("请输入需要加密的明文&#xff08;只支持英文字母&#xff09;&#xff1a;"))key int(input("请输入移动的位数&#xff1a;"))base_A ord(A)base_a ord(a)cipher []for each in plain:if each :cipher.append( )else:if each.i…

PyCharm2024.2.4安装

一、官网下载 1.从下面的链接点进去 PyCharm: The Python IDE for data science and web development by JetBrains 2.进入官网后&#xff0c;下载pycharm安装包 3.点击下载能适配你系统的安装包 4.安装包下载完成 二、安装 1.下载完成后&#xff0c;打开点击右键&#xff…

【MySQL】MySQL数据库入门:构建你的数据基石

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;MySQL初阶探索&#xff1a;构建数据库基础 欢迎大家点赞收藏评论&#x1f60a; 目录 &#x1f985;数据库基础&#x1f400;什么是数据库&#x1f40f;主流数据库&#x1f986;MySQL数据库的基本…

基于Python 和 pyecharts 制作招聘数据可视化分析大屏

在本教程中&#xff0c;我们将展示如何使用 Python 和 pyecharts 库&#xff0c;通过对招聘数据的分析&#xff0c;制作一个交互式的招聘数据分析大屏。此大屏将通过不同类型的图表&#xff08;如柱状图、饼图、词云图等&#xff09;展示招聘行业、职位要求、薪资分布等信息。 …

OMV7 树莓派 tf卡安装

​ 升级7之后&#xff0c;问题多多&#xff0c;不是docker不行了&#xff0c;就是代理不好使 今天又重装了一遍&#xff0c;用官方的链接&#xff0c;重新再折腾一遍…… 使用raspberry pi imager安装最新版lite OS。 注意是无桌面 Lite版 配置好树莓派初始化设置&#xff0…

idea 弹窗 delete remote branch origin/develop-deploy

想删除远程分支&#xff0c;就选delete&#xff0c;仅想删除本地分支&#xff0c;选cancel&#xff1b; 在 IntelliJ IDEA 中遇到弹窗提示删除远程分支 origin/develop-deploy&#xff0c;这通常是在 Git 操作过程中出现的情况&#xff0c;可能是在执行如 git branch -d 或其他…

GitCode光引计划有奖征文大赛

一、活动介绍 GitCode平台汇聚了众多杰出的G-Star项目&#xff0c;它们犹如璀璨星辰&#xff0c;用各自的故事和成就&#xff0c;为后来者照亮前行的道路。我们诚邀广大开发者、项目维护者及爱好者&#xff0c;共同撰写并分享项目在GitCode平台上托管的体验&#xff0c;挖掘平…

qt移植到讯为rk3568,包含一些错误总结

qt移植到arm报错动态库找不到 error while loading shared libraries: libAlterManager.so.1: cannot open shared object file: No such file or directory 通过设置环境变量 LD_LIBRARY_PATH就行了。 LD_LIBRARY_PATH是一个用于指定动态链接器在运行时搜索共享库的路径的环…

Android Framework AMS(16)进程管理

该系列文章总纲链接&#xff1a;专题总纲目录 Android Framework 总纲 本章关键点总结 & 说明&#xff1a; 说明&#xff1a;本章节主要解读AMS 进程方面的知识。关注思维导图中左上侧部分即可。 我们本章节主要是对Android进程管理相关知识有一个基本的了解。先来了解下L…

(一)- DRM架构

一&#xff0c;DRM简介 linux内核中包含两类图形显示设备驱动框架&#xff1a; FB设备&#xff1a;Framebuffer图形显示框架; DRM&#xff1a;直接渲染管理器&#xff08;Direct Rendering Manager&#xff09;&#xff0c;是linux目前主流的图形显示框架&#xff1b; 1&am…

【PHP】ThinkPHP基础

下载composer ComposerA Dependency Manager for PHPhttps://getcomposer.org/ 安装composer 查看composer是否安装 composer composer --version 安装 ThinkPHP6 如果你是第一次安装的话&#xff0c;首次安装咱们需要打开控制台&#xff1a; 进入后再通过命令,在命令行下面&a…

Elasticsearch基本概念及使用

Elasticsearch 是一个开源的、分布式的全文搜索和分析引擎&#xff0c;基于 Apache Lucene 构建。它提供了快速的搜索能力&#xff0c;支持大规模的数据分析&#xff0c;广泛应用于日志分析、全文搜索、监控系统和商业智能等领域。ES操作指令是基于restAPI构建&#xff0c;也就…

黑马智慧商城项目学习笔记

目录 智慧商城项目创建项目调整初始化目录vant组件库vant按需导入和全部导入 项目中的vw适配路由设计配置登录页静态布局图形验证码功能request模块-axios封装api模块-封装图片验证码接口 Toast轻提示&#xff08;vant组件&#xff09;短信验证倒计时功能登录功能响应拦截器统一…

攻防世界Web-bug

打开链接 先注册一个账号 创建成功&#xff0c;会给一个UID5 抓包的user值就是UID:用户名的md5加密的编码 点击Manage时要求admin用户 利用改包把user改成admin 1:admin的md5值为4b9987ccafacb8d8fc08d22bbca797ba 还要把url上的UID改为1 存在逻辑漏洞&#xff0c;成功越权 …

apk反编译修改教程系列-----apk应用反编译中AndroidManifest.xml详细代码释义解析 包含各种权限 代码含义【二】

💝💝💝💝在上期博文中解析了一个常规apk中 AndroidManifest.xml的权限以及代码。应粉丝需求。这次解析一个权限较高的apk。这款apk是一个家长管控的应用。需求的各种权限较高。而且通过管控端可以设置控制端的app隐藏与否。 通过博文了解💝💝💝💝 1💝💝…

湘潭大学软件工程算法设计与分析考试复习笔记(一)

文章目录 前言随机类&#xff08;第七章&#xff09;随机概述数值随机化舍伍德拉斯维加斯蒙特卡罗 模拟退火遗传人工神经网络 回溯&#xff08;第五章&#xff09;动态规划&#xff08;第四章&#xff09;后记 前言 考试还剩十一天&#xff0c;现在准备开始复习这门课了。好像全…

如何使用正则表达式验证域名

下面是一篇关于如何使用正则表达式验证域名的教程。 如何使用正则表达式验证域名 简介 域名是互联网上网站的地址&#xff0c;每个域名由多个标签&#xff08;label&#xff09;组成&#xff0c;标签之间用点 . 分隔。域名规则有很多细节&#xff0c;但基本要求是&#xff1a…

【Cesium】自定义材质,添加带有方向的滚动路线

【Cesium】自定义材质&#xff0c;添加带有方向的滚动路线 &#x1f356; 前言&#x1f3b6;一、实现过程✨二、代码展示&#x1f3c0;三、运行结果&#x1f3c6;四、知识点提示 &#x1f356; 前言 【Cesium】自定义材质&#xff0c;添加带有方向的滚动路线 &#x1f3b6;一、…

DDoS高防服务器:保障业务安全和稳定的抗攻击利器

摘要 随着网络攻击愈发频繁&#xff0c;尤其是DDoS&#xff08;分布式拒绝服务&#xff09;攻击的威胁不断增长&#xff0c;DDoS高防服务器成为保护企业网络安全的重要工具。本文将详细介绍DDoS高防服务器的原理、优势、应用场景及选择要点&#xff0c;帮助企业有效应对攻击&am…