kafka和Flume的整合

news2025/1/22 19:59:49

目录

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

 3、测试

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 2、测试


 

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)

# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf

在kafka-memory-logger.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sources.r1.kafka.topics = bigdata

a1.sources.r1.kafka.consumer.group.id = text7

a1.sources.r1.batchSize = 100

a1.sources.r1.batchDurationMillis = 2000

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sinks.k1.maxBytesToLog = 128

 3、测试

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

  • 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata

  • 启动flume,接收消息 
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

 

 

 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 在flume-kafka-sink.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata01

a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = bigdata

a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 2、测试

启动:

flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console

 使用telnet命令,向端口发送消息:

yum -y install telnet

telnet bigdata01 44444

 

 在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242482.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

打造专业问答社区:Windows部署Apache Answer结合cpolar实现公网访问

文章目录 前言1. 本地安装Docker2. 本地部署Apache Answer2.1 设置语言选择简体中文2.2 配置数据库2.3 创建配置文件2.4 填写基本信息 3. 如何使用Apache Answer3.1 后台管理3.2 提问与回答3.3 查看主页回答情况 4. 公网远程访问本地 Apache Answer4.1 内网穿透工具安装4.2 创建…

MySQL 8.4.3 Windows绿色安装与主从配置

下载 下载安装包链接: https://dev.mysql.com/downloads/mysql/ 安装配置 假设解压后的目录为C:\mysql-8.4.3-winx64 将C:\mysql-8.4.3-winx64\bin 加入path环境变量在C:\mysql-8.4.3-winx64中创建data文件夹在C:\mysql-8.4.3-winx64中创建my.ini文件 [mysqld]…

文件的简单操作

路径&#xff1a; 代码&#xff1a; main.c #include <stdio.h> #include <stdlib.h> #include <errno.h>int main() {/** 打开文件* FILE *fopen(const char *pathname, const char *mode);*///以追加的方式打开文件FILE* fp fopen("a.txt", &…

【网络】什么是交换机?switch

交换机&#xff08;Switch&#xff09;意为“开关”&#xff0c;是一种用于电&#xff08;光&#xff09;信号转发的网络设备。以下是关于交换机的详细解释&#xff1a; 一、交换机的基本定义 功能&#xff1a;交换机能为接入交换机的任意两个网络节点提供独享的电信号通路&am…

1 图的搜索 奇偶剪枝

图论——图的搜索_Alex_McAvoy的博客-CSDN博客 语雀版本 1 深度优先搜索DFS 1. 从图中某个顶点 v0 出发&#xff0c;首先访问 v0 2. 访问结点 v0 的第一个邻接点&#xff0c;以这个邻接点 vt 作为一个新节点&#xff0c;访问 vt 所有邻接点&#xff0c;直到以 vt 出发的所有节…

【Linux庖丁解牛】—Linux基本指令(下)!

目录 1、grep指令 2、zip/unzip指令 3、sz/rz指令 4、tar指令 ​编辑 5、scp指令 6、bc指令 7、uname –r指令 8、重要的几个热键 9、关机 10、完结撒花 1、grep指令 grep是文本过滤器&#xff0c;其作用是在指定的文件中过滤出包含你指定字符串的内容&#xff0c;…

Oracle19C AWR报告分析之Top 10 Foreground Events by Total Wait Time

Oracle19C AWR报告分析之Top 10 Foreground Events by Total Wait Time 一、分析数据二、详细分析2.1 Top 10 Foreground Events by Total Wait Time各项指标及其解释2.2 分析和总结 一、分析数据 二、详细分析 2.1 Top 10 Foreground Events by Total Wait Time各项指标及其解…

Android Framework AMS(14)ContentProvider分析-1(CP组件应用及开机启动注册流程解读)

该系列文章总纲链接&#xff1a;专题总纲目录 Android Framework 总纲 本章关键点总结 & 说明&#xff1a; 说明&#xff1a;本章节主要解读ContentProvider组件的基本知识。关注思维导图中左上侧部分即可。 有了前面activity组件分析、service组件分析、广播组件分析的基…

计算机视觉 1-8章 (硕士)

