arm版Linux下安装大数据集群各种组件

news2025/1/19 8:16:06

背景:由于本人是用的Macbookpro m2来进行开发的,很多环境和Intel芯片的都不一样,期间安装各种软件遇到各种问题,为了以后不走之前的老路,现记录各种软件的安装步骤。

系统安装组件说明
序号组件名称组件版本
1jdkjdk-8u361-linux-aarch64.tar.gz
2zookeeperapache-zookeeper-3.6.4-bin.tar.gz
3kafkakafka_2.13-3.6.2.tgz
3flumeapache-flume-1.11.0-bin.tar.gz
一、kafka的安装

下载地址:https://kafka.apache.org/downloads

1.在安装目录下解压
tar -zxvf kafka_2.13-3.6.2.tgz
2.新建一个日志目录,最好不用默认的路径
mkdir -p /usr/local/soft/kafka/kafka_2.13-3.6.2/log
2.进入config目录下,编辑配置文件server.properties中log.dirs为自己新建好的日志目录
log.dirs=/usr/local/soft/kafka/kafka_2.13-3.6.2/log
3.进入目录下,执行启动命令,最好是带着配置文件启动

启动命令

./kafka-server-start.sh ../config/server.properties

–执行kafka的启动命令
./kafka-server-start.sh …/config/server.properties
报错:

/usr/local/soft/kafka/kafka_2.13-3.6.2/bin/kafka-run-class.sh:346 :exec: java: 未找到

报错原因:没有找到jdk
解决方案:安装一个jdk,然后再启动命令,发现jdk的报错解决了

启动命令报错:
./kafka-server-start.sh …/config/server.properties

报错如下:

Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

解决方案:
编辑bin/kafka-run-class.sh,删除-XX:+UseG1GC 这个地方,然后重启启动

然后报错如下;

[2024-04-18 22:22:54,936] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1289)
[2024-04-18 22:22:56,108] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. (org.apache.zookeeper.ClientCnxn)
[2024-04-18 22:22:56,110] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1289)
[2024-04-18 22:22:57,296] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2024-04-18 22:22:57,298] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接

仔细看上面的报错,没有连接上zookeeper,那就安装一个zookeeper试试,安装并且启动zookeeper后发现kafka正常启动了

[2024-04-19 12:07:44,008] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID zO_QpRQ_Sj2ba8vgKw2Ntw doesn't match stored clusterId Some(yBFG2q0yRqGofaIr5862_Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:244)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)
[2024-04-19 12:07:44,009] INFO shutting down (kafka.server.KafkaServer)
[2024-04-19 12:07:44,011] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2024-04-19 12:07:44,114] INFO Session: 0x100031457e00001 closed (org.apache.zookeeper.ZooKeeper)
[2024-04-19 12:07:44,114] INFO EventThread shut down for session: 0x100031457e00001 (org.apache.zookeeper.ClientCnxn)
[2024-04-19 12:07:44,115] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2024-04-19 12:07:44,117] INFO App info kafka.server for 0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-19 12:07:44,117] INFO shut down completed (kafka.server.KafkaServer)
[2024-04-19 12:07:44,118] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
kafka.common.InconsistentClusterIdException: The Cluster ID zO_QpRQ_Sj2ba8vgKw2Ntw doesn't match stored clusterId Some(yBFG2q0yRqGofaIr5862_Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:244)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)
[2024-04-19 12:07:44,118] INFO shutting down (kafka.server.KafkaServer)

报错原因:由于kafka重复启动或者非正常关闭造成的
解决方案:
方法一
在server.properties 配置文件里面 找到 log.dirs 配置的路径,在该路径下找到meta.properties文件,按照报错提示,将meta.properties文件里面的cluster.id修改为报错提示的Cluster ID,重新启动kafka。

方法二
在server.properties 配置文件里面 找到 log.dirs 配置的路径,将该路径下的文件全部删除,重新启动kafka。

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_kafka

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:558)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:49)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

报错原因:
因为kafka的版本是2.8+,不需要依赖zookeeper创建主题

改用命令 --bootstrap-server

./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_kafka.

查看topic的命令

./kafka-topics.sh --bootstrap-server localhost:9092 --list 
test_kafka

创建生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic test_kafka

创建消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka -from-beginning
二、zookeeper的安装
1、zookeeper的下载

