Qt中用thrift验证flume

news2025/1/18 11:50:59

一.flume简介

flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
在flume中分为了3个组件,分别为source,channel和sink。
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据。
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
flume支持的Source、Sink和Channel如下所示
Flume sources
☆Avro Source(常用于agent之间传递消息)
☆Exec Source(tail -f 文件,会重复消费)
☆Spooling Directory Source(文件夹变更)
☆Taildir Source(实时更新一组文件,不会重复消费)
☆Kafka Source
☆HTTP Source
☆Thrift Source
☆JMS Source
☆NetCat TCP Source
☆NetCat UDP Source
☆Sequence Generator Source
☆Custom Source 自定义
Flume Sinks
☆Avro Sink
☆HDFS Sink
☆Hive Sink
☆Logger Sink (测试用)
☆File Roll Sink
☆Thrift Sink
☆IRC Sink
☆Null Sink
☆HBaseSinks
☆ElasticSearchSink
☆Kafka Sink
☆HTTP Sink
☆Custom Sink 自定义
Flume Channels
☆Memory Channel
☆JDBC Channel 当前Flume Channel内置支持Derby
☆Kafka Channel
☆File Channel
☆Spillable Memory Channel (当前试验性的,不建议生产环境使用)
☆Custom Channel 自定义
从这可以看出,Source和Sink都是支持Thrift的,可以理解为flume内置了Thrift服务器和客户端。

二.windows部署flume

下载最新的jdk 19  
安装完成后,将jdk添加到环境变量
①新建系统变量JAVA_HOME,值为D:\Program Files\Java\jdk-19

②将%JAVA_HOME%\bin添加到环境变量Path中
测试一下java是否配置好

下载flume-1.9.0,bin和src都要下载


将bin解压到D盘,然后将D:\flume-1.9.0\conf中的三个template文件复制一份并去掉".template",如下图所示

将jdk 19的安装路径添加到D:\flume-1.9.0\conf\flume-env.sh中,如下图所示

将D:\flume-1.9.0\conf\flume-conf.properties的内容修改为

agent.sources = r1  
agent.sinks = k1  
agent.channels = c1  

# Describe/configure the source  
agent.sources.r1.type = thrift  
agent.sources.r1.port = 9090  
agent.sources.r1.bind = 0.0.0.0  
agent.sources.r1.threads = 10 

# Use a channel which buffers events in file  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10000000  
agent.channels.c1.transactionCapacity= 2000  
  
# Describe the sink k1  
agent.sinks.k1.type = logger  

# Bind the source and sink to the channel  
agent.sources.r1.channels = c1  
agent.sinks.k1.channel = c1

这里将Source设置为thrift,Sink为logger,这样就能接收thrift客户端发送的数据,并在flume控制台中打印出来。这里端口号是9090,那么thrift客户端的端口号也要设置为9090
将flume添加到环境变量
①新建系统变量FLUME_HOME,值为D:\flume-1.9.0

②将%FLUME_HOME%\bin和%FLUME_HOME%\conf添加到环境变量Path中
启动flume

flume-ng agent --conf D:\flume-1.9.0\conf --conf-file D:\flume-1.9.0\conf\flume-conf.properties --name agent -property flume.root.logger=INFO,console

