大数据:Storm集成HDFS和HBase

news2024/12/26 11:41:22

一、Storm集成HDFS

1.1 项目结构

在这里插入图片描述

1.2 项目主要依赖

项目主要依赖如下,有两个地方需要注意:

  • 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时引入的也是 CDH 版本的依赖,需要使用 <repository> 标签指定 CDH 的仓库地址;
  • hadoop-commonhadoop-clienthadoop-hdfs 均需要排除 slf4j-log4j12 依赖,原因是 storm-core 中已经有该依赖,不排除的话有 JAR 包冲突的风险;
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!--Storm 整合 HDFS 依赖-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

1.3 DataSourceSpout

/**
 * 产生词频样本的数据源
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模拟产生数据
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模拟数据
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

产生的模拟数据格式如下:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

1.4 将数据存储到HDFS

这里 HDFS 的地址和数据存储路径均使用了硬编码,在实际开发中可以通过外部传参指定,这样程序更为灵活。

public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

        // 指定 Hadoop 的用户名 如果不指定,则在 HDFS 创建目录时候有可能抛出无权限的异常 (RemoteException: Permission denied)
        System.setProperty("HADOOP_USER_NAME", "root");

        // 定义输出字段 (Field) 之间的分隔符
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // 同步策略: 每 100 个 tuples 之后就会把数据从缓存刷新到 HDFS 中
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // 文件策略: 每个文件大小上限 1M,超过限定时,创建新文件并继续写入
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // 定义存储路径
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // 定义 HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);


        // 构建 Topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);


        // 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp",
                    new Config(), builder.createTopology());
        }
    }
}

1.5 启动测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

# mvn clean package -D maven.test.skip=true

运行后,数据会存储到 HDFS 的 /storm-hdfs 目录下。使用以下命令可以查看目录内容:

# 查看目录内容
hadoop fs -ls /storm-hdfs
# 监听文内容变化
hadoop fs -tail -f /strom-hdfs/文件名

在这里插入图片描述

二、Storm集成HBase

2.1 项目结构

集成用例: 进行词频统计并将最后的结果存储到 HBase,项目主要结构如下:

在这里插入图片描述

2.2 项目主要依赖

 <properties>
        <storm.version>1.2.2</storm.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <!--Storm 整合 HBase 依赖-->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${storm.version}</version>
        </dependency>
    </dependencies>

2.3 DataSourceSpout

/**
 * 产生词频样本的数据源
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模拟产生数据
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模拟数据
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

产生的模拟数据格式如下:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

2.4 SplitBolt

/**
 * 将每行数据按照指定分隔符进行拆分
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(tuple(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.5 CountBolt

/**
 * 进行词频统计
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // 输出
        collector.emit(new Values(word, String.valueOf(count)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.6 WordCountToHBaseApp

/**
 * 进行词频统计 并将统计结果存储到 HBase 中
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {

        // storm 的配置
        Config config = new Config();

        // HBase 的配置
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        // 将 HBase 的配置传入 Storm 的配置中
        config.put("hbase.conf", hbConf);

        // 定义流数据与 HBase 中数据的映射
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word","count"))
                .withColumnFamily("info");

        /*
         * 给 HBaseBolt 传入表名、数据映射关系、和 HBase 的配置信息
         * 表需要预先创建: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // 构建 Topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);


        // 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp",
                    config, builder.createTopology());
        }
    }
}

2.7 启动测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

# mvn clean package -D maven.test.skip=true

运行后,数据会存储到 HBase 的 WordCount 表中。使用以下命令查看表的内容:

hbase >  scan 'WordCount'

在这里插入图片描述

2.8 withCounterFields

在上面的用例中我们是手动编码来实现词频统计,并将最后的结果存储到 HBase 中。其实也可以在构建 SimpleHBaseMapper 的时候通过 withCounterFields 指定 count 字段,被指定的字段会自动进行累加操作,这样也可以实现词频统计。需要注意的是 withCounterFields 指定的字段必须是 Long 类型,不能是 String 类型。

SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/73836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自适应且不可删除的水印蒙层

目录 canvas自适应文字长度&#xff0c;旋转角度生成水印背景图 生成蒙层 禁止蒙层的删除和修改 canvas自适应文字长度&#xff0c;旋转角度生成水印背景图 设置canvas字体大小后&#xff0c;通过ctx.measureText(text).width获取两行文字的宽度text1&#xff0c;text2&…

python-(6-5-1)爬虫---xpath解析实战

文章目录一 环境准备二 需求三 分析1 拿到页面源代码2 提取和解析数据四 步骤流程1 拿到页面源代码2 提取和解析数据五 完整代码xpath是在XML文档中搜索内容的一门语言 html是xml的一个子集 一 环境准备 安装lxml模块 二 需求 爬取某网站的数据 三 分析 1 拿到页面源代码 …

计算机领域热知识【2】消息队列与celery

Celery是实现消息队列的一个工具&#xff0c;本篇博客将介绍消息队列的基础知识&#xff0c;以及celery实现消息队列的总体方法。想要实现用Celery实现消息队列实例的朋友&#xff0c;可以从本篇博客中找到我写的另一篇介绍使用Celery和RabbitMQ实现消息队列的博客。 目录消息队…

Java+Swing+mysql天气信息管理系统

JavaSwingmysql天气信息管理系统一、系统介绍二、功能展示1.主要功能2.主页3.查询历史天气三、数据库四、其他系统一、系统介绍 该系统实现: 通过高德天气API查询天气数据 将查询的数据存入本地数据库 删除数据。 二、功能展示 1.主要功能 2.主页 3.查询历史天气 三、数据库…

Netty04——优化与源码

目录1. 优化1.1 扩展序列化算法1.2 参数调优1.2.1.CONNECT_TIMEOUT_MILLIS1.2.2.SO_BACKLOG1.2.3.ulimit -n1.2.4.TCP_NODELAY1.2.5.SO_SNDBUF & SO_RCVBUF1.2.6.ALLOCATOR1.2.7.RCVBUF_ALLOCATOR1.3 RPC 框架1.3.1.准备工作1.3.2.服务器 handler1.3.3.客户端代码第一版1.3…

C#车库信息管理系统

车库信息管理系统实现 技术 C# sqlserver 系统功能 基本的登录注册车库管理员进行车辆信息的添加&#xff0c;即给车库登记车辆信息管理员对登记信息进行修改管理员对登记信息进行删除管理员对登记信息进行查询管理员对登记的车辆进行进库&#xff0c;出库处理实时统计车库…

CS61A学习笔记 lecture1 Computer science

CS61A学习笔记 lecture1 Computer science SICP: Structure and Interpretation of Computer Programs 计算机程序的构造和解释 一开始其实是想做南大的SICP学习笔记的&#xff0c;但是没有找到南大这门课的视频&#xff0c;还有就是他是CS61A的clone&#xff0c;网上也有CS61A…

Qt 多线程之QtConcurrent::map(处理序列容器)

QtConcurrent::map()、QtConcurrent::mapped() 和 QtConcurrent::mappedReduced() 函数对一个序列中&#xff08;例如&#xff1a;QList、QVector&#xff09;的项目并行地进行计算。 1、map函数 map函数的功能是在其他线程运行指定的函数&#xff0c;map函数有两个参数 第一…

耗时一个月整理的,最新出炉的Java面试题合集(2022亲身经历)

面试题清单 个人近来面试了不少的公司的&#xff0c;该挂的挂&#xff0c;该应付通过的应付通过&#xff0c;目前对面试题部分做一个系统的总结。最起码要保证被问过的问题第二次被问到的时候是可以回答并且理解的。算是一个被动输入学习的过程。 题目持续更新&#xff0c;答…

xdma linux 驱动

一、下载XDMA文件 输入命令: sudo git clone https://github.com/Xilinx/dma_ip_drivers cd xx_dma/dma_ip_drivers/XDMA/linux-kernel/xdma$ 二 、编译: sudo make install 在最后会遇到下面这个问题: 三、添加key 文件 cd /lib/modules/5.4.0-135-generic/build/ce…

使用 EF Core 处理Sqlite数据库

使用 EF Core 处理Sqlite数据库 1.通过NuGet安装Microsoft.EntityFrameworkCore.Sqlite 2.编写生成数据库的实体类 因为EF Core是通过实体类来作为数据库的字段 public class User {/// <summary>/// 主键 Id/// </summary>[Key]public int Id { get; set; }///…

代码上传gitee

有两种场景&#xff1a; 一、自己的代码没有用git管理&#xff0c;先将自己的代码用git管理起来&#xff0c;然后上传gitee gitee注册账号后&#xff0c;点“”号创建代码仓库&#xff0c;这就是你的代码库上传后的路径。 然后填写仓库名称和路径&#xff0c;这个可以随便写&…

【论文合集】2022年10月医学影像期刊论文合集

★ 本月IEEE Transactions on Medical Imaging(1区 top if 11.037) 共32篇, Medical Image Analysis&#xff08;1区 top if 13.828&#xff09; 共30篇. ”标题高频词汇 (segmentation, 13), (brain, 9), (mri, 6), (graph, 4)(attention, 4), (3d, 4), (contrastive, 4), …

代理模式--【学习笔记】

什么是代理模式&#xff1f; 代理是一种模式,提供了对目标对象的间接访问方式,即通过代理对象访问目标对象.如此便于在目标实现的基础上增加额外的功能操作,以满足自身的业务需求. 代理模式又分为静态代理&#xff0c;动态代理 静态代理模式 编写代理类, 要求: 代理类与目标类…

【我爱世界杯】伪球迷眼里的世界杯

大家好&#xff0c;我是【架构师李肯】&#xff0c;一个专注于嵌入式物联网架构设计的攻城狮。 文章目录按理说聊一聊我和足球第一次热衷于关注世界杯后ying情时代的卡塔尔世界杯祝愿世界杯按理说 嗯&#xff0c;按理说&#xff0c;我一个程序猿&#xff0c;既不踢球&#xff…

ChatGPT原理解析-张俊林

本文将从以下几个方面展开&#xff1a; 引言 ChatGPT的技术原理 引言 作为智能对话系统&#xff0c;ChatGPT最近两天爆火&#xff0c;都火出技术圈了&#xff0c;网上到处都在转ChatGPT相关的内容和测试例子&#xff0c;效果确实很震撼。我记得上一次能引起如此轰动的AI技术…

湃睿PMDS-Fx传感器在电动牙刷上的应用

电动牙刷、冲牙器等产品市场的爆发性增长&#xff0c;显示全球人口正在越来越关注牙齿/口腔的健康问题。 根据资料显示&#xff0c;中国电动牙刷市场规模呈现逐年上涨的态势&#xff0c;2017年中国电动牙刷市场规模为43亿元&#xff0c;2021年中国电动牙刷市场规模上涨为125亿…

01-go基础-10-结构体 struct (定义结构体、声明结构体变量、结构体赋值、结构体做参数、结构体指针、结构体嵌套、结构体打印)

文章目录1. 定义结构体类型2. 声明结构体变量3. 赋值3.1 用结构体赋值3.2 每个成员分别赋值4. 结构体使用4.1 结构体作参数4.2 结构体指针做参数4.3 二者区别4.4 本质原因5. 结构体嵌套5.1 一个结构体作为另一个结构体的成员1&#xff09;定义2&#xff09;赋值和引用3&#xf…

RabbitMQ的学习

MQ引言 什么是MQ MQ(Message Quene)&#xff1a;翻译为消息队列&#xff0c;通过典型的生产者和消费者模型生产者不断向消息队列中生产消息&#xff0c;消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的&#xff0c;而且只关心消息的发送和接收&#xff0c;没有…

【MySQL 原理篇】- 凭这个,我拿下字节面试

若是想查看原图&#xff0c;请点击这里 刘卡卡 | ProcessOn 超链接 索引 从存储结构上看&#xff0c;有哪些索引从存储结构上来划分&#xff1a;BTree索引&#xff08;B-Tree或BTree索引&#xff09;&#xff0c;Hash索引&#xff0c;full-index全文索引&#xff0c;R-Tree索…