https://archive.apache.org/dist/zookeeper/zookeeper-3.6.4/

2、解压压缩包
tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz
3、在解压后的目录下,创建一个文件夹zookeeper_data
mkdir -p /usr/local/soft/zookeeper/apache-zookeeper-3.6.4-bin/zookeeper_data
4、在config目录下,创建一个配置文件zoo.cfg
cp zoo_sample.cfg zoo.cfg
5、编辑配置文件zoo.cfg,将参数dataDir后面改为上面新创建的路径
dataDir=/usr/local/soft/zookeeper/apache-zookeeper-3.6.4-bin/zookeeper_data
6、在bin目录下,启动命令:
 ./zkServer.sh start
7、查看进程,验证zk是否正常启动了
jps

其实,启动kafka的试试,不一定非要用外部的zookeeper,也可以用他自带的,
./zookeeper-server-start.sh -daemon …/config/zookeeper.properties

三、flume的安装
1、flume的下载地址

https://flume.apache.org/download.html
我们一般下载二进制的就可以了
在这里插入图片描述

2、解压压缩包

命令如下:

tar -zxvf apache-flume-1.11.0-bin.tar.gz
3、在conf目录下新建一个文件,并且配置相关参数

新建文件的命令如下:

cp flume-env.sh.template flume-env.sh

编辑该文件

#修改 JM 配置
export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
4、新建目录job,然后开始编辑配置文件(读取文件到kafka的配置文件)

新建的目录job

/usr/local/soft/flume/apache-flume-1.11.0-bin/job  

编辑配置文件:vim file_to_kafka.conf
后面再补充flume的配置文件的参数详解

参数名称详解
#定义组件
a1.sources = r1
a1.channels =c1
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#日志(数据)文件
#a1.sources.rl.filegroups.f1=/opt/module/data/test.logal.sources.rl.positionFile =/opt/module/flume/taildir position.json
a1.sources.r1.filegroups.f1=/usr/local/soft/flume_to_kafka/test.log
a1.sources.r1.positionFile =/usr/local/soft/flume/taildir_position.json
#配置channel
#采用Kafka Channel,省去了Sink,提高了效率
#a1.channels.cl.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.cl,kafka,bootstrap,servers = kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
#a1.channels.cl.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.cl,kafka,bootstrap,servers = kafka-broker1:9092
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = xxxxx:9092
a1.channels.c1.kafka.topic = test_flume
a1.channels.c1.parseAsFlumeEvent = false
#组装
a1.sources.r1.channels =c1
5、新建数据文件的目录,如上面配置中的这个路径下
/usr/local/soft/flume_to_kafka/test.log
6、在bin目录下启动flume
./flume-ng agent -n a1 -c ../conf/ -f ../job/file_to_kafka.conf
7、查看kafka中是否接收到了数据,在可视化客户端上

发现已经生成相应的topic和对应的数据了
在这里插入图片描述

三、flink的安装

在Java代码中引用flink做wordcount

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
</dependency>

先创建一个可读取的TXT文件

hello world
hello wzx
hello flink

需要注意的是,这种代码的实现方式,是基于 DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用 DataStreamAPI,在提交任务时通过将执行模式设为BATCH来进行批处理:↔$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar<这样,DataSetAPI就没什么用了,在实际应用中我们只要维护一套 DataStream Ap! 就可以。这里只是为了方便大家理解,我们依然用 DataSetAPI做了批处理的实现。

package com.ruoyi.web.controller.soft.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author wuzhanxi
 * @create 2024/4/22 21:44
 */
public class FlinkController {

    //
    public static void main(String[] args) throws Exception {

        // 1.创建集群环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2.读取数据,从文件中读取
        DataSource<String> lineDs = env.readTextFile("input/input.txt");

        FlatMapOperator<String, Tuple2<String, Integer>> wordList = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 按照空格进行切分
                String[] words = value.split(" ");
                //
                for (String word : words) {
                    Tuple2<String, Integer> wordTupe2 = Tuple2.of(word, 1);
                    //
                    collector.collect(wordTupe2);
                }
            }
        });

        // 3.切分、转换

        // 4.按照单词分组
        UnsortedGrouping<Tuple2<String, Integer>> wordgroup = wordList.groupBy(0);
        // 5.各分组内聚合,这里的1是数值 1
        AggregateOperator<Tuple2<String, Integer>> wordsum = wordgroup.sum(1);
        // 6.输出
        wordsum.print();
    }
}

