大数据-Storm流式框架(二)--wordcount案例

news2025/1/16 16:14:31

一、编写wordcount案例

1、新建java项目

2、添加storm的jar包

storm软件包中lib目录下的所有jar包

3、编写java类

WordCountTopology.java
package com.bjsxt.storm.wc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopology {

    public static void main(String[] args) {
        // 拓扑封装了计算逻辑
        TopologyBuilder builder = new TopologyBuilder();
        // 设置lineSpout:该spout负责向拓扑发送句子
        builder.setSpout("lineSpout", new LineSpout());
        // 设置切分闪电,该闪电处理从水龙头lineSpout通过随机分组发送过来的元组
        builder.setBolt("splitBolt", new SplitBolt())
            .shuffleGrouping("lineSpout");
        // 定义一个计数闪电,该闪电从splitBolt闪电通过按字段分组的方式分发过来的元组
        // 按照元组中word的值进行分组。要保证相同的单词一定发送给同一个闪电。
        builder.setBolt("countBolt", new CountBolt())
                .fieldsGrouping("splitBolt", new Fields("word"));

        // 通过建造者创建一个拓扑的实例
        StormTopology wordCountTopology = builder.createTopology();

        // 本地模拟集群
        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        // 将拓扑提交到本地模拟集群
        cluster.submitTopology("wordCountTopology", config, wordCountTopology);

        // 睡眠10s,也就是让本地模拟集群运行10s
        Utils.sleep(10000);

        // 关闭本地模拟集群
        cluster.shutdown();

    }

}
LineSpout.java
package com.bjsxt.storm.wc;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

public class LineSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private String[] lines = {
            "The logic for a realtime application is packaged into a Storm topology",
            "A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion",
            "A spout is a source of streams in a topology",
            "Bolts can do anything from filtering, functions, aggregations, joins, talking to databases, and more.",
            "A stream grouping defines how that stream should be partitioned among the bolt's tasks.",
            "Storm guarantees that every spout tuple will be fully processed by the topology",
            "Each spout or bolt executes as many tasks across the cluster",
            "Each worker process is a physical JVM and executes a subset of all the tasks for the topology"
    };

    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // 在该组件在集群中初始化的时候调用一次
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 由storm的线程不停地调用,以便从数据源获取元组
        // 该方法不需要自己写循环和遍历
        // 该方法不能阻塞
        // 负责从数据源获取元组,向DAG发送元组
        // 轮询取出句子
        String lingStr = lines[index % lines.length];
        // 将句子封装为元组发射
        collector.emit(new Values(lingStr));
        index++;

        Utils.sleep(10);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 用于声明元组的结构以及流