会报如下错误
Test-Path : 路径中具有非法字符。
所在位置 D:\flume-1.9.0\bin\flume-ng.ps1:106 字符: 56
+ ...                               ? { "$_" -ne "" -and (Test-Path $_ )} |
+                                                         ~~~~~~~~~~~~
    + CategoryInfo          : InvalidArgument: (D:\Program File...7.0_80\jre\bin":String) [Test-Path],ArgumentExceptio
    n
    + FullyQualifiedErrorId : ItemExistsArgumentError,Microsoft.PowerShell.Commands.TestPathCommand
具体如下图所示


解决方法是修改D:\flume-1.9.0\bin\flume-ng.ps1文件
将GetHadoopHome、GetHbaseHome和GetHiveHome这三个函数的定义和调用全部注释掉或者删掉,因为系统中未安装这三个软件,肯定找不到
再次运行flume,成功。提示Started Thrift source

三.用thrift验证flume

解压flume src,在src\flume-ng-sdk\src\main\thrift中有一个flume.thrift文件,这个是thrift和flume通信的接口文件,将这个文件稍做修改,去掉枚举中的ERROR,它与windows的宏定义冲突。修改后的flume.thrift如下所示

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

参考上篇博客Qt中调用thrift,执行

thrift -r --gen cpp flume.thrift

将上篇博客中的Thrift_Client稍作修改,如下所示

#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>

#include "ThriftSourceProtocol.h"
#include <iostream>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

class ThriftClient
{
public:
    ThriftClient()
        : socket(new TSocket("127.0.0.1", 9090))
        , transport(new TFramedTransport(socket))
        , protocol(new TCompactProtocol(transport))
    {
        pClient = new ThriftSourceProtocolClient(protocol);
    }

    ~ThriftClient(){}


    void sendEvent()
    {
        std::string body("The first event");
        ThriftFlumeEvent event;
        event.__set_headers(headers);
        event.__set_body(body);
        if (!transport->isOpen())
        {
            transport->open();
        }

        Status::type res = pClient->append(event);
        if (res == Status::OK)
        {
            std::cout<<"Send event: "<<body<<std::endl;
        }
        else
        {
            std::cout<<"Send event via thrift failed"<<std::endl;
        }

        transport->close();
    }

    void sendBatchEvent()
    {
        std::string body2("The second event");
        ThriftFlumeEvent event;
        event.__set_headers(headers);
        event.__set_body(body2);
        events.push_back(event);

        std::string body3("The third event");
        event.__set_headers(headers);
        event.__set_body(body3);
        events.push_back(event);

        if (!transport->isOpen())
        {
            transport->open();
        }

        Status::type res = pClient->appendBatch(events);
        if (res == Status::OK)
        {
            for(auto event:events)
            {
                std::cout<<"Send event: "<<event.body<<std::endl;
            }
        }
        else
        {
            std::cout<<"Send events via thrift failed"<<std::endl;
        }

        transport->close();
    }

private:
    // Thrift protocol needings...
    std::shared_ptr<TSocket> socket;
    std::shared_ptr<TTransport> transport;
    std::shared_ptr<TProtocol> protocol;
    ThriftSourceProtocolClient *pClient;

    std::map<std::string, std::string> headers;
    std::vector<ThriftFlumeEvent> events;
};

int main(int argc, char **argv)
{
    ThriftClient client;
    client.sendEvent();
    client.sendBatchEvent();

    return 0;
}

这里的端口设置为9090。发送event有两种方式,单条发送和批量发送。代码中发送了三条event,flume的控制台也打印三条,如下图所示

现在把flume的Sink也指定为thrift,端口号为9091

agent.sources = r1  
agent.sinks = k1  
agent.channels = c1  

# Describe/configure the source  
agent.sources.r1.type = thrift  
agent.sources.r1.port = 9090  
agent.sources.r1.bind = 0.0.0.0  
agent.sources.r1.threads = 10 

# Use a channel which buffers events in file  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10000000  
agent.channels.c1.transactionCapacity= 2000  

# Describe the sink k1  
agent.sinks.k1.type = thrift
agent.sinks.k1.hostname = 127.0.0.1
agent.sinks.r1.port = 9091  

# Bind the source and sink to the channel  
agent.sources.r1.channels = c1  
agent.sinks.k1.channel = c1
将上篇博客中的Thrift_Server稍作修改,如下所示
#include "ThriftSourceProtocol.h"
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

class ThriftSourceProtocolHandler : virtual public ThriftSourceProtocolIf {
public:
    ThriftSourceProtocolHandler() {
        // Your initialization goes here
    }

    Status::type append(const ThriftFlumeEvent& event) {
        // Your implementation goes here
        std::cout<<event.body<<std::endl;

        return Status::OK;
    }

    Status::type appendBatch(const std::vector<ThriftFlumeEvent> & events) {
        // Your implementation goes here
        for(auto event:events)
        {
            std::cout<<event.body<<std::endl;
        }

        return Status::OK;
    }
};

int main(int argc, char **argv) {
    std::cout<<"Server started!"<<std::endl;
    int port = 9091;
    ::std::shared_ptr<ThriftSourceProtocolHandler> handler(new ThriftSourceProtocolHandler());
    ::std::shared_ptr<TProcessor> processor(new ThriftSourceProtocolProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TCompactProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    server.serve();
    return 0;
}

重新启动flume,然后启动Thrift_Server,最后启动Thrfit_Client,结果如下图所示

至此,验证通过!

原文链接:https://blog.csdn.net/caoshangpa/article/details/128502780

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/130296.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在阿里做了7年软件测试原来是........

你了解软件测试岗吗&#xff1f; 很多人做测试3&#xff0c;5年&#xff0c;甚至年限多长。 但并不懂软件测试岗所要求的技术和能力&#xff0c;只是拘于当前的工作环境和培训班的宣传。 在一个微信里中看到如下的对话&#xff1a; 某人说&#xff0c;工作中开始做自动化了。…

8253练习题(8253端口地址怎么求?怎么求初值?怎么看出工作方式)

目录 一&#xff1a;简单&#xff08;题目把计数初值和工作方式都给你了&#xff09; 二&#xff1a;给了你输入时间周期和初值&#xff0c;你会不会求输出&#xff1f; 三&#xff1a;简单 四&#xff1a;初值计数方式都不给&#xff0c;初值还是给的时间和频率混合 五&a…

前端_swapCache方法 发布文章

swapCache方法 swapCache方法用来手工执行本地缓存的更新&#xff0c;它只能在applicationCache对象的updateReady事件被触发时调用&#xff0c;updateReady事件只有服务器上的manifest文件被更新&#xff0c;并且把manifest文件中所要求的资源文件下载到本地后触发。顾名思义…

[极客大挑战 2019]Secret File(BUUCTF)

前言: 这篇文章还是是为了帮助一些 像我这样的菜鸟 找到简单的题解 题目描述 解题工具: fiddler或burpsuite抓包 解题过程: 又是要找秘密&#xff0c; 先检查一下源代码 发现了一个链接与背景颜色融合了 点进去看看 找到了SECRET但肯定没这么简单 点击SECRET页面发生…

15. 我是怎么用一个特殊 Cookie ,限制住别人的爬虫的

爬虫训练场&#xff0c;第15篇博客。 博客详细清单&#xff0c;参考 https://pachong.vip/blog 本次案例&#xff0c;用定值 Cookie 实现反爬 文章目录Cookie 生成Python Flask 框架生成 CookieFlask make_response 加载模板Flask 判断指定 cookie 是否存在补充知识点Cookie 生…

【AcWing每日一题】4818. 奶牛大学

Farmer John 计划为奶牛们新开办一所大学&#xff01; 有 N 头奶牛可能会入学。 每头奶牛最多愿意支付 ci 的学费。 Farmer John 可以设定所有奶牛入学需要支付的学费。 如果这笔学费大于一头奶牛愿意支付的最高金额&#xff0c;那么这头奶牛就不会入学。 Farmer John 想赚…

C++ New和Delete

目录 前言 New Delete 前言 new是c中用于动态申请空间的运算符&#xff0c;malloc也是用于动态申请空间的&#xff0c;但malloc是函数。 New new是用来开辟一段新空间的&#xff0c;和一般申明不同的是&#xff0c;new开辟的新空间是在堆上&#xff0c;而申明的变量是在栈上…

【自学Java】Java注释

Java注释 Java注释教程 用于注解说明解释程序的文字就是注释&#xff0c;注释可以提高代码的阅读性。同时&#xff0c;注释也是一个程序员必须要具有的良好的编程习惯。我们首先应该将自己的思想通过注释先整理出来&#xff0c;再用代码实现。 在 Java 语言 中&#xff0c;一…

(二)Qt多线程实现海康工业相机图像实时采集

系列文章目录 提示&#xff1a;这里是该系列文章的所有文章的目录 第一章&#xff1a; &#xff08;一&#xff09;QtOpenCV调用海康工业相机SDK示例开发 第二章&#xff1a; &#xff08;二&#xff09;Qt多线程实现海康工业相机图像实时采集 文章目录系列文章目录前言一、项目…

C语言中指针常见问题集

1. 我想声明一个指针并为它分配一些空间,但却不行。这些代码有什么问题&#xff1f; char *p; *p malloc(10);答&#xff1a;你所声明的指针是p, 而不是*p, 当你操作指针本身时你只需要使用指针的名字即可:cp malloc(10);当你操作指针指向的内存时,你才需要使用*作为间接操…

坚果的2022年终总结

人生天地之间&#xff0c;若白驹过隙&#xff0c;转眼间&#xff0c;这一年又快要过去了&#xff0c;按照惯例还是写一篇年终总结&#xff0c;同时也看一下自己是否又成长&#xff0c;是否有哪些事情没做好&#xff0c;给自己做一个复盘。一、缘起OpenHarmony我是从去年开始参加…

Webpack 钩子介绍、手写 Webpack Plugin

目录 1. Plugin 用作和工作原理 1.1 Plugin 的作用 1.2 Plugin 的工作原理 2. Webpack 底层逻辑和钩子介绍 2.1 Webpack 内部执行流程 2.2 Webpack 内部钩子 2.2.1 钩子是什么 2.2.2 Tapable —— 为 Webpack 提供 Plugin 钩子 数据类型接口 定义 2.2.3 Compiler Hook…

C#,图像二值化(08)——灰度图像二值化,全局算法,全局阈值优化算法及其源代码

1、全局阈值算法 基于灰度直方图的优化迭代算法之一。 Iterative Scheduler and Modified Iterative Water-Filling In the downlink, the inter-cell interference is only function of the power levels and is independent of the user scheduling decisions. This suggest…

俺的2022年

年末将至&#xff0c;还是要写点总结性的内容&#xff0c;以回顾过去一年做的各种事情。工作之外从客观数据上看&#xff0c;今年的收入水平略差于去年&#xff0c;主要是工作外的收入有所减少&#xff0c;其核心原因是没有录制新的课程内容进行变现&#xff0c;原本的计划是&a…

【自学Python】Python介绍

Python教程 什么是编程语言 编程语言&#xff08;programming language&#xff09;&#xff0c;是用来定义计算机程序的形式语言。它是一种被标准化的交流技巧&#xff0c;用来向计算机发出指令。 也可以说&#xff0c;计算机语言让程序员能够准确地定义计算机所需要使用的…

拓展交流空间,分享开发精彩 | 开发者说·DTalk 鉴赏

日月其迈&#xff0c;岁律更新&#xff0c;时间的洗礼让开发者们更加坚韧&#xff0c;持续探索&#xff0c;不断追求&#xff0c;同样也激励着我们为开发者提供更多的帮助与支持。不断迭代的技术产品是开发者们的趁手工具&#xff0c;定期更新的政策助力打造安全可靠的生态&…

基础数学(4)——线性回归复习

文章目录课程回顾基础知识回归模型的建模过程一元线性回归模型线性回顾进行极大似然估计&#xff08;例题&#xff08;必考&#xff09;&#xff09;极大似然估计极大似然估计的性质线性性无偏性最优性&#xff08;记住即可&#xff0c;没有推导&#xff09;方差计算一元线性回…

智能制造 | AIRIOT智慧工厂管理解决方案

工厂生产运转中&#xff0c;设备数量多&#xff0c;环境复杂、企业往往需要承担很高的维修、保养、备件和人力成本。传统的工厂改革遇到了诸多前所未有的挑战&#xff1a; 1、管理系统较多&#xff0c;数据隔离&#xff0c;系统集成困难重重&#xff1b; 2、大量老旧设备无法联…

QT使用log4cpp日志库

文章目录QT使用log4cpp日志库1. 从官网下载log4cpp源码2. 编译项目3. 在QT中使用log4cpp4. log4cpp4.1. Category4.2. Appender4.3. Layout4.4. Priority4.5. 使用宏定义为日志加上文件名 函数名 行号等QT使用log4cpp日志库 1. 从官网下载log4cpp源码 log4cpp官方网址 下载后…

MyBatisPlus ---- 常用注解

MyBatisPlus ---- 常用注解1. TableNamea>问题b>通过TableName解决问题c>通过全局配置解决问题2. Tablelda>问题b>通过TableId解决问题c>TableId的value属性d>TableId的type属性e>雪花算法3. TableFielda>情况1b>情况24. TableLogica>逻辑删除…