输出如下;

(wzx,1)
(flink,1)
(world,1)
(hello,3)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1622517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年电子商务与大数据经济国际会议 (EBDE 2024)

2024年电子商务与大数据经济国际会议 (EBDE 2024) 2024 International Conference on E-commerce and Big Data Economy 【会议简介】 2024年电子商务与大数据经济国际会议即将在厦门召开。本次会议旨在汇聚全球电子商务与大数据经济领域的专家学者&#xff0c;共同探讨电子商务…

基于MaxKB搭建一个知识库问答系统

什么是MaxKB MaxKB 是一款基于 LLM 大语言模型的知识库问答系统。MaxKB Max Knowledge Base&#xff0c;旨在成为企业的最强大脑。 开箱即用&#xff1a;支持直接上传文档、自动爬取在线文档&#xff0c;支持文本自动拆分、向量化&#xff0c;智能问答交互体验好&#xff1b…

深度学习| 注意力机制

注意力机制 为什么需要注意力机制Seq2Seq问题Transfomer Attention注意力机制分类软硬注意力注意力域 为什么需要注意力机制 这个可以从NLP的Seq2Seq问题来慢慢理解。 Seq2Seq问题 Seq2Seq&#xff08;Sequence to Sequence&#xff09;&#xff1a;早期很多模型中&#xff…

扭蛋机小程序对市场的发展有哪些推动作用?

近几年&#xff0c;扭蛋机发展的非常迅猛。随着二次元文化的火热&#xff0c;给扭蛋机带来了发展机遇&#xff0c;扭蛋机行业也受到了大众的喜爱。扭蛋机的商品种类多样化&#xff0c;包含了各类热门IP周边衍生品、玩具、小商品等&#xff0c;适合所有消费人群&#xff0c;市场…

2024年汉字小达人活动还有5个月开赛:来做18道历年选择题备考吧

现在距离2024年第11届汉字小达人比赛还有五个多月的时间&#xff0c;如何利用这段时间有条不紊地备考呢&#xff1f;我的建议是两手准备&#xff1a;①把小学1-5年级的语文课本上的知识点熟悉&#xff0c;重点是字、词、成语、古诗。阅读理解不需要。②把历年真题刷刷熟&#x…

【漏洞复现】云时空社会化商业ERP系统slogin SQL注入漏洞

漏洞描述&#xff1a; 云时空社会化商业ERP系统slogin存在SQL注入漏洞&#xff0c;攻击者可以通过此漏洞获取数据库敏感信息。 搜索语法: Fofa-Query: app"云时空社会化商业ERP系统" 漏洞详情&#xff1a; 1.云时空社会化商业ERP系统。 2.漏洞POC&#xff1a; …

浏览器和nodejs中的eventloop

浏览器和nodejs中的eventloop 浏览器中的Event Loop 在浏览器中&#xff0c;设计成为了单线程。如果要处理异步请求&#xff0c;则需要增加一层调度逻辑&#xff0c;把js代码封装成一个个的任务&#xff0c;放在一个任务队列中&#xff0c;主线程不断的读取任务执行。每次调取…

IDEA2023版本创建Sping项目无法使用Java8

1. 问题复现 1.1 当前版本2023.3.2 1.2 创建项目时&#xff1a;不存在jdk8选项 提示报错 1.3 原因分析 Spring官方发布Spring Boot 3.0.0 的时候告知了一些情况&#xff0c;Java 17将成为未来的主流版本 2. 如何解决 2.1 替换创建项目的源 我们只知道IDEA页面创建Spring项目…

玩转PyCharm

玩转PyCharm PyCharm是由JetBrains公司开发的提供给Python专业的开发者的一个集成开发环境&#xff0c;它最大的优点是能够大大提升Python开发者的工作效率&#xff0c;为开发者集成了很多用起来非常顺手的功能&#xff0c;包括代码调试、高亮语法、代码跳转、智能提示、自动补…

历史遗留问题-Oracle 19c RAC 安装时节点连接性问题

测试服务器的二节点数据库宕掉了&#xff0c;原因不明&#xff0c;需要产环境重新安装。我想上次在自己虚拟机安装实验过一次&#xff0c;应该一天能搞定&#xff0c;事实证明&#xff0c;你永远有学不完的bug&#xff01;&#xff01;&#xff01;&#xff01; 首先查看一下系…

