(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

news2024/11/28 10:56:31

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1008476.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

18. 线性代数 - 线性变换

文章目录 线性空间线性变换线性变换的几何意义特征值与特征向量NumPy的矩阵操作Hi, 你好。我是茶桁。 经历了几节线性代数课程之后,终于咱们到了最后一节课了。本节课的内容说多不多,说少也不少。 我们先是要理解一下线性空间和线性变换,并且探讨一下线性变换的几何意义。…

Mapbox加载arcgis的底图

成果图 这种底图基本上都是按照raster来加载的,主要就是知道地址了,拼参数 具体参数请参考官网 https://developers.arcgis.com/rest/services-reference/enterprise/export-map.htm 源码 我的服务列表是这样的 http://XXXX:XXXX/arcgis/rest/services/…

Rsync远程同步inotify监控

Rsync 简介 rsync(Remote Sync,远程同步) 是一个开源的快速备份工具,可以在不同主机之间镜像同步整个目录树,支持增量备份,并保持链接和权限 在远程同步任务中,负责发起rsync同步操作的客户机…

【Docker】Docker简介

Docker简介 📋导航 1. Docker简介1.1 什么是Docker?1.2 什么是容器?1.3 容器的优势?1.4 Docker的优势?1.5 虚拟技术与容器技术Docker的区别?1.6 为什么学习Docker? 2. 安装Docker3. Docker架构4. Docker命…

【算法训练-栈 一】【结构特性】有效的括号

废话不多说,喊一句号子鼓励自己:程序员永不失业,程序员走向架构!本篇Blog的主题是【栈的使用】,使用【栈】这个基本的数据结构来实现,这个高频题的站点是:CodeTop,筛选条件为&#x…

影刀RPA解决WPS不存在的问题

问题阐述 明明电脑上已经安装了WPS,但影刀程序还是提示没有安装的问题 解决办法 1.打开WPS并关闭所有其他网页 2. 配置与修复 3.开始修复 出现这个框,就要关闭WPS,否则无法执行,关闭WPS不影响其修复 4.等待修复完成即可

《打造高可用PostgreSQL:策略与工具》

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🐅🐾猫头虎建议程序员必备技术栈一览表📖: 🛠️ 全栈技术 Full Stack: &#x1f4da…

性能测试、负载测试、压力测试、稳定性测试简单区分

是一个总称,可细分为性能测试、负载测试、压力测试、稳定性测试。 性能测试 以系统设计初期规划的性能指标为预期目标,对系统不断施加压力,验证系统在资源可接受范围内,是否能达到性能瓶颈。 关键词提取理解 有性能指标&#…

【SpringMVC】JSON数据传输与异常处理的使用

文章目录 一、Jackson1.1 Jackson是什么1.2 常用注解1.3 实例1.3.1导入依赖1.3.2 配置spring-mvc.xml1.3.3 JsonController.java 二、Spring MVC异常处理机制2.1 使用原因2.2 SpringMVC异常处理2.2.1 异常处理机制流程图2.2.2 异常处理的三种方式 一、Jackson 1.1 Jackson是什…

Spring Boot配置文件(YAML Properties)总结

文章目录 配置文件的作用YAML配置文件Properties配置文件配置文件的加载顺序激活不同的配置文件配置文件的占位符自定义配置属性加密敏感信息配置文件的最佳实践结论 🎉欢迎来到架构设计专栏~Spring Boot配置文件(YAML & Properties)总结…

解决Java应用程序中的SQLException:Access denied for user ‘root‘@‘localhost‘ 错误

目录 问题背景 解决方案 如何重置 MySQL root 密码: 问题背景 java.sql.SQLException: Access denied for user rootlocalhost (using password: YES) at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) at com.mysql.cj.jdbc.ex…

树控件、下拉框、文本框常用测试用例

01 控件的测试外观操作 1)项目中的所有树是否风格一致 2)树结构的默认状态是怎样的。比如默认树是否是展开,是展开几级? 是否有默认的焦点?默认值是什么?展开的节点图标和颜色? 3&#xff09…

springboot整合actuator、admin对应用程序进行监控

Spring Boot Actuator 是 Spring Boot 的一个子项目,可以对 Spring Boot 应用程序进行监控和管理,并对外提供了大量的端点,可以选择使用 HTTP 端点或 JMX 来管理和监控应用程序。 这篇文章主要介绍我们的应用程序中怎么加入actuator来对应用进…

算法课程入门

1、算法这门课,主要讲这三件事: 状态空间 最优可行解问题。 确定与非确定。 状态空间的思维方式要掌握住,要能使用状态空间解决新问题。 2、课程安排: 前几本也要看,但是最后一本强烈推荐。 3、经验之谈: …

SpringMVC 的三种异常处理方式详解

目录 1. 什么是异常 2. 为什么要全局异常处理 3. SpringMVC异常分类 4. 异常处理思路 5. 三种异常处理方式示例 ① 配置 SimpleMappingExceptionResolver 处理器 ② 实现 HandlerExceptionResolver 接口 ③ 使用ControllerAdviceExceptionHandler实现全局异常 6. 响应…

TypeScript泛型和类型体操

🎬 岸边的风:个人主页 🔥 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想,就是为了理想的生活 ! 目录 泛型(Generics) 1. 泛型函数 2. 泛型接口 3. 泛型类 类型体操(Type Gymnast…

leetcode 232 用栈实现队列

请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty): 实现 MyQueue 类: void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开头…

Python实现天文计算

迷途小书童 读完需要 2分钟 速读仅需 1 分钟 1 简介 ephem 模块为 Python 提供了精确的天文计算能力,可以预测星球、卫星的轨道信息,计算日出日落、经星时间等数据,它的算法准确可靠。最初由 Brandon Craig Rhodes 在 20 世纪 90 年代开发&am…

数字IC验证23912--寄存器模型

文章目录 寄存器模型的集成总线UVC的实现总线UVC的示例Adapter的实现Adapter的集成 访问方式前门访问后门访问 寄存器模型的集成 总线UVC的实现 MCDF访问寄存器的总线接口时序较为简单。控制寄存器接口首先需要在每一个时钟解析cmd。当cmd为写指令时,即需要把数据c…

Linux安装mysql8.0.34(图文详细教程2023)

安装mysql数据库目录2023-09-13更新 1. 下载mysql数据库2. 安装3. mysql启动4. 进入数据库修改密码 以下是root用户操作, 非root用户,命令前请添加sudo 1. 下载mysql数据库 下载地址: https://dev.mysql.com/downloads/mysql/ 获取下载链接&…