Kafka命令行的使用/Spark-Streaming核心编程(二)

news2025/4/25 19:40:27

Kafka命令行的使用

创建topic

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3

分区数量,副本数量,都是必须的。

数据的形式:

主题名称-分区编号。

在Kafka的数据目录下查看。

设定副本数量,不能大于broker的数量。

2.2查看所有的topic

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

2.3查看某个topic的详细信息

kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

ISR: In-Sync Replicas   可以提供服务的副本。

AR = ISR + OSR

2.4删除topic

kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

2.5生产数据

kafka-console-producer.sh:

指定broker

指定topic

写数据的命令:

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test1

Spark-Streaming核心编程(二)

  1. 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
  2. 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 编写代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object DirectAPI {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 定义 Kafka 相关参数
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    // 通过读取 Kafka 数据,创建 DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara)
    )

    // 提取出数据中的 value 部分
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

    // wordCount 计算逻辑
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  1. 开启Kafka集群

  1. 开启Kafka生产者,产生数据

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

  1. 运行程序,接收Kafka生产的数据并进行相应处理

8)查看消费进度

kafka-consumer-groups.sh --describe --bootstrap-server node01:9092,node02:9092,node03:9092 --group kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2342657.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2020-06-23 暑期学习日更计划(机器学习入门之路(资源汇总)+概率论)

机器学习入门 前言 说实话&#xff0c;机器学习想学好真心不易&#xff0c;很多时候都感觉自己学得云里雾里。以前一段时间自己为了完成毕业设计&#xff0c;在机器学习的理论部分并没有深究&#xff0c;仅仅通过TensorFlow框架力求快速实现模型。现在来看&#xff0c;很多时候…

SQL 时间转换的CONVERT()函数应用说明

