【智能大数据分析 | 实验三】Storm实验:实时WordCountTopology

news2024/12/23 22:28:35

在这里插入图片描述

【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ 智能大数据分析 ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML)和数据挖掘等多种方法,旨在通过自动化的方式分析复杂数据集,发现潜在的价值和关联性,实现数据的自动化处理和分析,从而支持决策和优化业务流程。与传统的人工分析相比,智能大数据分析具有自动化、深度挖掘、实时性和可视化等特点。智能大数据分析广泛应用于各个领域,包括金融服务、医疗健康、零售、市场营销等,帮助企业做出更为精准的决策,提升竞争力。
【GitCode】专栏资源保存在我的GitCode仓库:https://gitcode.com/Morse_Chen/Intelligent_bigdata_analysis。

文章目录

    • 一、实验目的
    • 二、实验要求
    • 三、实验原理
      • (一)Topologies
      • (二)Spouts
      • (三)Bolts
    • 四、实验环境
    • 五、实验内容和步骤
      • (一)启动 Storm 集群
      • (二)导入依赖 jar 包
      • (三)编写程序
      • (四)打包上传并运行
    • 六、实验结果
    • 七、实验心得


一、实验目的

掌握如何用 Java 代码来实现 Storm 任务的拓扑,掌握一个拓扑中 Spout 和 Bolt 的关系及如何组织它们之间的关系,掌握如何将 Storm 任务提交到集群。

二、实验要求

编写一个 Storm 拓扑,一个 Spout 每个一秒钟随机生成一个单词并发射给 Bolt,Bolt 统计接收到的每个单词出现的频率并每隔一秒钟实时打印一次统计结果,最后将任务提交到集群运行,并通过日志查看任务运行结果。

三、实验原理

Storm 集群和 Hadoop 集群表面上看很类似。但是 Hadoop 上运行的是 MapReduce jobs,而在 Storm 上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个 MapReduce job 最终会结束, 而一个 topology 永远会运行(除非你手动 kill 掉)。

(一)Topologies

一个 topology 是spoutsbolts组成的图,通过 stream groupings 将图中的 spouts 和 bolts 连接起来,如图所示。

在这里插入图片描述
一个 topology 会一直运行直到你手动 kill 掉,Storm 自动重新分配执行失败的任务, 并且 Storm 可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个 topology 很简单。首先,把你所有的代码以及所依赖的 jar 打进一个 jar 包。然后运行类似下面的这个命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类:backtype.strom.MyTopology,参数是arg1arg2。这个类的 main 函数定义这个 topology 并且把它提交给 Nimbus。storm jar负责连接到 Nimbus 并且上传 jar 包。

Topology 的定义是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务, 你可以提交由任何语言创建的 topology。上面的方面是用 JVM-based 语言提交的最简单的方法。

(二)Spouts

消息源 spout 是 Storm 里面一个 topology 里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向 topology 里面发出消息:tuple。Spout 可以是可靠的也可以是不可靠的。如果这个 tuple 没有被 storm 成功处理,可靠的消息源 spouts 可以重新发射一个 tuple, 但是不可靠的消息源 spouts 一旦发出一个 tuple 就不能重发了。

消息源可以发射多条消息流 stream。使用OutputFieldsDeclarer.declareStream来定义多个 stream,然后使用SpoutOutputCollector来发射指定的 stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没有新的 tuple。要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上面调用所有消息源 spout 的方法。

另外两个比较重要的 spout 方法是ackfail。storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack,否则调用 fail。storm 只对可靠的 spout 调用 ack 和 fail。

(三)Bolts

所有的消息处理逻辑被封装在 bolts 里面。Bolts 可以做很多事情:过滤,聚合,查询数据库等等。Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多 bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts 可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义 stream,使用OutputCollector.emit来选择要发射的 stream。

Bolts 的主要方法是execute,它以一个 tuple 作为输入,bolts 使用OutputCollector来发射 tuple,bolts 必须要为它处理的每一个 tuple 调用OutputCollector的 ack 方法,以通知 Storm 这个 tuple 被处理完成了,从而通知这个 tuple 的发射者 spouts。 一般的流程是: bolts 处理一个输入 tuple,发射0个或者多个 tuple,然后调用 ack 通知 storm 自己已经处理过这个 tuple 了。storm 提供了一个 IBasicBolt 会自动调用 ack。

四、实验环境

  • 云创大数据实验平台:
    在这里插入图片描述

  • Java 版本:jdk1.7.0_79

  • Hadoop 版本:hadoop-2.7.1

  • ZooKeeper 版本:zookeeper-3.4.6

  • Storm 版本:storm-0.10.0

