Spark Stream操作Kafka总结

news2025/1/11 5:57:56

kafka集群搭建

搭建参考

https://www.toutiao.com/article/6496743889053942286/?log_from=d5d6394cf75d_1687599146327

 zk下载位置

国内:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

国外:Apache ZooKeeper 

 kafka位置

国内:Index of /apache/kafka

国外:Apache Kafka

spark集群搭建

 搭建参考

https://www.toutiao.com/article/6690323260132819468/?log_from=b6bc7fd413909_1687599567164

Spark之三大集群模式—详解(3)-腾讯云开发者社区-腾讯云

下载位置

国内最新版本: Index of /apache/spark

国内历史版本:Index of /dist/spark

注意:历史版本下载位置如下

链接:Index of /apache

 

 standalone集群模式搭建

集群架构

排除防火墙干扰

注意先关闭防火墙(测试环境)

systemctl stop firewalld.service
systemctl disable firewalld.service

集群规划

node01:master
node02:slave/worker  

/etc/hosts

192.168.10.159 node02
192.168.10.153 node01

配置免密登录

需要配置master到slave节点的免密登录

node01中:

ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
scp ~/.ssh/id_dsa.pub root@node02:/opt

node02中:

cat /opt/id_dsa.pub >> ~/.ssh/authorized_keys

安装java环境

如果选择在线安装可以使用

yum -y install java-1.8.0-openjdk*

修改配置

cd /root/spark-3.4.0-bin-hadoop3-scala2.13/conf

mv spark-env.sh.template spark-env.sh

vi spark-env.sh

#配置java环境变量(如果之前配置过了就不需要动了)
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077

 vi  slaves 

node02

配置spark环境变量

/etc/profile

export SPARK_HOME=/root/spark-3.4.0-bin-hadoop3-scala2.13
export PATH=$PATH:$SPARK_HOME/bin

配置分发

scp -r /root/spark-3.4.0-bin-hadoop3-scala2.13 node02:/root/

启动和停止

cd /root/spark-3.4.0-bin-hadoop3-scala2.13/sbin/

集群启动和停止

 在主节点上启动spark集群
./start-all.sh 
在主节点上停止spark集群
./stop-all.sh

单独启动和停止

在 master 安装节点上启动和停止 master:
start-master.sh
stop-master.sh
在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)

start-slaves.sh
stop-slaves.sh

集群检查

web界面查看

正常启动spark集群后,查看spark的web界面,查看相关信息。

注意也有可能是8081端口。

http://node01:8080/

 

jps查看

在master节点上jps可以看到Master进程,在slave节点上jps可以看到Worker进程,可以初步判定集群是否启动成功。

使用样例

官方首选推荐使用scala,也可以使用更加熟悉的java或者python

scala环境准备

下载安装scala

IDEA新建Scala+Maven工程步骤指南(2020.09.21)_idea创建scala maven项目_jing_zhong的博客-CSDN博客

maven依赖

注意:spark相关的依赖打包时候可以不用打入jar包

 <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.4.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20210307</version>
        </dependency>

    </dependencies>

scala读取kafka数据并打印结果 

KafaDemo 

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafaDemo")
    //刷新时间设置为1秒
    val ssc = new StreamingContext(conf, Seconds(1))

    //消费者配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.10.153:9092", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group", //消费者组名
      "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("top1") //消费主题,可以同时消费多个
    //创建DStream,返回接收到的输入数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    //打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
    stream.foreachRDD(f => {
      if (f.count > 0)
        f.foreach(f => println(f.value()))
    })
    ssc.start();
    ssc.awaitTermination();
  }
}

scala解析json并保存数据

scala读取kafka的json数据,处理以后添加字段,保存数据入一个新的topic

KafkaStreamingExample 

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import org.json4s._

object KafkaStreamingExample {
  implicit val formats: Formats = DefaultFormats
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象
    val sparkConf = new SparkConf().setAppName("KafkaStreamingExample").setMaster("local[*]")

