kafka集成spark

news2024/11/26 9:46:31

1.新建Scala项目

具体教程可见在idea中创建Scala项目教程-CSDN博客

1.1右键项目名-添加框架支持-勾选scala

1.2main目录下新建scala目录-右键Scala目录-将目录标记为-勾选源代码根目录

1.3创建包com.ljr.spark

1.4引入依赖(pox.xml)

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
           <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

1.5把spark conf/目录下的log4j.properties 复制到项目的resources目录

2.集成spark生产者

新建SparkKafkaProducer (注意选择的是object而不是class)

package com.ljr.spark
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

object SparkKafkaProducer {

  def main(args: Array[String]): Unit = {
    //1 属性配置
    val pros = new Properties()
    pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092")
    pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
    pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])

    //2 创建生产者
    val producer = new KafkaProducer[String, String](pros)

    //3 发送数据
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String,String]("customers","Lili" + i))
    }
    //4 关闭资源
    producer.close()
  }
}

运行,开启Kafka 消费者消费数据

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic customers

能接收到信息,可见spark作为生产者集成Kafka成功

3.集成spark消费者

package com.ljr.spark

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkKafkaConsumer {
      def main(args: Array[String]): Unit = {
       //1 初始化上下文环境
       val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
        val sc = new StreamingContext(conf, Seconds(3))

        //2 消费数据
        val kafkapara = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"node1:9092,node2:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG->"KFK-SP"
        )
        val kafkaDstream = KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("customers"), kafkapara))
        val valueDstream = kafkaDstream.map(record => record.value())
        valueDstream.print()
        //3 执行代码并阻塞
          sc.start()
        sc.awaitTermination()

      }
}

运行,

开启Kafka 生产者生产数据

kafka-console-producer.sh.sh --bootstrap-server node1:9092 --topic customers

控制台可以消费到数据,可见spark作为消费者集成Kafka成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1808899.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java——二维数组

一、二维数组介绍 二维数组与一维数组很相似。可以说二维数组是元素为一维数组的数组&#xff0c;也就是一维数组的数组。每个元素可以通过行索引和列索引来访问。 1、二维数组的创建 我们知道&#xff0c;在 C 语言中&#xff0c;二维数组是一个连续的内存块&#xff0c;通…

【Python】使用pip安装seaborn sns及失败解决方法与sns.load_dataset(“tips“)

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主。 &#x1f913; 同时欢迎大家关注其他专栏&#xff0c;我将分享Web前后端开发、人工智能、机器学习、深…

RISC-V MCU IDE MRS(MounRiver Studio)开发 编译后打印FLASH及RAM使用占比信息

以RISC-V MCU IDE MounRiver Studio(MRS)为例&#xff0c;首先我们选中目标工程&#xff0c;点击工具栏工程属性按钮&#xff0c;打开工程属性配置页&#xff1a; 在C/C Build->Settings->Tool Settings选项列表中单击GNU RISC-V Cross C Linker->Miscellaneous&#…

Anzo 跟单社区现已正式上线!即刻体验无与伦比的强大功能

Anzo 跟单社区现已正式上线! ANZO 跟单社区是一个颠覆性的创新跟单社区平台&#xff0c;作为新一代跟单社区&#xff0c;我们旨在让更多的用户享受跟单交易带来的便捷性和收益性。交易者可以通过跟单社区&#xff0c;学习和分享交易策略&#xff0c;轻松复制交易专家的交易策略…

人类记忆优化算法:针对全局优化问题的记忆启发优化器

Human memory optimization algorithm: A memory-inspired optimizer for global optimization problems 24年 Expert Systems With Applications sci一区 原文链接: https://doi.org/10.1016/j.eswa.2023.121597 Zhu D, Wang S, Zhou C, et al. Human memory optimization alg…

二进制文件的膨胀策略和使用 debloat 消除膨胀测试

在恶意软件的分析中有的 Windows 可执行文件&#xff08;PE 文件&#xff09;会通过膨胀策略来绕过防病毒一些防病毒的检查&#xff0c;比如上传云进行分析&#xff0c;因为文件太大了所以无法进行一些防病毒分析。一般的可执行文件有很多的膨胀策略&#xff0c;一般简单的膨胀…

Elasticsearch-经纬度查询(8.x)

目录 一、开发环境 二、pom文件 三、ES配置文件 四、ES相关字段 五、ES半径查询 ES的字段类型:geo_point&#xff0c;可以实现以一个点为中心的半径查询(geo_distance query) ES 地里位置查询: 半径查询(geo_distance query)查询指定矩形内的数据(geo_bounding_box quer…

[AI Google] 使用 Gemini 取得更多成就:试用 1.5 Pro 和更多智能功能

