sparkstreamnig实时处理入门

news2024/11/18 1:36:58

1.2 SparkStreaming实时处理入门

1.2.1 工程创建

导入maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
1.2.2 入口类StreamingContext
SparkStreaming中的入口类,称之为StreamingContext,但是底层还是得需要依赖SparkContext。
object SparkStreamingWordCountOps {
    def main(args: Array[String]): Unit = {
        /*
            StreamingContext的初始化,需要至少两个参数,SparkConf和BatchDuration
            SparkConf不用多说
            batchDuration:提交两次作业之间的时间间隔,每次会提交一个DStream,将数据转化batch--->RDD
            所以说:sparkStreaming的计算,就是每隔多长时间计算一次数据
         */
        val conf = new SparkConf()
                    .setAppName("SparkStreamingWordCount")
                    .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(conf, duration) //批次
​
        //业务
        
        
        //为了执行的流式计算,必须要调用start来启动
        ssc.start()
        //为了不至于start启动程序结束,必须要调用awaitTermination方法等待程序业务完成之后调用stop方法结束程序,或者异常
        ssc.awaitTermination()
    }
}
1.2.3 业务编写

SparkStreaming是一个流式计算的计算引擎,那么 就模拟一个对流式数据进行单词统计

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
​
/**
 * sparkStreaming的流程序
 */
object Demo01_SparkStreaming_WC {
  def main(args: Array[String]): Unit = {
    //1、获取streamingcontext
    val conf = new SparkConf()
      .setAppName("streaming wc")
      .setMaster("local[*]")
    val sc = new StreamingContext(conf, Durations.seconds(2)) //微批次微2s
    //2、初始化数据
    val ds = sc.socketTextStream("qianfeng01", 6666)
    //3、对数据进行操作
    val sumDS = ds.flatMap(_.split(" "))
      #判断H开头 5位
      .filter(x=>x.startsWith("H") && x.length == 5)
      .map((_, 1))
      .reduceByKey(_ + _)
    //4、对数据做输出
    sumDS.print()
​
    //5、开启sc
    sc.start()
    //6、等待结束  --- 实时不能停止
    sc.awaitTermination()
  }
}

使用netcat进行测试(如果没有请先安装,有则忽略如下)

需要在任意一台节点上安装工具:

[root@qianfeng01 home]# yum install -y nc

启动监听端口:

[root@qianfeng01 home]# nc -lk 6666
hello nihao
nihao hello
hi
hello nihao

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8S的二进制部署

K8S的源码包部署 搭建准备&#xff1a; k8smaster01&#xff1a;20.0.0.32 kube-apiserver kube-controller-manager kube-scheduler etcdk8smaster02&#xff1a;20.0.0.33 kube-apiserver kube-controller-manager kube-scheduler node节点01&#xff1a;20.0.0.34 …

Jenkins集成allure测试报告

前言 Allure框架是一个功能强大的自动化测试报告工具&#xff0c;不仅支持多种编程语言&#xff0c;而且能够完美的与各种集成工具结合&#xff0c;包括Jenkins&#xff0c;TeamCity&#xff0c;Bamboo&#xff0c;Maven等等&#xff0c;因此受到了很多测试人员的青睐&#xff…

Redis7快速入门

Docker安装Redis 指定密码&#xff1a; docker run --restartalways -p 6379:6379 --name redis -d redis:7.0.12 --requirepass zhangdapeng520不指定密码&#xff1a; docker run --restartalways -p 6379:6379 --name redis -d redis:7.0.12在真实的开发中&#xff0c;如…

【c++】入门1

c关键字 命名空间 在C/C中&#xff0c;变量、函数和后面要学到的类都是大量存在的&#xff0c;这些变量、函数和类的名称将都存在于全局作用域中&#xff0c;可能会导致很多冲突。使用命名空间的目的是对标识符的名称进行本地化&#xff0c;以避免命名冲突或名字污染&#xff…

C/C++图形化编程(1)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 信念是一把无坚不摧的利刃&#xff01…

每日一题-----逆序字符串

大家好我是Beilef&#xff0c;在一个美好的下午我意外接触到编程并且产生了兴趣&#xff0c;哈哈我要努力成为一个跨界者&#xff0c;让我们一起加油吧O(∩_∩)O 文章目录 目录 文章目录 前言 大家好请上车 一、逆序字符串 题⽬描述&#xff1a; 输⼊⼀个字符串&#xff0c;写…

Node.js版本对比

目录 1. node版本与Npm版本对照表 2. node版本与node-sass版本对照表 3. node-sass与sass-loader版本对照表 1. node版本与Npm版本对照表 以往的版本 | Node.js 下面显示最新的对应内容&#xff0c;如果需要查找历史版本&#xff0c;可以进入上面的页面查询 VersionLTSDateV8np…

使用CRA(create-react-app)初始化一个完整的项目环境(该初始化项目已上传到本文章的资源)

