flink: 将接收到的tcp文本流写入HBase

news2025/1/22 20:51:55

一、依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pulsar-demo2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <pulsar.version>2.8.0</pulsar.version>
        <jackson.version>2.10.5</jackson.version>
        <!--<jackson.version>2.6.7</jackson.version>-->

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-kafka</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-spark</artifactId>
            <version>${pulsar.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

<!--

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>-->


        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-1.4_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>

    </dependencies>

</project>

二、HBase中建表:

create 'hbasetable','family1','family2','family3','family4'

三、在一台服务器上开启nc

nc -lk 9999

四、运行,demo程序

package cn.edu.tju;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import java.util.UUID;

public class FlinkHBase3 {
//nc 服务器地址
    private static String HOST_NAME = "xx.xx.xx.xx";
    private static int PORT = 9999;
    private static String DELIMITER ="\n";
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);
        SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {
            @Override
            public DataInfo map(String value) throws Exception {

                String[] stringList = value.split(",");
                DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(
                        stringList[0]), stringList[1], Double.parseDouble(stringList[2]));
                return dataInfo;
            }
        });

        Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");

        tableEnv.createTemporaryView("dataTable", dataTable);


// 这里要配自己HBase的zookeeper地址
        tableEnv.executeSql("CREATE TABLE flinkTable (\n" +
                " rowkey STRING,\n" +
                " family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +
                " PRIMARY KEY (rowkey) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-1.4',\n" +
                " 'table-name' = 'hbasetable',\n" +
                " 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +
                ")");


        tableEnv.executeSql("INSERT INTO flinkTable " +
                "SELECT rowkey, ROW(ts,info,val) FROM dataTable");
        env.execute("HBaseFlinkJob");

    }

    public static class DataInfo{

        private String rowkey;
        private Long ts;
        private String info;
        private double val;

        public String getRowkey() {
            return rowkey;
        }

        public void setRowkey(String rowkey) {
            this.rowkey = rowkey;
        }

        public Long getTs() {
            return ts;
        }

        public void setTs(Long ts) {
            this.ts = ts;
        }

        public String getInfo() {
            return info;
        }

        public void setInfo(String info) {
            this.info = info;
        }

        public double getVal() {
            return val;
        }

        public void setVal(double val) {
            this.val = val;
        }

        @Override
        public String toString() {
            return "DataInfo{" +
                    "ts=" + ts +
                    ", info='" + info + '\'' +
                    ", val='" + val + '\'' +
                    '}';
        }

        public DataInfo( String rowkey, Long ts, String info, double val) {
            this.rowkey = rowkey;
            this.ts = ts;
            this.info = info;
            this.val = val;
        }

        public DataInfo() {

        }
    }

}

五、在nc窗口输入:

1689999832,dong,32.45

六、在HBase检查数据是否已经写入:

scan 'hbasetable'

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1562231.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

总结:微信小程序中跨组件的通信、状态管理的方案