文章目录 零、前言1.先行课程&#xff1a;python、深度学习、数字图像处理2.查文献3.环境安装 第一章&#xff1a;概论1.计算机视觉的概念2.机器学习 第二章&#xff1a;图像处理相关基础1.图像的概念2.图像处理3.滤波器4.卷积神经网络CNN5.图像的多层表示&#xff1a;图像金字…

Vue基础(1)_模板语法、数据绑定

模板语法 Vue模板语法有2大类&#xff1a; 1、插值语法&#xff1b; 功能&#xff1a;用于解析标签体内内容。 写法&#xff1a;{{xxx}}&#xff0c;xxx是js表达式&#xff0c;且可以直接读取到data中的所有属性。 2、指令语法&#xff1a; 功能&#xff1a;用于解析标签(包括…

《生成式 AI》课程 第3講 CODE TASK 任务2:角色扮演的机器人

课程 《生成式 AI》课程 第3講&#xff1a;訓練不了人工智慧嗎&#xff1f;你可以訓練你自己-CSDN博客 我们希望你设计一个机器人服务&#xff0c;你可以用LM玩角色扮演游戏。 与LM进行多轮对话 提示:告诉聊天机器人扮演任意角色。 后续输入:与聊天机器人交互。 Part 2: Role…

【软件工程】一篇入门UML建模图(类图)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;软件开发必练内功_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前…

展会邀约|加速科技与您相约IC China 2024!

第二十一届中国国际半导体博览会&#xff08; IC China 2024&#xff09;将于 2024 年11月18日—11月20日在北京国家会议中心举行。加速科技将携高性能测试机ST2500EX、ST2500E、eATE及全系测试解决方案亮相E2馆B150展位。博览会期间&#xff0c;将同期举办"半导体产业前沿…

用python中的tkinter包实现进度条

python中的tkinter包是一种常见的设计程序的GUI界面用的包。本文主要介绍这里面的一个组件&#xff1a;进度条&#xff08;Progressbar&#xff09;。Tkinter Progressbar里面对进度条组件已经做了一定的介绍&#xff0c;但比较抽象。本文以另一种方式介绍这个组件及其常用用法…

蓝桥杯每日真题 - 第15天

题目&#xff1a;&#xff08;钟表&#xff09; 题目描述&#xff08;13届 C&C B组B题&#xff09; 解题思路&#xff1a; 理解钟表指针的运动&#xff1a; 秒针每分钟转一圈&#xff0c;即每秒转6度。 分针每小时转一圈&#xff0c;即每分钟转6度。 时针每12小时转一圈…

rust高级特征

文章目录 不安全的rust解引用裸指针裸指针与引用和智能指针的区别裸指针使用解引用运算符 *&#xff0c;这需要一个 unsafe 块调用不安全函数或方法在不安全的代码之上构建一个安全的抽象层 使用 extern 函数调用外部代码rust调用C语言函数rust接口被C语言程序调用 访问或修改可…

ArcGIS Pro属性表乱码与字段名3个汉字解决方案大总结

01 背景 我们之前在使用ArcGIS出现导出Excel中文乱码及shp添加字段3个字被截断的情况&#xff0c;我们有以下应对策略&#xff1a; 推荐阅读&#xff1a;ArcGIS导出Excel中文乱码及shp添加字段3个字被截断&#xff1f; 那如果我们使用ArGIS Pro出现上述问题&#xff0c;该如何…

GOLANG+VUE后台管理系统

1.截图 2.后端工程截图 3.前端工程截图

STM32设计防丢防摔智能行李箱

目录 目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 1.电路图采用Altium Designer进行设计&#xff1a; 2.实物展示图片 三、程序源代码设计 四、获取资料内容 前言 随着科技的不断发展&#xff0c;嵌入式系统、物联网技术、智能设备…

论文阅读 - Causally Regularized Learning with Agnostic Data Selection

代码链接&#xff1a; GitHub - HMTTT/CRLR: CRLR尝试实现 https://arxiv.org/pdf/1708.06656v2 目录 摘要 INTRODUCTION 2 RELATED WORK 3 CAUSALLY REGULARIZED LOGISTIC REGRESSION 3.1 Problem Formulation 3.2 Confounder Balancing 3.3 Causally Regularized Lo…