五、实验内容和步骤

本实验主要演示一个完整的 Storm 拓扑编码过程,主要包含 Spout、Bolt 和构建 Topology 几个步骤。

(一)启动 Storm 集群

首先,启动 Storm 集群。

实验的准备工作是:域名映射、免密登录、JDK 配置、部署 ZooKeeper、部署 Storm 等。该实验可以点击一键搭建后能看到搭建成功,即可自动搭建好环境。

(二)导入依赖 jar 包

其次,将 Storm 安装包的 lib 目录内如下 jar 包导入到开发工具:

在这里插入图片描述

然后再在 Eclipse 中对每个 jar 执行如下操作进行添加配置:

在这里插入图片描述

出现这样即可:

在这里插入图片描述

(三)编写程序

我们在项目的 src 中首先创建一个cproc.word包。

在这里插入图片描述

然后,编写代码,实现一个完整的 Topology,内容如下:

Spout 随机发送单词,代码实现:

package cproc.word;

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordReaderSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }
    @Override
    public void nextTuple() {
    	 //这个方法会不断被调用,为了降低它对CPU的消耗,让它sleep一下
     Utils.sleep(1000);
     final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
     Random rand = new Random();
     String word = words[rand.nextInt(words.length)];
     collector.emit(new Values(word));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Bolt 单词计数,并每隔一秒打印一次,代码实现:

package cproc.word;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private HashMap<String, Integer> counters = new HashMap<String, Integer>();
    private volatile boolean edit = false;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        //定义一个线程1秒钟打印一次统计的信息
        new Thread(new Runnable() {
          public void run() {
             while (true) {
               if (edit) {
                   for (Entry<String, Integer> entry : counters.entrySet())
                   {
                      System.out.println(entry.getKey() + " : " + entry.getValue());
                    }
                    edit = false;
                }
                try {
                    Thread.sleep(1000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                  }
              }
            }
        }).start();
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        edit = true;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

构建 Topology 并提交到集群主函数,代码实现:

package cproc.word;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopo {
    public static void main(String[] args) throws Exception{
      //构建Topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("word-reader", new WordReaderSpout());
      builder.setBolt("word-counter", new WordCounterBolt())
      .shuffleGrouping("word-reader");
      Config conf = new Config();
      //集群方式提交
      StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
      builder.createTopology());
    }
}

(四)打包上传并运行

将 Storm 代码打成wordCount-Storm.jar (打包的时候不要包含 storm 中的 jar,不然会报错的,将无法运行,即:wordCount-Storm.jar中只包含上面三个类的代码) 上传到主节点的/usr/cstor/storm/bin目录下。

在这里插入图片描述

这里需要注意的是我们不勾选上图框选的选项,这样就不会打包项目中的jar包。

在这里插入图片描述

在主节点进入 Storm 安装目录的 bin 下面用以下命令提交任务:

cd /usr/cstor/storm/bin
./storm jar wordCount-Storm.jar cproc.word.WordCountTopo wordCount

在这里插入图片描述

因为 topology 会永远运行,需要手动 kill 掉,使用以下命令结束 storm 任务:

./storm kill wordCount

在这里插入图片描述

六、实验结果

Storm 任务执行时,可以查看 Storm 日志文件,日志里面打印了统计的单词结果,日志内容如图。注意要到 slave1 服务器上查看日志文件。

cd /usr/cstor/storm/logs
ls
cat wordCount-1-1728785733-worker-6703.log

在这里插入图片描述
……

在这里插入图片描述

七、实验心得

  在本次 Storm 实验中,我深入了解了如何使用 Apache Storm 实现一个实时 WordCountTopology。Apache Storm 是一个开源的分布式实时计算系统,用于处理大量的数据流。通过本次实验,我不仅掌握了 Storm 的基本概念,还学会了如何使用 Java 代码来实现 Storm 任务的拓扑,以及如何将 Storm 任务提交到集群中运行。

  实验的核心是创建一个能够实时统计单词频率的 Topology。这个 Topology 由一个 Spout 和多个 Bolt 组成。Spout 负责生成或接收外部数据流,并将其转换为 Storm 内部的 Tuple(消息传递的基本单元)。在这个实验中,Spout 每秒随机生成一个单词,并将其发送给 Bolt。Bolt 则负责处理接收到的 Tuple,进行单词统计,并每隔一秒打印一次统计结果。

  在实验过程中,我首先通过 Eclipse 创建了一个 StormTest 项目,并导入了所需的依赖 jar 包。然后,我创建了三个 Java 类:WordReaderSpoutWordCounterBoltWordCountTopo。WordReaderSpout 负责生成单词,WordCounterBolt 负责将单词拆分和统计单词频率。最后,在 WordCountTopo 类中定义了 Topology 的结构,并将这些组件组织起来。

  在将 Topology 提交到 Storm 集群之前,我首先在本地模式下进行了测试。通过运行storm jar命令,我成功地将 Topology 提交给 Nimbus(Storm 的主节点),并在本地机器上模拟了 Storm 集群的运行环境。测试结果显示,Topology 能够正确地生成单词、拆分单词并统计单词频率。

  接下来,我将 Topology 提交到了实际的 Storm 集群中运行。在集群模式下,我需要注意一些额外的配置,如设置 worker 的数量、executor 的数量以及 task 的数量等。这些配置对于优化 Topology 的性能至关重要。通过合理地配置并行度,我成功地提高了 Topology 的处理效率。

  总的来说,这次 Storm 实验让我对分布式实时计算系统有了更深入的了解。通过实践,我不仅掌握了 Storm 的基本概念和操作方法,还学会了如何优化 Topology 的性能和解决实际问题。我相信这些经验和知识将对我未来的学习和工作产生积极的影响。

:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/d94bcadb79c8
提取码:U84E

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2212760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

并查集的实现(朴素版)

这是C算法基础-数据结构专栏的第二十九篇文章&#xff0c;专栏详情请见此处。 由于作者即将参加CSP&#xff0c;所以到比赛结束前将不再发表文章&#xff01; 引入 并查集是一种可以快速合并查找集合的一种数据结构&#xff0c;这次我们将通过三道题来详细讲解并查集&#xff…

迈普pnsr2900x DOWNLOAD_FILE 任意文件读取漏洞

0x01 产品描述&#xff1a; ‌ 迈普NSR2900X系列是一款专为军队、政府、金融、中小型企业分支机构和中小型企业总部设计的信创接入路由器。‌ 该路由器采用国产核心元器件&#xff0c;基于国产操作系统运行迈普自主研发的网络操作系统及应用软件。它全面支持IPv4、IPv6、OS…

insert into values 语句优化

insert into values插入单行数据 SQL语句&#xff0c;insert into values插入单行数据&#xff0c;执行10万次&#xff0c;执行时间1279秒&#xff0c;优化总体执行耗时。 SQL文本&#xff0c;单行insert values&#xff0c;没有select部分。需要进一步分析执行过程消耗。 ins…

软考《信息系统运行管理员》- 5.1 信息系统数据资源维护体系

5.1 信息系统数据资源维护体系 文章目录 5.1 信息系统数据资源维护体系数据资源维护的管理对象数据资源维护的管理类型运行监控故障响应数据备份归档检索数据优化 数据资源维护的管理内容维护方案例行管理应急响应数据资源的开发与利用 数据是信息系统管理的对象与结果&#xf…

7-基于国产化FT-M6678+JFM7K325T的6U CPCI信号处理卡

一、板卡概述 本板卡系我公司自主研发&#xff0c;基于6U CPCI的通用高性能信号处理平台。板卡采用一片国产8核DSP FT-C6678和一片国产FPGA JFM7K325T-2FFG900作为主处理器。为您提供了丰富的运算资源。如下图所示&#xff1a; 二、设计参考标准 ● PCIMG 2.0 R3.0 CompactP…

Python酷库之旅-第三方库Pandas(147)

目录 一、用法精讲 666、pandas.Timestamp.astimezone方法 666-1、语法 666-2、参数 666-3、功能 666-4、返回值 666-5、说明 666-6、用法 666-6-1、数据准备 666-6-2、代码示例 666-6-3、结果输出 667、pandas.Timestamp.ceil方法 667-1、语法 667-2、参数 667…

基础篇:带你打开Vue的大门(一)

学习目标&#xff1a; 理解Vue的基本概念&#xff1a;掌握Vue.js是什么&#xff0c;它的设计理念&#xff0c;以及它在现代Web开发中的应用。掌握Vue的基本语法&#xff1a;学习Vue的基础指令和语法&#xff0c;能够使用Vue构建简单的交互式界面。熟悉Vue组件化开发&#xff1…

DBA | 如何将 .bak 的数据库备份文件导入到SQL Server 数据库中?

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 原文链接&#xff1a;DBA | 如何将 .bak 的数据库备份文件导入到SQL Server 数据库中? 如何将&#xff08;.bak&#xff09;的SQL Server 数据库备份文件导入到当前数据库中? Step 1.登录到 Sql…

Centos7安装RocketMQ[图文教程]

文章目录 RocketMQ介绍基于Linux服务部署RocketMQ&#xff08;单机&#xff09;配置JDK环境下载RocketMQ部署RocketMQ1、解压2、修改VM参数3、配置环境变量4、编写Service文件5、启动服务 基于Docker方式部署RocketMQ安装Docker编写docker-compose文件启动RocketMQ服务 部署Roc…

前端学习-css的背景(十六)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 背景颜色 语法格式 背景图片 语法格式 背景平铺 语法格式 背景图片位置 语法格式 参数代表的意思 参数是方位名词 参数是精确单位 参数是混合单位 背…

架构设计笔记-11-未来信息综合技术

知识要点 云原生架构原则包括&#xff1a;服务化原则、弹性原则、可观测原则、韧性原则、所有过程自动化原则、零信任原则和架构持续演进原则。 区块链是一种按照时间顺序将数据区块以顺序相连的方式组合成的一种链式数据结构&#xff0c;并以密码学方式保证的不可篡改和不可…

【项目案例】-音乐播放器-Android前端实现-Java后端实现

精品专题&#xff1a; 01.C语言从不挂科到高绩点 https://blog.csdn.net/yueyehuguang/category_12753294.html?spm1001.2014.3001.5482https://blog.csdn.net/yueyehuguang/category_12753294.html?spm1001.2014.3001.5482 02. SpringBoot详细教程 https://blog.csdn.ne…

项目管理系统如何助力新药研发?药物研发企业康诺亚上线瑞杰项目管理系统

在新药研发过程中&#xff0c;其特点是&#xff1a;周期长、风险高、投入大&#xff0c;同时还要与其他科学相结合&#xff0c;相互渗透、更加需要多部门的共同参与&#xff0c;因此面临的问题相对复杂&#xff0c;而且要求也比较高。所以在这一过程中&#xff0c;必须对新药研…

软考系统分析师知识点十一:系统规划

前言 今年报考了11月份的软考高级&#xff1a;系统分析师。 考试时间为&#xff1a;11月9日。 倒计时&#xff1a;26天。 目标&#xff1a;优先应试&#xff0c;其次学习&#xff0c;再次实践。 复习计划第一阶段&#xff1a;扫平基础知识点&#xff0c;仅抽取有用信息&am…

49 | 桥接模式:如何实现支持不同类型和渠道的消息推送系统?

上一篇文章我们学习了第一种结构型模式&#xff1a;代理模式。它在不改变原始类&#xff08;或者叫被代理类&#xff09;代码的情况下&#xff0c;通过引入代理类来给原始类附加功能。代理模式在平时的开发经常被用到&#xff0c;常用在业务系统中开发一些非功能性需求&#xf…

嵌入式~CAN-专辑2

我自己的原文哦~ 只发CAN相关2 随时更新~~ 一、CAN总线错误分析与解决 从实际工作中碰到的具体问题来分析一些常见的CAN总线错误和解决办法。 CAN节点数据收发过程 我们知道&#xff0c;CAN总线上的每个节点往总线上发送数据的同时&#xff0c;会读取总线上的数据&#x…

stm32单片机个人学习笔记10(TIM编码器接口)

前言 本篇文章属于stm32单片机&#xff08;以下简称单片机&#xff09;的学习笔记&#xff0c;来源于B站教学视频。下面是这位up主的视频链接。本文为个人学习笔记&#xff0c;只能做参考&#xff0c;细节方面建议观看视频&#xff0c;肯定受益匪浅。 STM32入门教程-2023版 细…

简单实现手机投屏到电脑代码

1、从手机截图到sdcard 2、将图片导出到PC 3、从PC加载图片 4、开启定时器 1、 private static void takeScreenshot(String path) {long t1 System.currentTimeMillis();String command "adb devices"; // 替换为你需要执行的shell命令String command1 "…

氧化锆ZrO2纳米颗粒50nm|L-ZrO2@mSiO2|Ir1-N-C/ZrO2|AuPd/HB-ZrO2

氧化锆ZrO2纳米颗粒50nm|L-ZrO2mSiO2|Ir1-N-C/ZrO2|AuPd/HB-ZrO2 氧化锆&#xff08;ZrO₂&#xff09;纳米颗粒&#xff0c;特别是直径为50纳米&#xff08;nm&#xff09;的颗粒&#xff0c;是一种具有多种应用前景的功能材料。这种材料因其独特的物理和化学性质&#xff0c…

大一计算机课程之线性代数

《大一计算机课程之线性代数》 在大一的计算机课程中&#xff0c;线性代数是一门极为重要的基础学科&#xff0c;它就像一把神奇的钥匙&#xff0c;为计算机科学领域的诸多方面开启了智慧之门。 线性代数主要研究线性方程组、向量空间、线性变换等内容。对于计算机专业的学生…