目录 1.常用查询使用的几个 2.其他总结 1.常用查询使用的几个 SELECT CONVERT(VARCHAR, GETDATE(), 112) SELECT CONVERT(VARCHAR, GETDATE(), 113)SELECT CONVERT(VARCHAR, GETDATE()-1, 112) SELECT CONVERT(VARCHAR, GETDATE()-1, 113) 2.其他总结 SELECT CONVERT(VARCHA…

SystemWeaver详解:从入门到精通的深度实战指南

SystemWeaver详解&#xff1a;从入门到精通的深度实战指南 文章目录 SystemWeaver详解&#xff1a;从入门到精通的深度实战指南一、SystemWeaver环境搭建与基础配置1.1 多平台安装全流程 二、新手必学的十大核心操作2.1 项目创建全流程2.2 建模工具箱深度解析 三、需求工程与系…

windows中kafka4.0集群搭建

参考文献 Apache Kafka windows启动kafka4.0&#xff08;不再需要zookeeper&#xff09;_kafka压缩包-CSDN博客 Kafka 4.0 KRaft集群部署_kafka4.0集群部署-CSDN博客 正文 注意jdk需要17版本以上的 修改D:\software\kafka_2.13-4.0.0\node1\config\server.properties配置文…

【JavaWeb后端开发04】java操作数据库(JDBC + Mybatis+ yml格式)详解

文章目录 1. 前言2. JDBC2.1 介绍2.2 入门程序2.2.1 DataGrip2.2.2 在IDEA执行sql语句 2.3 查询数据案例2.3.1 需求2.3.2 准备工作2.3.3 AI代码实现2.3.4 代码剖析2.3.4.1 ResultSet2.3.4.2 预编译SQL2.3.4.2.1 SQL注入2.3.4.2.2 SQL注入解决2.3.4.2.3 性能更高 2.4 增删改数据…

postman 删除注销账号

一、删除账号 1.右上角找到 头像&#xff0c;view profile https://123456-6586950.postman.co/settings/me/account 二、找回账号 1.查看日志所在位置 三、postman更新后只剩下history 在 Postman 中&#xff0c;如果你发现更新后只剩下 History&#xff08;历史记录&…

Java发展史及版本详细说明

Java发展史及版本详细说明 1. Java 1.0&#xff08;1996年1月23日&#xff09; 核心功能&#xff1a; 首个正式版本&#xff0c;支持面向对象编程、垃圾回收、网络编程。包含基础类库&#xff08;java.lang、java.io、java.awt&#xff09;。支持Applet&#xff08;浏览器嵌入…

React 5 种组件提取思路与实践

在开发时,经常遇到一些高度重复但略有差异的 UI 模式,此时我们当然会把组件提取出去,但是组件提取的方式有很多,怎么根据不同场景选取合适的方式呢?尤其时在复杂的业务场景中,组件提取的思路影响着着代码的可维护性、可读性以及扩展性。本文将以一个[详情]组件为例,探讨…

[java八股文][Java基础面试篇]I/O

Java怎么实现网络IO高并发编程&#xff1f; 可以用 Java NIO &#xff0c;是一种同步非阻塞的I/O模型&#xff0c;也是I/O多路复用的基础。 传统的BIO里面socket.read()&#xff0c;如果TCP RecvBuffer里没有数据&#xff0c;函数会一直阻塞&#xff0c;直到收到数据&#xf…

数据结构-冒泡排序(Python)

目录 冒泡排序算法思想 冒泡排序算法步骤 冒泡排序代码实现 冒泡排序算法分析 冒泡排序算法思想 冒泡排序&#xff08;Bubble Sort&#xff09;基本思想&#xff1a; 经过多次迭代&#xff0c;通过相邻元素之间的比较与交换&#xff0c;使值较小的元素逐步从后面移到前面…

深入理解React高阶组件(HOC):原理、实现与应用实践

组件复用的艺术 在React应用开发中&#xff0c;随着项目规模的增长&#xff0c;组件逻辑的复用变得越来越重要。传统的组件复用方式如组件组合和props传递在某些复杂场景下显得力不从心。高阶组件&#xff08;Higher-Order Component&#xff0c;简称HOC&#xff09;作为React中…

Neo4j社区版在win下安装教程(非docker环境)

要在 Windows 10 上安装 Neo4j 社区版数据库并且不使用 Docker Desktop&#xff0c;你可以按照以下步骤操作&#xff1a; 1. 安装 Java Development Kit (JDK) Neo4j 需要 Java 运行环境。推荐安装 JDK 17 或 JDK 11&#xff08;请根据你下载的 Neo4j 版本查看具体的兼容性要…

如何在 Odoo 18 中配置自动化动作

如何在 Odoo 18 中配置自动化动作 Odoo是一款多功能的业务管理平台&#xff0c;旨在帮助各种规模的企业更高效地处理日常运营。凭借其涵盖销售、库存、客户关系管理&#xff08;CRM&#xff09;、会计和人力资源等领域的多样化模块&#xff0c;Odoo 简化了业务流程&#xff0c…

node.js 实战——(Http 知识点学习)

HTTP 又称为超文本传输协议 是一种基于TCP/IP的应用层通信协议&#xff1b;这个协议详细规定了 浏览器 和万维网 服务器 之间互相通信的规则。协议中主要规定了两个方面的内容&#xff1a; 客户端&#xff1a;用来向服务器发送数据&#xff0c;可以被称之为请求报文服务端&am…

新市场环境下新能源汽车电流传感技术发展前瞻

新能源革命重构产业格局 在全球碳中和战略驱动下&#xff0c;新能源汽车产业正经历结构性变革。国际清洁交通委员会&#xff08;ICCT&#xff09;最新报告显示&#xff0c;2023年全球新能源汽车渗透率突破18%&#xff0c;中国市场以42%的市占率持续领跑。这种产业变革正沿着&q…

fastjson使用parseObject转换成JSONObject出现将字符特殊字符解析解决

现象&#xff1a;将字符串的${TARGET_VALUE}转换成NULL字符串了问题代码&#xff1a; import com.alibaba.fastjson.JSON;JSONObject config JSON.parseObject(o.toString()); 解决方法&#xff1a; 1.更换fastjson版本 import com.alibaba.fastjson2.JSON;或者使用其他JS…

【安装neo4j-5.26.5社区版 完整过程】

1. 安装java 下载 JDK21-windows官网地址 配置环境变量 在底下的系统变量中新建系统变量&#xff0c;变量名为JAVA_HOME21&#xff0c;变量值为JDK文件夹路径&#xff0c;默认为&#xff1a; C:\Program Files\Java\jdk-21然后在用户变量的Path中&#xff0c;添加下面两个&am…

机器人项目管理新风口:如何高效推动智能机器人研发?

在2025年政府工作报告中&#xff0c;“智能机器人”首次被正式纳入国家发展战略关键词。从蛇年春晚的秧歌舞机器人惊艳亮相&#xff0c;到全球首个人形机器人马拉松的热议&#xff0c;智能机器人不仅成为科技前沿的焦点&#xff0c;也为产业升级注入了新动能。而在热潮背后&…

【Linux】网络基础和socket(4)

1.网络通信&#xff08;app\浏览器、小程序&#xff09; 2.网络通信三要素&#xff1a; IP&#xff1a;计算机在网络上唯一标识&#xff08;ipv4:4个字段&#xff0c;每段最大255 IPV6:16进制&#xff09; 端口&#xff1a;计算机应用或服务唯一标识 ssh提供远程安全连接…

大数据可能出现的bug之flume

一、vi /software/flume/conf/dir_to_logger.conf配置文件 问题的关键: Dir的D写成了小写 另一个终端里面的东西一直在监听状态下无法显示 原来是vi /software/flume/conf/dir_to_logger.conf里面的配置文件写错了 所以说不是没有source参数的第三行的原因 跟这个没关系 …