python 调试 c++源码

1. gdb常用调试命令概览和说明 2. 编译c库设置Debug模式 cmake设置debug 在CMake中设置debug模式通常意味着启用调试信息和优化。以下是一个简单的CMakeLists.txt文件示例&#xff0c;展示了如何设置项目以便在Debug模式下构建&#xff1a; cmake_minimum_required(VERSION 3…

【WEEK9】 【DAY3】JSR303数据校验及多环境切换【中文版】

2024.4.24 Wednesday 目录 4.JSR303数据校验及多环境切换4.1.JSR303数据校验&#xff08;了解即可&#xff09;4.1.1.修改Person.java4.1.2.修改pom.xml&#xff08;添加依赖&#xff09;4.1.3.运行Springboot02ConfigApplicationTests.java进行测试4.1.4.使用数据校验&#x…

HotSpot JVM 中的应用程序/动态类数据共享

0.前言 本文的目的是详细讨论 HotSpot JVM 自 JDK 1.5 以来提供的一项功能&#xff0c;该功能可以减少启动时间&#xff0c;但如果在多个 JVM 之间共享相同的类数据共享 (CDS) 存档&#xff0c;则还可以减少内存占用。 1.类数据共享 (CDS) CDS 的想法是使用特定格式将预处理…

【氮化镓】液态Ga在GaN(0001)和(0001̅)表面上的三维有序排列随温度的变化

文章标题是《Temperature dependence of liquid-gallium ordering on the surface of epitaxially grown GaN》&#xff0c;作者是Takuo Sasaki等人&#xff0c;发表在《Applied Physics Express》上。文章主要研究了在分子束外延(MBE)条件下&#xff0c;液态镓(Ga)在GaN(0001)…

探索在Apache SeaTunnel上使用Hudi连接器,高效管理大数据的技术

Apache Hudi是一个数据湖处理框架&#xff0c;通过提供简单的方式来进行数据的插入、更新和删除操作&#xff0c;Hudi能够帮助数据工程师和科学家更高效地处理大数据&#xff0c;并支持实时查询。 支持的处理引擎 Spark Flink SeaTunnel Zeta 主要特性 批处理 流处理 精确一次性…

状态模式和策略模式对比

状态模式和策略模式都是行为型设计模式&#xff0c;它们的主要目标都是将变化的行为封装起来&#xff0c;使得程序更加灵活和可维护。之所以将状态模式和策略模式进行比较&#xff0c;主要是因为两个设计模式的类图相似度较高。但是&#xff0c;从状态模式和策略模式的应用场景…

2024最新版JavaScript逆向爬虫教程-------基础篇之深入JavaScript运行原理以及内存管理

目录 一、JavaScript运行原理1.1 前端需要掌握的三大技术1.2 为什么要学习JavaScript1.3 浏览器的工作原理1.4 浏览器的内核1.5 浏览器渲染过程1.6 认识JavaScript引擎1.7 V8引擎以及JavaScript的执行过程1.8 V8引擎执行过程 二、JavaScript的执行过程2.1 初始化全局对象2.2 执…

解决宏定义后面无法加分号

总结&#xff1a;注意是针对单行if语句使用&#xff0c;并且宏定义后面必须带分号&#xff08;格式统一&#xff09; 参考连接 C语言种do_while(0)的妙用_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1vk4y1R7VJ/?spm_id_from333.337.search-card.all.click&vd_…

Excel数据处理:动态数据分析报表、单元格数字格式、使用排序工具

1、在生成数据透视表之后选中一个单元格&#xff0c;点击插入&#xff0c;在图表中选择一个自己想要的图表。&#xff08;生成可视化的图表&#xff09; 2、在分析中找到切片器&#xff0c;通过点击切片器可以即时变换生成不同的可视化图&#xff0c;可以右键切片器选择关联两个…

matlab 对数坐标画图,及在曲线上加竖直线

matlab 对数坐标画图 方法一&#xff1a;直接对x、y值取对数&#xff0c;然后画图 plot(log(x), log(y), m, LineWidth,1, Marker,.);% ,Color,#EDB120 方法二&#xff1a;将x、y轴刻度改为对数形式 plot(x, y, r, LineWidth,1, Marker,); ax gca();% 获取当前坐标句柄 ax…