Spark读取kafka(流式和批数据)

news2024/9/28 7:24:39

spark读取kafka(批数据处理)

在这里插入图片描述

在这里插入图片描述

# 按照偏移量读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# spark读取kafka
options = {
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itcast',# 读取的主题不存在会自动创建
    # todo 注意一:连接的配置
    #       主题名称 ,分区编号,偏移量
    # 指定起始偏移量   {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,
    # 指定结束偏移量  {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}
    'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """
    # 注意点  : 偏移量的区间是左闭右开 ,结束偏移的指定按照最大偏移量加一 ,所有分区都要指定
}
# 读取
# format 指定读取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:这一步的数据处理(将value转化为字符串类型)是必须做的,不然你看不懂数据。
#       可以用df.的方式,那我后来怎么都没怎么见过了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df数据
# todo 注意三:这里使用.show()的方式的,是因为它是有界表
df_select.show()

在这里插入图片描述

spark读取kafka(流数据处理)

在这里插入图片描述

# 流式读取kafka数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# todo 注意一:定义kafka的连接配置
options={
    # 写kafka配置信息
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定主题
    'subscribe': 'itheima'  # 读取的主题不存在会自动创建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必须将value转化为string类型

# 计算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')

# 输出
# todo 注意三:输出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1407990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《幻兽帕鲁》被指AI缝合,开发过程疑点重重,最后附游戏安装教程

由日本游戏工作室Pocketpair开发的《Palworld / 幻兽帕鲁》毫无疑问成为了2024年的首个巨热游戏!上周五(2024年1月19日)游戏上线抢先体验,仅在3天内销量就已突破400万!并于2024年1月21日创下了1291967名同时在线玩家的…

[ACM学习] 树形dp之换根

算法概述 总的来说: 题目描述:一棵树求哪一个节点为根时,XXX最大或最小 分为两步:1. 树形dp 2. 第二次dfs 问题引入 如果暴力就是 O(n^2) , 当从1到2的时候,2及其子树所有的深度都减一,其它…

手把手教你快速掌握连接远程git仓库or赋值远程仓库到本地并上传代码到gitee

1. 先去官网安装Git ,这里不多赘述网上教程很多 2.1去gitee注册一个账号,然后去我的新建一个仓库,这里是演示一下新手第一次操作的流程 2.2设置仓库名称完成创建(这里的库名随便输入看自己): 2.3 打开git bash 配置用户名&#x…

Kubernetes-Taint (污点)和 Toleration(容忍)

目录 一、Taint(污点) 1.污点的组成 2.污点的设置、查看和去除 3.污点实验: 二、Toleration(容忍) 1.容忍设置的方案 2.容忍实验: Taint 和 toleration 相互配合,可以用来避免 pod 被分配…

VUE3好看的我的家乡网站模板源码

文章目录 1.设计来源1.1 首页界面1.2 旅游导航界面1.3 上海景点界面1.4 上海美食界面1.5 上海故事界面1.6 联系我们界面1.7 在线留言界面 2.效果和结构2.1 动态效果2.2 代码结构 源码下载 作者:xcLeigh 文章地址:https://blog.csdn.net/weixin_43151418/…

虹科方案丨湿热灭菌工艺验证解决方案,确保所有产品和容器达到无菌要求

来源:虹科环境监测技术 虹科方案丨湿热灭菌工艺验证解决方案,确保所有产品和容器达到无菌要求 原文链接:https://mp.weixin.qq.com/s/O-pKQdehB9mHSETpU8egbA 欢迎关注虹科,为您提供最新资讯! #蒸汽灭菌 #高压灭菌 …

小程序直播系统源码_报价与开发_OctShop

近几年,随着直播的火热,人们对于直播带货是相当的熟悉了,逐渐渗透到各行各业中,小程序直播可以实时的更全面的传递商品信息,同时还可以与主播进行互动,可以通过直播聚集的人气打造团购气氛,通过…

LSTM时间序列预测

本文借鉴了数学建模清风老师的课件与思路,可以点击查看链接查看清风老师视频讲解:【1】演示:基于LSTM深度学习网络预测时间序列(MATLAB工具箱)_哔哩哔哩_bilibili % Forecast of time series based on LSTM deep learn…

win 下使用 cmd 运行 jar 包

1、使用 Win R 输入 cmd 命令打开命令提示符 2、在 cmd 窗口中输入以下命令 java -jar xxxxxx.jar 运行 jar 包,控制台出现中文乱码 原因是 windows 默认使用 GBK 编码格式,程序使用 UTF-8 编码格式 将编码格式改为 UTF-8 编码,在 cmd 窗…

C#中IsNullOrEmpty和IsNullOrWhiteSpace的区别?

前言 今天我们一起来探讨C#中两个常用的字符串处理方法:IsNullOrEmpty和IsNullOrWhiteSpace。这两个方法在处理字符串时非常常见,但是它们之间存在一些细微的区别。在本文中,我们将详细解释这两个方法的功能和使用场景,并帮助您更…

Qt Quick程序的发布|Qt5中QML和Qt Quick 的更改

# Quick程序的发布旧版做法 # Qt5中QML和Qt Quick 的更改 1.QML语言的更改(Qt4->Qt5) 在QML语言中,只有少量更改会影响QML代码的迁移:无法直接导入单独的文件(例如:import"MyType.qml”),需要导人该文件所在的目录; JavaScript文件中的相对路径被解析…

webassembly003 whisper.cpp的python绑定实现+Cython+Setuptools

python绑定项目 官方未提供python的封装绑定,直接调用执行文件 https://github.com/stlukey/whispercpp.py提供了源码和Cpython结合的绑定 https://github.com/zhujun1980/whispercpp_py提供了ctype方式的绑定,需要先make libwhisper.so Pybind11 bi…

你真的会数据结构吗:顺序表

❀❀❀ 文章由不准备秃的大伟原创 ❀❀❀ ♪♪♪ 若有转载,请联系博主哦~ ♪♪♪ ❤❤❤ 致力学好编程的宝藏博主,代码兴国!❤❤❤ 又和大家见面啦!在大家看到这个标题的时候其实就已经发现了:我们的C语言的基础知识大…

Shell脚本的if条件语句

目录 1.单分支结构 2.双分支结构 3.多分支结构 4.例题 1.单分支结构 实际上使用“&&”和“||”逻辑测试已经可以完成简单的判断并执行相应的操作,但是当需要选择执行的命令语句较多时,这种方式将使执行代码显得很复杂,不好理解。…

gdzwfw某省公共资源交易平台逆向学习

声明:本文中网站仅为学习技术使用,请勿暴力爬取数据。 学习地址:aHR0cHM6Ly95Z3AuZ2R6d2Z3Lmdvdi5jbi8jLzQ0L2p5Z2c 此网站采用请求头反爬,难点是请求头中几个参数是如何生成的(别问为什么知道是请求头,一…

学单片机前先学什么?

学单片机前先学什么? 在开始前我有一些资料,是我根据网友给的问题精心整理了一份「单片机的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!&#xff…

77_组合

描述 给定两个整数 n 和 k,返回范围[1, n]中所有可能的 k 个数的组合。 你可以按任何顺序返回答案。 思路 数组问题 从横向上来看往往有 遍历、滑动窗口、动态规划等思路。但是,其实在遍历这种横向取数过程中,可以根据条件的判断形成树形操作…

一键拥有你的GPT4

这几天我一直在帮朋友升级ChatGPT,现在已经可以闭眼操作了哈哈😝。我原本以为大家都已经用上GPT4,享受着它带来的巨大帮助时,但结果还挺让我吃惊的,还是有很多人仍苦于如何进行升级。所以就想着写篇教程来教会大家如何…

山西电力市场日前价格预测【2024-01-24】

日前价格预测 预测说明: 如上图所示,预测明日(2024-01-24)山西电力市场全天平均日前电价为320.98元/MWh。其中,最高日前电价为480.67元/MWh,预计出现在18:30。最低日前电价为0.00元/MWh,预计出…

RabbitMQ交换机与队列

交换机 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反, 生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单&#xff0c…