1. 整理项目结构&#xff0c;项目目录结构大致划分如下&#xff1a; 2. 安装sass 安装sass开发环境, 注意&#xff1a;使用的文件后缀名要用.scssnpm i sass -D3. 安装Ant Design npm i antd --save 4. 配置基础路由Router&#xff08;具体可参考ReactRouter使用详解&#x…

Kerberos安装教程与命令详解(超详细)

文章目录 前言一、安装准备1. 搭建集群2. 软件包介绍 二、使用shell脚本一键安装1. 复制脚本2. 增加执行权限3. 执行脚本 三、kdb5_util命令1. 简介2. 可用选项和命令的解释3. 常见命令及其说明4. 示例用法 四、kadmin命令1. 简介2. 可用选项和命令的解释3. 常见命令及其说明4.…

渗透测试——1.4主动扫描

主动扫描是别人可以发觉的情报收集 一、nmap的使用 1.nmap<目标主机>:最常用的扫描方式 有nmap版本、扫描时间 “host is up”表示目标主机处于开机状态、“not shown”未开放端口 有四个端口是开的&#xff08;135.139.445.912&#xff09; 2.nmap -p<端口范围…

折叠屏,移动办公的第二战场

当下的移动办公&#xff0c;正在转换战场。 从PC端到移动端&#xff0c;大屏链接小屏&#xff0c;协作模式从单人到团队&#xff0c;移动办公领域一直在发展和自我更新&#xff0c;这也是硬件和软件企业共同开辟的“第一战场”。 如今&#xff0c;折叠屏带来了新形态&#xf…

Kafka、RocketMQ、RabbitMQ消息丢失可能存在的地方,以及解决方案

这里主要对比&#xff1a;Kafka、RocketMQ、RabbitMQ 介绍一下消息生产、存储、消费三者的架构形式。 消息丢失可能存在的场景&#xff1a; 情况一&#xff1a; 生产者发送给MQ的过程消息丢失 在写消息的过程中因为网络的原因&#xff0c;还没到mq消息就丢失了&#xff1b;或…

vue3+elementPlus+cascader动态加载封装自定义组件+v-model指令实现父子通信

文章目录 select普通操作 &#xff08;1&#xff09;cascader操作&#xff08;2&#xff09; select普通操作 &#xff08;1&#xff09; 搜索条件需求&#xff1a;接口入参需要houseId&#xff0c;但是要先选择完楼栋&#xff0c;再选择单元&#xff0c;最后选择房屋 如图&a…

vue2、vue3状态管理之vuex、pinia

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、状态管理之vuex1.1 State调用&#xff1a;1.2 Mutation在vuex中定义&#xff1a;在组件中使用&#xff1a; 1.3 Action在vuex中定义&#xff1a;将上面的减…

k8s是什么

生么是k8s&#xff1a; Kubernetes:8个字母省略&#xff0c;就是k8s 自动部署&#xff0c;自动扩展和管理容器化部署的应用程序的一个开源系统、 k8s是负责自动化运维管理多个容器化程序的集群&#xff0c;是一个功能强大的容器编排工具。 分布式和集群化的分布式进行容器管…

关于MySQL、分布式系统、SpringCloud面试题

前言 之前为了准备面试&#xff0c;收集整理了一些面试题。 本篇文章更新时间2023年12月27日。 最新的内容可以看我的原文&#xff1a;https://www.yuque.com/wfzx/ninzck/cbf0cxkrr6s1kniv MySQL 索引 说一下有哪些锁&#xff1f; 行锁有哪些&#xff1f; 性能优化 分库分表…

GrayLog日志平台的基本使用-ssh接入Dashboards展示

这里使用的版本为graylog4.2.10 1、一键安装graylog4.2.10&#xff0c;解压zip包&#xff0c;执行脚本就行 链接&#xff1a;https://pan.baidu.com/s/11U7GpBZ1B7PXR8pyWVcHNw?pwdudln 提取码&#xff1a;udln 2、通过rsyslog采集系统日志&#xff0c;具体操作参考前面文…

Java框架基础--maven,http,postman

maven Maven 提供了一个标准的构建生命周期和一组约定的目录结构&#xff0c;以简化和规范项目的构建过程。它主要用于 Java 项目&#xff0c;但也可以用于其他类型的项目。提高了项目的可维护性、可重复性和一致性&#xff0c;简化了构建和依赖管理的复杂性&#xff0c;使得开…

人类偏好导向:DPO技术重塑SDXL-1.0图像生成

引言 在AI领域&#xff0c;适应和理解人类偏好一直是技术发展的重要方向。斯坦福大学研究团队最近提出的Diffusion-DPO方法&#xff0c;旨在将这一理念应用于图像生成模型&#xff0c;特别是在文本到图像的转换领域。 Huggingface模型下载: https://huggingface.co/mhdang/ A…

dl转置卷积

转置卷积 转置卷积&#xff0c;顾名思义&#xff0c;通过名字我们应该就能看出来&#xff0c;其作用和卷积相反&#xff0c;它可以使得图像的像素增多 上图的意思是&#xff0c;输入是22的图像&#xff0c;卷积核为22的矩阵&#xff0c;然后变换成3*3的矩阵 代码如下 import…