    // 创建StreamingContext对象
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置Kafka参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.10.153:9092",  // Kafka broker地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-consumer-group",  // 消费者组ID
      "auto.offset.reset" -> "latest",  // 从最新的offset开始读取数据
      "enable.auto.commit" -> (false: java.lang.Boolean)  // 关闭自动提交offset
    )

    // 设置要读取的Kafka topic
    val topics = Array("input_topic")

    // 创建DStream,从Kafka读取数据
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // 解析每条数据的value值为JSON对象
    val parsedStream = kafkaStream.map(record => {
      implicit val formats = DefaultFormats
      parse(record.value)
    })

    // 过滤出age>20的数据
    val filteredStream = parsedStream.filter(json => (json \ "age").extract[Int] > 20)

    // 增加性别字段
    val genderStream = filteredStream.map(json => {
      val name = (json \ "name").extract[String]
      val gender = if (name.endsWith("a")) "female" else "male"
      json.asInstanceOf[JObject] ~ ("gender" -> JString(gender))
    })

    // 将新数据写入另一个Kafka topic中
    genderStream.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val producer = createKafkaProducer()  // 创建Kafka生产者

        iter.foreach(json => {
          val value = compact(render(json))
          val record = new org.apache.kafka.clients.producer.ProducerRecord[String, String]("output_topic", value)
          producer.send(record)  // 发送数据到Kafka
        })

        producer.close()  // 关闭Kafka生产者
      })
    })

    // 启动StreamingContext
    ssc.start()
    ssc.awaitTermination()
  }

  def createKafkaProducer(): org.apache.kafka.clients.producer.KafkaProducer[String, String] = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "192.168.10.153:9092")  // Kafka broker地址
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new org.apache.kafka.clients.producer.KafkaProducer[String, String](props)
  }
}

java解析json并打印结果

java读取kafka数据,处理以后添加字段打印结果

 KafkaStreamingExample4 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.json.JSONObject;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class KafkaStreamingExample4 {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
//                .set("spark.streaming.logLevel","ERROR")
                .setAppName("Kafka Streaming Example")
                .setMaster("local[*]");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.10.153:9092");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "spark-streaming");
        kafkaParams.put("auto.offset.reset", "latest");

        String topic = "input_topic";
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Collections.singleton(topic), kafkaParams)
        );

        stream.map(record -> record.value())
                .map(val->{
                    JSONObject json = new JSONObject(val);
                    json.put("tag","tagxxxxx");
                    return json.toString();
                })
                .print();

        jssc.start();
        jssc.awaitTermination();
    }
}

java解析json并读取结果

java读取kafka的json数据,处理以后添加字段,保存数据入一个新的topic

KafkaSparkStreamingExample3 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.*;