总结 Google 正在为超过 35 种语言的 Gemini Advanced 订阅者推出 Gemini 1.5 Pro。此次更新包括 100 万个 token 的上下文窗口、改进的数据分析功能和增强的多模态图像理解。新功能包括用于自然对话的 Gemini Live、先进的规划工具和可定制的 Gems。更新还集成了更多 Google …

基于STM32开发的智能农业监控系统

目录 引言环境准备智能农业监控系统基础代码实现&#xff1a;实现智能农业监控系统 4.1 土壤湿度传感器数据读取4.2 温湿度传感器数据读取4.3 水泵与风扇控制4.4 用户界面与数据可视化应用场景&#xff1a;农业环境监测与管理问题解决方案与优化收尾与总结 1. 引言 随着智能…

SkyWalking之P0核心业务场景输出调用链路应用

延伸扩展&#xff1a;XX核心业务场景 路由标签打标、传播、检索 链路标签染色与传播 SW: SkyWalking的简写 用户请求携带HTTP头信息X-sw8-correlation “X-sw8-correlation: key1value1,key2value2,key3value3” 网关侧读取解析HTTP头信息X-sw8-correlation&#xff0c;然后通过…

Navicat导入json文件(json文件数据导入到MySQL表中)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

在Modelarts上微调量化Llama3,并用docker部署

本文概述 本文先使用llama-factory去微调llama3大模型&#xff0c;然后使用llama.cpp去量化模型并打包为docker部署到服务器上让qq机器人能够调用服务&#xff0c;实现qq群内问答。 效果展示 环境准备 本文使用华为云的Modelarts的notebook去进行的模型微调 ubuntu20.04&#x…

揭开FFT时域加窗的奥秘

FFT – Spectral Leakage 假设用于ADC输出数据分析的采样点数为N&#xff0c;而采样率为Fs&#xff0c;那我们就知道&#xff0c;这种情况下的FFT频谱分辨率为δf&#xff0c;那么δfFs/N。如果此时我们给ADC输入一个待测量的单频Fin&#xff0c;如果此时Fin除以δf不是整数&a…

IP地址冲突检测(Address Conflict Detect)记录

学习目标&#xff1a; 提示&#xff1a;ACD(IP地址冲突检测)原理学习与抓包分析 学习记录&#xff1a; 1、Address Conflict Detection地址冲突检测&#xff0c;简称ACD。RFC 5227提出ACD机制。其中ACD将arp request分为ARP probe和ARP announcement两种&#xff1b; ACD定义…

数据中心网络运维探讨

数据中心网络运维探讨 数据中心网络运维通过科学的网络架构设计、实时监控管理、智能化运维工具和全面的安全防护&#xff0c;确保网络的高效、安全运行。它不仅提升了运维效率和网络可靠性&#xff0c;还保障了业务的连续性和数据安全。随着技术的不断进步&#xff0c;智能化…

常见机器学习的原理及优略势

有监督 一、线性回归&#xff08;Linear Regression) 1. 算法原理 线性回归&#xff08;Linear Regression&#xff09;是一种基本的回归算法&#xff0c;它通过拟合一个线性模型来预测连续型目标变量。线性回归模型的基本形式是&#xff1a;y w1 * x1 w2 * x2 … wn * …

在python中关于元组的操作

创建元组 如上图所示&#xff0c;a&#xff08;&#xff09;和b tuple(),,这两种方式都可以创建出元组。 在创建元组的时候&#xff0c;指定初始值 如上图所示&#xff0c;也可以在创建元组的时候&#xff0c;指定初始值。 同列表一样元组中的元素也可以是任意类型的。 同列…

Map深度学习

Map Map是一个键值对的集合&#xff0c;和object类似&#xff0c;Map作为构造函数&#xff0c;可以通过全局对象获取到。需要通过new操作创建实例对象&#xff0c;直接调用会报错。Map构造函数接受一个iterable类型的函数&#xff0c;用来初始化Map。 var m new Map([[1, &qu…

pyrouge(ROUGE-1.5.5)的安装步骤和使用说明(适用于Linux 系统)

摘要&#xff1a;本文讲解了如何配置和使用文本摘要的评价指标ROUGE(linux 系统)。 ✅ NLP 研 1 选手的学习笔记 简介&#xff1a;小王&#xff0c;NPU&#xff0c;2023级&#xff0c;计算机技术 研究方向&#xff1a;摘要生成、大语言模型生成 文章目录 一、为啥要写这篇博客&…

Redis 双写一致原理篇

前言 我们都知道,redis一般的作用是顶在mysql前面做一个"带刀侍卫"的角色,可以缓解mysql的服务压力,但是我们如何保证数据库的数据和redis缓存中的数据的双写一致呢,我们这里先说一遍流程,然后以流程为切入点来谈谈redis和mysql的双写一致性是如何保证的吧 流程 首先…