在微信小程序中实现跨组件通信和状态管理,有以下几种主要方案: 事件机制 通过事件机制可以实现父子组件、兄弟组件的通信。 示例: 父组件向子组件传递数据: 父组件: <child binddata"handleChildData" /> 子组件: Component({..., methods: { handleChildData(…

VTK 简介

VTK 简介 VTK 简介什么是 VTK&#xff1f;VTK 能做什么&#xff1f;VTK 的基本组成VTK 的框架结构VTK 的数据结构VTK 的可视化流程参考 VTK 简介 什么是 VTK&#xff1f; VTK&#xff0c;全称是Visualization Toolkit&#xff0c;即可视化工具包。是一个开源、跨平台、可自由…

C++面向对象程序设计 - 访问对象中成员的3种方法

在C程序中访问对象的成员变量和成员函数&#xff0c;有三种方法&#xff1a; 通过对象名和成员运算符访问对象中的成员&#xff1b;通过指向对象的指针访问对象中的成员&#xff1b;通过对象的引用变量访问对象中的成员 在了解访问对象中成员的3种方法前&#xff0c;先了解下C…

uniapp 小程序和app map地图上显示多个酷炫动态的标点,头像后端传过来,真机测试有效

展示效果 二、引入地图 如果需要搜索需要去腾讯地图官网上看文档&#xff0c;找到对应的内容 1.申请开发者密钥&#xff08;key&#xff09;&#xff1a;申请密钥 2.开通webserviceAPI服务&#xff1a;控制台 ->应用管理 -> 我的应用 ->添加key-> 勾选WebService…

LCD TP触摸屏调试方法

一、硬件连接 I2C总线&#xff1a;I2C-SDA和i2C-SCL 中断信号&#xff1a;touch-gpio 复位信号&#xff1a;reset-gpio 电源信号&#xff1a;power-gpio 二、驱动调试 2.1 确认从设备地址 在给TP供电正常后&#xff0c;检测其I2C设备从地址&#xff0c;或者通过datashee…

香港科技大学广州|数据科学与分析学域硕博招生宣讲会—天津大学专场

时间&#xff1a;2024年4月12日&#xff08;星期五&#xff09;14:00 地点&#xff1a;天津大学北洋园校区55楼B204 报名链接&#xff1a;https://www.wjx.top/vm/Q0cKTUI.aspx# 跨学科研究领域 *数据驱动的人工智能和机器学习 *统计学习和建模 工业和商业分析 *特定行业的数…

渗透测试:数据库UDF提权(linux)

目录 开头: 1.UDF提权简介&#xff1a; 1.1共享库文件(UDF文件)指定目录&#xff1a; 版本特征&#xff1a; 操作系统版本&#xff1a; 2.靶场UDF提权复现 提权前提 1.要有一个高权限的MySQL的账号 ​编辑 2.MySQL的权限配置secure_file_priv为空 3.必须有存放UDF文件的…

非关系型数据库——Redis配置与优化

目录 一、关系型数据库和非关系型数据库 1.定义 1.1关系型数据库 1.2非关系型数据库 2.非关系型数据库产生的背景 3.关系型数据库和非关系型数据库区别 3.1适用性不同 3.2数据一致性要求不同 3.3数据模型不同 3.4数据查询语言不同 3.5数据存储方式不同 3.6扩展方式…

【面试八股总结】超文本传输协议HTTP(二)

一、HTTP缓存技术 将资源&#xff08;如网页、图像、脚本等&#xff09;的副本存储在客户端或中间代理服务器上&#xff0c;以便将来的请求可以直接从缓存中获取&#xff0c;而不必重新从服务器下载资源。这有助于减少网络延迟&#xff0c;提高页面加载速度&#xff0c;并减轻…

Docker工作流

1.工作流 开发应用编写Dockerfile构建Docker镜像运行Docker容器测试应用发布镜像到Hub迭代更新镜像 2.开发应用 首先你需要创建一个应用&#xff0c;这个应用可以是后端应用或者前端应用&#xff0c;任何语言都可以。 比如&#xff1a;我使用IDEA 创建一个Java后端应用&…

行人重识别项目 | 基于Pytorch实现ReID行人重识别算法

项目应用场景 面向行人重识别场景&#xff0c;项目具有轻量化 (训练的时候也只需要 2GB 的显存占用)、性能好 (只使用 softmax 损失就能够达到 Rank188.24%, mAP70.68%)&#xff0c;另外提供友好的上手项目流程教程 项目效果&#xff1a; 项目流程 > 具体参见项目内README.…

穿山甲广告平台SDK接入效果怎么样?

广告收入是大多数开发者的应用变现收入来源&#xff0c;如何进行流流量变现是从应用设计之初就需要开发者思考的问题。 穿山甲广告平台作为国内第三方广告变现平台&#xff0c;是不少开发者选择的对接平台。 穿山甲广告平台的广告类型较多&#xff0c;有信息流&#xff0c;ba…

用Python实现办公自动化(自动化处理PDF文件)

自动化处理 PDF 文件 目录 自动化处理 PDF 文件 谷歌浏览器 Chrome与浏览器驱动ChromeDriver安装 &#xff08;一&#xff09;批量下载 PDF 文件 1.使用Selenium模块爬取多页内容 2.使用Selenium模块下载PDF文件 3.使用urllib模块来进行网页的下载和保存 4.使用urllib…

前端性能优化-Table渲染速度优化

教务系统-排课页面性能优化总结 一、前言 在公司教务系统中,排课页面慢的令人发指,在某些情况由于数据量大导致页面主进程卡死,遂组织进行一次排查优化,现记录一下 二、效果对比 以下数据均为UAT环境 Performence对比 更改前: 主进程渲染时间为 8s 教务系统-排课页面性…

SpringBoot+uniApp宠物领养小程序系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.保存宠物信息代码2.提交订单信息代码3.查询评论信息代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootuniApp框架开发的宠物领养微信小程序系统。…

wps 开发插件

官方文档参考wps官方文档参考 1.环境安装 安装wps https://www.wps.cn/ 安装Node.js https://nodejs.org/en 安装代码编辑器 Visual Studio Code https://code.visualstudio.com/ 环境检查-进入cmd查看 node -v2.demo 2.1 demo下载 打开vscode&#xff0c;新建终端 安装…

渗透测试练习题解析 5(CTF web)

1、[安洵杯 2019]easy_serialize_php 1 考点&#xff1a;PHP 反序列化逃逸 变量覆盖 【代码审计】 通过 GET 的方式获取参数 f 的值&#xff0c;传递给变量 function 定义一个过滤函数&#xff0c;过滤掉特定字符&#xff08;用空字符替换&#xff09; 下面的代码其实没什么用…

面试智力题

面试智力题 二进位bit1. 题目&#xff1a;一千杯水&#xff0c;一杯水有毒&#xff0c;怎么用10只老鼠&#xff0c;把这杯水找出来&#xff08;1&#xff09;解法1&#xff1a;常规法&#xff08;2&#xff09;bit法&#xff08;位数法&#xff09; (3) 时间法三级目录 二进位b…

Flutter Web 的未来,Wasm Native 即将到来

早在去年 Google I/O 发布 Flutter 3.10 的时候就提到过&#xff0c; Flutter Web 的未来会是 Wasm Native &#xff0c;当时 Flutter 团队就表示&#xff0c;Flutter Web 的定位不是设计为通用 Web 的框架&#xff0c;类似的 Web 框架现在有很多&#xff0c;而 Flutter 的定位…

Vision-Language Models for Vision Tasks: A Survey

论文地址&#xff1a;https://arxiv.org/pdf/2304.00685.pdf 项目地址&#xff1a;https://github.com/jingyi0000/VLM_survey 一、综述动机 视觉语言模型&#xff0c;如CLIP&#xff0c;以其独特的训练方式显著简化了视觉识别任务的流程。它减少了对大量精细标注数据的依赖&a…