//        declarer.declareStream("s1", new Fields("key1", "key2", "key3"));
//        declarer.declareStream("s2", new Fields("key21", "key22"));
        // 发送元组的时候就有一个字段,是line,它的值是句子
        // 可以将元组想象为map集合,只不过其key是固定的几个
        declarer.declare(new Fields("line"));
    }
}
SplitBolt.java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String lineStr = input.getStringByField("line");
        String[] wordStrs = lineStr.split(" ");

        for (String wordStr : wordStrs) {
            // <hello, 1>
            this.collector.emit(new Values(wordStr, 1));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
CountBolt.java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
//        new Fields("word", "count")
        String wordStr = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");

        Integer sum = counts.get(wordStr);

        if (sum == null) {
            counts.put(wordStr, count);
        } else {
            counts.put(wordStr, sum + count);
        }

        counts.forEach((k, v) -> {
            System.out.println(k + "_________" + v);
        });
        System.out.println("========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

4、运行

右键运行WordCountTopology

二、Storm整体架构

Storm配置项

说明

java.library.path

Storm本身依赖包的路径,存在多个时用冒号分隔

storm.local.dir

Storm使用的本地文件系统目录(必须存在并且storm进程可读写)。默认是storm的根目录下的storm-local。

storm.zookeeper.servers

storm集群对应的zookeeper集群的主机列表

storm.zookeeper.port

storm集群对应的zookeeper集群的服务端口,zookeeper默认端口为2181

storm.zookeeper.root

storm的元数据在zookeeper中存储的根目录,默认值是/storm

storm.cluster.mode

storm运行模式,local或distributed。集群模式需设置为distributed

storm.messaging.transport

storm的消息传输机制,使用netty作为消息传输时设置为backtype.storm.messaging.netty.Context

nimbus.host

整个storm集群的nimbus节点

nimbus.supervisor.timeout.secs

storm中每个被发射出去的消息处理的超时时间,该时间影响到消息的处理,同时在storm ui上杀掉一个拓扑时的默认时间(kill动作发出后多长时间才会真正将该拓扑杀掉)。默认值是60

ui.port

storm自带UI,以http服务形式支持访问,此处设置该http服务的端口(非root用户端口号需要大于1024)

ui.childopts

storm UI进程的java参数设置(对java进程的约束都可以在此设置,如内存等)

logviewer.port

此处用于设置该Log Viewer进程的端口(Log Viewer进程也是http形式,需要运行在每个storm节点上)。默认值8000

logviewer.childopts

Log Viewer进程的参数设置

logviewer.appender.name

storm log4j的appender,设置的名字对应于文件storm/log4j2/cluster.xml中设置的appender,cluster.xml可以控制storm logger的级别

supervisor.slots.ports

storm的slot,最好设置为OS核数的整数倍;同时由于storm是基于内存的实时计算,slot数不要大于每台物理机可运行slot个数:(物理内存-虚拟内存)/单个java进程最大可占用内存数

worker.childopts

storm的worker进程的java限制,有效地设置该参数能够在拓扑异常时进行原因分析:

-Xms1024m -Xmx1024m -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError

其中:Xms为单个java进程最小占用内存数,Xmx为最大占用内存数,设置HeapDumpOnOutOfMemoryError的好处是,当内存使用量超过Xmx时,java进程将被JVM杀掉同时会生成java_pid<pid数字>.hprof文件,使用MemoryAnalyzer分析hprof文件将能分析出内存使用情况从而进行相应的调整、分析是否有内存溢出等情况

storm.messaging.netty.buffer_size

netty传输的buffer大小,默认为5MB,当spout发射的消息较大时,此处需要对应调整

storm.messaging.netty.max_retries

这几个参数是关于使用netty作为底层消息传输时的相关设置,需要重视,否则可能由于bug而引起错误:

java.lang.IllegalArgumentException: timeout value is negative

storm.messaging.netty.max_wait_ms

storm.messaging.netty.min_wait_ms

topology.debug

该参数可以在拓扑中覆盖,表示该拓扑是否运行于debug模式。运行于debug模式时,storm将记录拓扑中收发消息等的详细信息,线上环境不建议打开

topology.acker.executors

storm通过acker机制保证消息不丢失,此参数用于设置每个拓扑的acker数量,由于acker基本消耗的资源较小,强烈建议将此参数设置在较低的水平,可以在拓扑中进行覆盖

topology.max.spout.pending

一个spout任务中处于pending状态的最大元组数量。该配置应用于单个任务,而不是整个spout或拓扑,可在拓扑中进行覆盖。

此外,storm/log4j2/cluster.xml文件中可以配置storm的日志级别矩阵信息等。

操作系统的配置,其中有两项需要配置(通过ulimit -a查看):

1、open files:当前用户可以打开的文件描述符数。

2、max user processes:当前用户可以运行的进程数,此参数太小将引起storm的一个错误:

java.lang.OutOfMemoryError: unable to create new native thread

部署注意事项:

  1. 在storm根目录下有一个lib目录,存放storm本身依赖的jar包,此处的所有jar会被storm worker进行启动时加载,个人编写的jar包不能放在此处,以免包更新带来不便
  2. 向storm集群提交拓扑时,建议将该拓扑所有依赖的jar包和业务源代码打到一个jar包中(fat jar),如此则业务需要的jar包都和拓扑在同一个jar包中,否则当拓扑依赖的jar包更新时需要将该更新包放到所有的storm节点上。如果是在一个集群中,fat jar可以保证不同业务的jar包是独立的,不会混淆。

nimbus

  1. 接收jar包:提交应用拓扑
  2. 任务分配:将拓扑的任务分配给worker
  3. 资源调度:监控各个supervisor节点的状态进行负载均衡等。
  4. Nimbus不需要像supervisor节点那么高的配置,storm ui也不需要高配置,可以和nimbus节点运行在同一台服务器节点上。

supervisor

  1. 监听nimbus的任务分配,启动分配到的worker来对相应的任务进行处理。
  2. 监控本地的worker进程,如果发现状态不正常会杀死worker并重启,超过一定次数后将分配给该错误状态的worker的任务交还给nimbus进行再次分配。
  3. 删除本地不再运行的任务

worker

完成拓扑中定义的业务逻辑,即执行拓扑的进程。

一个worker的基本执行步骤:

  1. 根据zookeeper中拓扑的组件分配变化,创建或移除worker到worker的链接
  2. 创建executor(执行器)的输入队列receive-queue-map和输出队列transfer-queue
  3. 创建worker的接收线程receive-thread和发送线程transfer-thread
  4. 根据组件分配关系创建executor
    1. executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务
  5. 在executor中执行具体的任务(spout或者bolt)来执行具体的业务逻辑。
    1. 检查需要运行的task信息
    2. 获取相应的task信息,即spout/bolt信息

每个任务对应一个线程或多个任务对应一个线程

线程称为executor

executor在worker中运行

worker是一个JVM进程

在supervisor中运行

worker中的数据流:

worker中线程间通信使用的是Disruptor,进程间通信可能是netty也可以是zmq。默认使用netty。

数据流:

  1. 每个worker绑定一个socket端口作为数据的输入,此端口作为socket的服务器端一直监听运行。
  2. 根据拓扑的关系,确定需要向外通信的任务所在的worker地址,并同该worker也创建好socket连接,此时该worker是作为socket的客户端。
  3. receive thread负责将每个executor所需要的数据放入对应的receive-queue-map中,然后由executor来获取自己所需要的数据,这个过程通过disruptor进行通信。
  4. executor执行完操作需要对外发送数据时,首先kryo将数据序列化,然后通过disruptor将数据放入对外的transfer-queue中。
  5. transfer thread完成数据的发送工作。
  6. 如果executor需要对外发送的数据接收方和executor在同一个worker节点,则不需要执行序列化操作,调用disruptor的publish方法直接放到接收方的executor对应的队列中即可。

MapReduce架构的对比:

提交作业过程

  1. 客户端提交拓扑代码到nimbus的nimbus/inbox目录下。
  2. nimbus对topology进行校验、处理
  3. nimbus针对该拓扑建立本地目录:nimbus/stormdist/topology-id

该目录下有三个文件:

    1. stormjar.jar 从nimbus/inbox移动来的topology的jar包
    2. stormcode.ser 对topology对象的序列化
    3. stormconf.ser topology的运行配置信息
  1. nimbus的调度器根据拓扑的配置计算task,并把task分配到不同的worker上,调度的结果写入zookeeper的/task节点下。
  2. zookeeper上建立assignments节点,存储task和supervisor中worker的对应关系。
  3. zookeeper上创建workerbeats节点监控worker的心跳。
  4. supervisor去zookeeper上获取分配的task信息,启动一个或多个worker来执行。
  5. 每个worker上运行多个task,task由executor来执行。
  6. worker根据拓扑信息初始化建立task之间的连接
  7. 相同worker内的task通过DisruptorQueue通信,不同worker间默认采用netty通信

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1131524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ideaSSM在线商务管理系统VS开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 SSM 在线商务管理系统是一套完善的信息管理系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码 和数据库&#xff0c;系统主…

轮胎尺寸后面的91W、101Y是啥意思?解释一下:轮胎载重指数和轮胎速度等级。

轮胎载重指数 轮胎的语言沟通是一组数字&#xff0c;并表示数据规格&#xff0c;品牌和类型的一系列信件的形式。这是普遍的&#xff0c;并得到了所有轮胎制造商全球达成一致。 “载重指数”是一个对应于最大载重量的数字&#xff08;单位&#xff1a;公斤&#xff09;&#xf…

【尘缘赠书活动:01期】Python数据挖掘——入门进阶与实用案例分析

引言 本案例将根据已收集到的电力数据&#xff0c;深度挖掘各电力设备的电流、电压和功率等情况&#xff0c;分析各电力设备的实际用电量&#xff0c;进而为电力公司制定电能能源策略提供一定的参考依据。更多详细内容请参考**《Python数据挖掘&#xff1a;入门进阶与实用案例…

RocksDB基本架构与原理详解

Rocksdb Flink提供基于流的有状态计算&#xff0c;除了提供实时数据流的处理能力&#xff0c;还需要将计算产生的状态存储起来。 为了满足状态存取需求&#xff0c;提供了memory、flie system、rocksdb三种类型的状态存储机制。 memory存取高效单空间有限&#xff0c;且可用…

【VPX302】基于3U VPX总线架构的高性能数据预处理平台

板卡概述 VPX302是一款基于3U VPX总线架构的高性能数据预处理FMC载板&#xff0c;板卡具有1个FMC&#xff08;HPC&#xff09;接口&#xff0c;1个X8 GTH背板互联接口&#xff0c;可以实现1路PCIe x8&#xff1b;具有4路SRIO X4。板卡采用Xilinx的高性能Kintex UltraScale系列…

Kmssink插件添加缩放显示功能的分析思路与具体实现

XILINX MPSOC 实现输出缩放&#xff0c;PL一侧的配置如下&#xff1a; 修改PL侧的显示通道流程为&#xff1a;DDR -> FRAMBUF_RD -> VPSS(SCALE) -> V_MIX -> HDMI_TX -> MONITOR , 通过设置HDMI_TX的宽高&#xff0c;利用xlnx_bridge 接口关联设置VPSS(scal…

MathType7.4绿色和谐版数学公式编辑器

MathType 是一个功能强大、所见即所得的数学公式编辑器&#xff0c;可以在 Word、PowerPoint 等办公软件中轻松输入各种复杂的物理公式、化学方程式和符号。由 MathType 创建的公式能与 Office 文档完美结合&#xff0c;显示效果很好&#xff1b;MathType 可在任何支持 OLE 对象…

数据分享 I 各地级市2022年乡村振兴数据

数据地址&#xff1a; 各地级市2022年乡村振兴数据https://www.xcitybox.com/datamarketview/#/Productpage?id364 基本信息. 数据名称: 各地级市2022年乡村振兴数据 数据格式: Shpxlsx 数据时间: 2022年 数据几何类型: 面 数据坐标系: WGS84坐标系 数据来源&#xff…

分布式事务-Seata-详细图文讲解

目录 分布式事务问题概述现象 Seata简介作用分布式事务处理过程处理过程 使用安装下载解压修改配置文件创建数据库创建数据表修改配置文件启动 异常超时异常——没加GlobalTransactional故障分析 解决异常 部分补充再看TC/TM/RM三大组件分布式事务的执行流程AT模式如何做到对业…

【Cheat Engine7.5】基础教程第一关(STEP1-2)

Cheat Engine简称CE 一、CE STEP1-2练习 1、打开 2、简介 欢迎使用 Cheat Engine 训练教程 (3.4) 本教程将尝试讲解在游戏中作弊的一些基本知识. 并帮助你熟悉 Cheat Engine 的使用方法 (简称为CE). 请按下面的步骤开始. 1: 首先要打开Cheat Engine (如果你还没有运行它的话…

1078. Bigram 分词

1078. Bigram 分词 java代码&#xff1a; class Solution {public String[] findOcurrences(String text, String first, String second) {String[] arr text.split(" ");List<String> list new ArrayList<String>();for (int i 0; i < arr.lengt…

基于物联网云平台的分布式光伏监控系统的设计与实现

贾丽丽 安科瑞电气股份有限公司 上海嘉定 201801 摘要&#xff1a;针对国内光伏发电监控系统的研究现状&#xff0c;文中提出了基于云平台的光伏发电监控体系。构建基于B/S架构的数据实时采集与推送&#xff0c;以SSH(strutsspringhibernate)作为Web开发框架&#xff0c;开发基…

维基百科是如何定义联合办公空间的?

联合办公是不同公司的员工共享办公空间的一种安排。它通过使用通用基础设施&#xff08;例如设备、公用设施、接待员和保管服务&#xff0c;以及在某些情况下的茶点和包裹接收服务&#xff09;来节省成本和提供便利。它对独立承包商、独立科学家、远程工作者、数字游民和经常旅…

跨项目配置,nacos的动态更新配置,如何才能生效

在SpringCloud项目中&#xff0c;有时会出现多个项目读取同一配置的场景&#xff0c;那么这种跨项目的动态更新配置&#xff0c;如何才能生效。 方案1.使用refreshable-dataids 如果配置文件是使用如下方式获取配置&#xff0c;只需要使用refreshable-dataids 在Nacos中&am…

startActivityForResult()方法被弃用

一、现象 在新版androidX里面&#xff0c;startActivityForResult()被标注弃用&#xff0c;推荐使用registerForActivityResult()方法 二、解决方案 &#xff1a; 使用registerForActivityResult()方法 但是注意了&#xff1a; 1、registerForActivityResult只能在onCreate()…

中兴通讯-000063 三季报分析(20231024)

中兴通讯-000063 基本情况 公司名称&#xff1a;中兴通讯股份有限公司 A股简称&#xff1a;中兴通讯 成立日期&#xff1a;1997-11-11 上市日期&#xff1a;1997-11-18 所属行业&#xff1a;计算机、通信和其他电子设备制造业 主营业务&#xff1a;信息产业、通讯及电子设备、计…

Django分页功能的使用和自定义分装

1. 在settings中进行注册 # drf配置 REST_FRAMEWORK {DEFAULT_AUTHENTICATION_CLASSES: (# rest_framework_jwt.authentication.JSONWebTokenAuthentication,rest_framework_simplejwt.authentication.JWTAuthentication,rest_framework.authentication.SessionAuthenticatio…

java对接homeassistant实现远程控制(配置frp实现内网穿透)

Home Assistant API文档 https://dev-docs.home-assistant.io/en/master/ 这里是设备的基本前缀 以下是Home Assistant的全部设备前缀及代表的设备类型&#xff1a;1. air_quality&#xff1a;空气质量监测器设备&#xff1b; 2. alarm_control_panel&#xff1a;报警面板设…

科普丨语音芯片的宽电压设计作用

语音芯片的宽电压设计具有以下几个作用&#xff1a; 1. 适用范围广&#xff0c;适应性强。宽电压设计使语音芯片能够在不同电压范围内工作&#xff0c;从而适应电源供电系统的不稳定性。无论是在低电压还是高电压情况下&#xff0c;宽电压设计可以确保语音芯片正常工作&#x…

内存泄漏问题,4种智能指针(介绍+模拟实现)

目录 内存泄漏 介绍 分类 堆内存泄漏 系统资源泄漏 检测内存泄漏的方式 智能指针 引入 介绍 原理 引入 RAII原则 指针性质 拷贝 auto_ptr 介绍 代码 boost库 unique_ptr 介绍 代码 shared_ptr 介绍 删除器 代码 问题(循环引用) weak_ptr 介…