public class KafkaSparkStreamingExample3 {
    public static void main(String[] args) throws InterruptedException {
        // 设置 Spark Streaming 应用的名称和 master
        SparkConf conf = new SparkConf().setAppName("KafkaSparkStreamingExample").setMaster("local[*]");
        // 创建 Spark Streaming 上下文对象
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        // 设置 Kafka 相关参数
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.10.153:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-streaming");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // 设置要从 Kafka 读取的 topic
        Collection<String> topics = Arrays.asList("input_topic");

        // 创建 Kafka direct stream
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaParams));

        // 解析 Kafka 中的数据
        stream.foreachRDD(rdd -> {
//            OffsetRange[] offsetRanges = ((CanCommitOffsets) rdd.rdd()).offsetRanges();

            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            rdd.foreachPartition(records -> {
                while (records.hasNext()) {
                    ConsumerRecord<String, String> record = records.next();
                    String data = record.value();
                    System.out.println("Received data: " + data);

                    // 解析 JSON 数据
                    try {
                        JSONObject json = new JSONObject(data);
                        String name = json.getString("name");
                        int age = json.getInt("age");

                        // 过滤年龄大于20的数据
                        if (age > 20) {
                            String gender = getGender(name);
                            json.put("gender", gender);

                            // 将结果写入新的 Kafka topic
                            writeToKafka(json.toString(), "output_topic");
                        }
                    } catch (JSONException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 提交消费的offset
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
        });

        // 启动 Spark Streaming 应用程序
        streamingContext.start();

        // 等待程序运行结束
        streamingContext.awaitTermination();
    }

    private static void writeToKafka(String data, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.153:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, data);
        producer.send(record);
        producer.flush();
        System.out.println("Data written to Kafka: " + data);
    }

    private static String getGender(String name) {
        // 根据名字判断性别
        // 在此处添加你的逻辑判断代码
        return "unknown";
    }
}

验证结果

./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic input_topic

测试数据

{"name":"bob","id":1,"age":23}
{"name":"tom","id":1,"age":21}
{"name":"bob","id":1,"age":5}
{"name":"jack","id":1,"age":45}

查看结果

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output_topic

来源

spark中文文档:http://spark.apachecn.org/#/ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/680238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes(k8s)容器编排Pod介绍和使用

目录 1 Pod 特点1.1 网络1.2 存储 2 使用方式2.1 自主式Pod2.2 控制器管理的Pod 3 自主运行Pod3.1 创建资源清单3.1.1 参数描述 3.2 创建Pod3.3 Pod操作3.3.1 查看Pod列表3.3.2 查看描述信息3.3.3 访问pod3.3.4 删除Pod 4 控制器运行Pod4.1 创建资源清单4.2 参数描述4.2.1 Repl…

【IDEA】Directory创建多级目录的正确写法

在resource下创建包的时候&#xff0c;右键resourcenew的时候并没有Package,只有Directory 我们也可以用Directory创建包&#xff0c;但写法与在Package下创建包的写法会不一样 例如&#xff1a; 在directory创建包 我们在去看文件的时候 如果是用&#xff08; com.dao.m…

【数据结构】树以及堆的讲解

(这里写自定义目录标题) 提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、树的概念&#xff1f;二、树的表示方法三、树的实际应用四、二叉树概念以及结构1.概念2.特殊的二叉树3.二叉树的性质4.二叉树的存储…

指针与数组--动态数组(2)[1、长度可变的一维动态数组 2、长度可变的二维动态数组]

目录 一、长度可变的一维动态数组 二、长度可变的二维动态数组 由上篇文章的理论&#xff0c;接下来使用例题来阐述。 一、长度可变的一维动态数组 例题1、编程输入某班学生的某门课成绩&#xff0c;计算并输出平均值。学生人数由键盘输入。 #include <stdio.h> #i…

Apache服务器

文章目录 Apache服务器Linux安装ApacheApache文件结构Apache主配置文件案例 配置一台Web服务器 启动用户的个人网站虚拟主机的设定基于IP的虚拟主机基于域名的虚拟主机基于端口的虚拟主机 rewrite重写rewrite使用详解使用案例 域名跳转单个域名跳转多个域名跳转 status状态页ap…

“插入排序:小数据量排序的王者“

文章目录 &#x1f50d;什么是插入排序&#xff1f;&#x1f511;插入排序的优缺点&#x1f680;实现插入排序 &#x1f50d;什么是插入排序&#xff1f; 插入排序是一种简单的排序算法&#xff0c;它的基本思想是&#xff1a;将待排序的元素&#xff0c;从第二个元素开始&…

阿里架构师整理的Java经典面试题1220道(附答案)

学习如逆水行舟&#xff0c;尤其是 IT 行业有着日新月异的节奏&#xff0c;我们更要抓紧每一次可以学习和进步的机会。所以&#xff0c;没有撤退可言 即使是面试跳槽&#xff0c;那也是一个学习的过程。只有全面的复习&#xff0c;才能让我们更好的充实自己&#xff0c;武装自…

内网隧道代理技术(五)之 Netcat反弹Shell

Netcat反弹Shell Netcat简称NC,是一个简单、可靠的网络工具,被誉为网络界的瑞士军刀。通NC可以进行端口扫描、反弹Shell、端口监听和文件传输等操作,常用参数如下&#xff1a; 参数作用-c指定连接后要执行的shell命令-e指定连接后要执行的文件名-k配置 Socket一直存活(若不想…

一文了解远程桌面连接

一文了解远程桌面连接 一、引言1.1、远程桌面连接的概述1.2、远程桌面连接的应用场景 二、远程桌面连接的基本原理2.1、远程桌面连接的工作方式2.2、远程桌面连接的安全性 三、远程桌面连接的实现方法3.1、Windows自带的远程桌面连接3.2、第三方远程桌面连接工具 四、远程桌面连…

一阶低通滤波器(CODESYS FC和FB应用介绍)

一阶RC低通滤波器详细算法介绍请参看下面文章链接: PLC信号处理系列之一阶低通(RC)滤波器算法_plc计算滤波频率_RXXW_Dor的博客-CSDN博客1、先看看RC滤波的优缺点 优点:采用数字滤波算法来实现动态的RC滤波,则能很好的克服模拟滤波器的缺点; 1、在模拟常数要求较大的场合这…

数据挖掘——甘肃省县(区)域农业综合实力研究(论文)

《数据挖掘与分析》课程论文 题目&#xff1a;甘肃省县&#xff08;区&#xff09;域农业综合实力研究 xx学院xx专业xx班&#xff1a;xx 2023年6月 甘肃省县&#xff08;区&#xff09;域农业综合实力研究 xx (xx学院 xx学院) 摘要&#xff1a;本文主要研究甘肃省各县&#…

C语言数组指针和指针数组

文章目录 1 数组指针和指针数组的区别2 数组首地址和数组首元素地址的区别参考 1 数组指针和指针数组的区别 对指针数组和数组指针的概念&#xff0c;相信很多C程序员都会混淆。下面通过两个简单的语句来分析一下二者之间的区别&#xff0c;示例代码如下所示&#xff1a; int…

C/C++的发展历程和未来趋势

文章目录 C/C的起源C/C的应用C/C开发的工具C/C未来趋势 C/C的起源 C语言 C语言是一种通用的高级编程语言&#xff0c;由美国计算机科学家Dennis Ritchie在20世纪70年代初期开发出来。起初&#xff0c;C语言是作为操作系统UNIX的开发语言而创建的。C语言的设计目标是提供一种功…

虚拟文件系统的数据结构

文章目录 虚拟文件系统的数据结构超级快挂载描述符文件系统类型索引节点目录项文件的打开实例和打开文件表 虚拟文件系统的数据结构 虽然不同文件系统类型的物理结构不同&#xff0c;但是虚拟文件系统定义了一套统一的数据结构。 &#xff08;1&#xff09;超级块。文件系统的…

【网络2】MII MDC/MDIO

文章目录 1.MII&#xff1a;ISO网络模型中物理层&#xff08;phy&#xff09;和数据链路层&#xff08;mac&#xff09;属于硬件&#xff0c;其余都属于软件kernel2.MDC/MDIO&#xff1a;不仅管phy&#xff0c;只要支持mdio协议都可以管2.1 BMC速率适配&#xff1a;phy和switch…

二层MAC地址介绍

目录 MAC地址介绍 MAC地址的组成 MAC地址分类 MAC地址的作用 二层交换机介绍 MAC地址表的定义 MAC地址表项类型 二层交换机对数据帧的处理动作 MAC地址介绍 MAC地址&#xff08;Media Access Control Address)&#xff0c;直译为媒体存取控制位地址 MAC地址的组成 MA…

【四、基本shell命令】

1 帮助命令 man 获取帮助信息 [root@redis100 a]# man lshelp 获得shell内置命令的帮助信息 [root@redis100 a]# help cd常用快捷键 2 文件目录类 pwd 显示当前工作目录的绝对路径 pwd:print working directory [root@redis100 ~]# pwd /rootls 列出目录的内容 ls: list…

Hyper-V虚拟机安装和使用

目录 什么是虚拟化技术虚拟化技术有以下几个关键概念&#xff1a;虚拟化技术的优点&#xff1a; 什么是Hyper-V虚拟机Hyper-V虚拟机的关键特点和优势&#xff1a;使用Hyper-V虚拟机我们能做什么 安装Hyper-V系统要求启用Hyper-V功能创建虚拟机安装操作系统 最近在研究人工智能A…

仙境传说RO:NPC对话| mes/next/close函数用法详解

仙境传说RO:NPC对话| mes/next/close函数用法详解 大家好&#xff0c;我是艾西&#xff0c;今天跟大家讲解下仙境传说mes/next/close函数&#xff0c;在游戏中所有的NPC对话都是用mes函数来创建的。 我们先打开官方文档的script _commands.txt文件&#xff0c;搜索*mes searc…

mmdetection调用模型训练

mmdetection调用模型训练 文章目录 mmdetection调用模型训练转化数据集格式从labelme到coco首先data导进来改一下coco.py改一下class_names.py在模型跑了之后看生成文件然后掐了包版本设置PYTHONPATHdiffustiondet模型模型训练跑完了检测模型 yolo模型yolof模型 转化数据集格式…