尚硅谷大数据项目《在线教育之采集系统》笔记002

news2025/1/9 1:18:50

视频地址:尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili

目录

P032

P033

P033

P034

P035

P036


P032

P033

# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1

# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100

# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

P033

2023-07-26 11:13:42,136 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2023-07-26 11:13:42,139 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:42,241 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2023-07-26 11:13:43,157 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:43,164 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.

[2023-07-26 11:03:06,989] INFO Opening socket connection to server node002/192.168.10.102:2181. (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,989] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,992] WARN Session 0x0 for sever node002/192.168.10.102:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)

flume生效!

node001
启动hadoop、zookeeper、kafka,再启动flume。

[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
Info: Sourcing environment configuration script /opt/module/flume/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
...
[atguigu@node001 ~]$ jpsall
================ node001 ================
6368 NodeManager
5793 NameNode
2819 QuorumPeerMain
6598 JobHistoryServer
5960 DataNode
6681 Application
4955 Kafka
7532 Jps
================ node002 ================
4067 NodeManager
2341 Kafka
3942 ResourceManager
4586 ConsoleConsumer
5131 Jps
1950 QuorumPeerMain
3742 DataNode
================ node003 ================
3472 NodeManager
3235 DataNode
1959 QuorumPeerMain
3355 SecondaryNameNode
2347 Kafka
3679 Jps
[atguigu@node001 ~]$ 
[atguigu@node002 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_log
[atguigu@node001 ~]$ mock.sh
[atguigu@node001 ~]$ 

P034

# /opt/module/flume/flume-1.9.0/job


# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1


# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100

a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder


# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false


# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
package com.atguigu.flume.interceptor;

import com.atguigu.flume.interceptor.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    /**
     * 过滤掉脏数据(不完整的json)
     *
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        //1、获取body当中的数据
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        //2、判断数据是否为完整的json
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        }

        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
package com.atguigu.flume.interceptor.utils;

import com.alibaba.fastjson.JSONObject;

public class JSONUtil {
    public static boolean isJSONValidate(String log) {
        try {
            JSONObject.parseObject(log);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
[atguigu@node001 log]$ echo '{"id":1}' >> app.log
[atguigu@node001 log]$ echo '{"id": }' >> app.log
[atguigu@node001 log]$ echo '{"id":2}' >> app.log
[atguigu@node001 log]$ 

P035

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};;	
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac
#! /bin/bash

case $1 in
"start"){
	for i in node001 node002
	do
		echo " --------启动 $i 采集flume-------"
		ssh $i "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/flume-1.9.0/log1.txt 2>&1 &"
	done
};;	
"stop"){
	for i in node001 node002
	do
		echo " --------停止 $i 采集flume-------"
		ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
	done
};;
esac
#! /bin/bash

case $1 in
"start") {
	echo " --------采集flume启动-------"
	ssh node001 "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume/flume-1.9.0/conf/ -f /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf >/dev /null 2>&1 &"
};;	
"stop") {
	echo " --------采集flume关闭-------"
	ssh node001 "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
};;
esac

P036

## 1、定义组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1


## 2、配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false


## 3、配置channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/flume-1.9.0/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 3


## 4、配置sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#a1.sinks.k1.hdfs.rollInterval = 10
#a1.sinks.k1.hdfs.rollSize = 134217728
#a1.sinks.k1.hdfs.rollCount = 0


## 5、组装 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/798793.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【已解决】 Celery 报错:AttributeError: ‘EntryPoints‘ object has no attribute ‘get‘

【已解决】 Celery 报错&#xff1a;AttributeError: EntryPoints object has no attribute get 1、起因2、实验环境3、解决方案 1、起因 今天闲来无事学习 Celery 分布式任务队列&#xff0c;写好代码发布并执行&#xff0c;报错了 AttributeError: EntryPoints object has n…

芯片竞争总是一个王朝颠覆另一个王朝,壁仞科技会是下一个么?

AI的水位渐高&#xff0c;过河的船也随之身价暴涨。 刚刚解锁万亿美元市值的英伟达&#xff0c;是AI芯片产业中少有的“豪华游轮”。根据澎湃新闻的报道&#xff0c;受到禁售消息的影响&#xff0c;A800芯片近期价格出现大幅上涨。事实上&#xff0c;A800只是A100的替代&#…

QT【day3】

思维导图&#xff1a; 闹钟&#xff1a; //widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QTimerEvent> #include<QTimer> #include<QTime> //时间类 #include<QPushButton> //按钮类头文件 #include<QDebug&…

论文笔记--GloVe: Global Vectors for Word Representation

论文笔记--GloVe: Global Vectors for Word Representation 1. 文章简介2. 文章概括3 文章重点技术3.1 两种常用的单词向量训练方法3.2 GloVe3.3 模型的复杂度 4. 文章亮点5. 原文传送门6. References 1. 文章简介 标题&#xff1a;GloVe: Global Vectors for Word Representa…

使用阿里云OSS+PicGo搭建图床

需求&#xff1a;对于写博客来说&#xff0c;图片确实是一个重要的问题。对于大量图片的上传&#xff0c;手动操作确实会非常耗时。为此借助图床提高写博客时处理图片的效率。 1. 阿里云OSS 阿里云对象存储服务OSS&#xff08;Object Storage Service&#xff09;&#xff1a;是…

1000*B. Buttons

#include<bits/stdc.h> using namespace std; typedef long long ll; int n,sum; int main(){scanf("%d",&n);for(int i1;i<n-1;i) sumi*(n-i);cout<<sumn;return 0; }

进阶高级测试专项,Pytest自动化测试框架总结(一)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、框架简介 pyt…

Cesium态势标绘专题-进攻箭头(标绘+编辑)

标绘专题介绍:态势标绘专题介绍_总要学点什么的博客-CSDN博客 入口文件:Cesium态势标绘专题-入口_总要学点什么的博客-CSDN博客 辅助文件:Cesium态势标绘专题-辅助文件_总要学点什么的博客-CSDN博客 本专题没有废话,只有代码,代码中涉及到的引入文件方法,从上面三个链…

01背包、完全背包问题几种变式总结,以及多重背包、组合背包模板

目录 1.求有多少种方法能恰好装满背包 1.1装满背包的方法——按排列计算还是按组合计算&#xff1f; 2.最值问题——最少需要几枚硬币,货物的最大价值 2.1最少需要几枚硬币 2.1.1 memset用法注意 3.二维01背包问题 4.多重背包问题 4.1优化前 4.2二进制优化 1.求有多少…

C++STL库中的list

文章目录 list的介绍及使用 list的常用接口 list的模拟实现 list与vector的对比 一、list的介绍及使用 1. list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 2. list的底层是双向带头循环链表结构&#xff0c;双向带头循…

数据库对象

二十、数据库对象-视图 二十一、数据库对象-索引 age字段没有索引&#xff0c;查找需要扫描全表&#xff1a; name字段做了唯一索引&#xff0c;查找一次&#xff1a; 二十二、数据库对象-事务 事务的隔离级别和问题&#xff1a;

(链表) 剑指 Offer 52. 两个链表的第一个公共节点 ——【Leetcode每日一题】

❓剑指 Offer 52. 两个链表的第一个公共节点 难度&#xff1a;简单 输入两个链表&#xff0c;找出它们的第一个公共节点。 如下面的两个链表&#xff1a; 在节点 c1 开始相交。 示例 1&#xff1a; 输入&#xff1a;intersectVal 8, listA [4,1,8,4,5], listB [5,0,1,8…

三星GalaxyWatch放弃iOS:无法给用户一致的体验,还不如“丢掉”

昨晚&#xff0c;三星发布了全新的Galaxy Watch 6系列智能手表。然而&#xff0c;对于苹果手机用户来说&#xff0c;这个消息可能并不那么重要。因为从2021年开始&#xff0c;三星决定转向Wear OS系统&#xff0c;并计划在Galaxy Watch 4及以后的新款智能手表上采用该系统&…

Python基础语法第八章之使用库

目录 一、使用库 二、标准库 2.1认识标准库 2.2使用 import 导入模块 2.3第三方库 2.3.1认识第三方库 2.3.2使用 pip 一、使用库 库 就是是别人已经写好了的代码, 可以让我们直接拿来用. 按照库的来源, 可以大致分成两大类 标准库: Python 自带的库. 只要安装了 Pytho…

JavaEE——SpringMVC中的常用注解

目录 1、RestController &#xff08;1&#xff09;、Controller &#xff08;2&#xff09;、ResponseBody 2、RequestMappping &#xff08;1&#xff09;、定义 &#xff08;2&#xff09;、使用 【1】、修饰方法 【2】、修饰类 【3】、指定方法类型 【4】、简化版…

朝花夕拾思维导图怎么画?看看这种绘制方法

朝花夕拾思维导图怎么画&#xff1f;绘制思维导图的好处有很多&#xff0c;首先它可以帮助人们更好地组织和管理知识&#xff0c;提高工作效率和学习效果。其次&#xff0c;绘制思维导图可以帮助人们更好地记忆知识点和理解知识点。总之&#xff0c;绘制思维导图可以帮助人们更…

字符串函数介绍应用

字符串 1.前言 C语言中对字符和字符串的处理很是频繁&#xff0c;但是C语言本身是没有字符串类型的&#xff0c;字符串通常放在 常量字符串中或者字符数组中。 字符串常量适合于那些对他不做修改的函数。 2.库函数及其模拟实现 2.1 strlen函数 size_t strlen ( const char *…

机器学习深度学习——多层感知机的简洁实现

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——多层感知机的从零开始实现 &#x1f4da;订阅专栏&#xff1a;机器学习&&深度学习 希望文章对你…

东南大学轴承故障诊断(Python代码,CNN模型,适合复合故障诊断研究)

运行代码要求&#xff1a; 代码运行环境要求&#xff1a;Keras版本>2.4.0&#xff0c;python版本>3.6.0 本次实验主要是在两种不同工况数据下&#xff0c;进行带有复合故障的诊断实验&#xff0c;没有复合故障的诊断实验。 实验结果证明&#xff0c;针对具有复合故障的…

Linux系统MySQL数据库的备份及应用

本节主要学习了MySQL数据库的备份&#xff1a;概念&#xff0c;数据备份的重要性&#xff0c;造成数据丢失的原因&#xff0c;备份的类型&#xff0c;常见的备份方法&#xff0c;实例与应用等。 目录 一、概述 二、数据备份的重要性 三、造成数据丢失的原因 四、备份类型 …