Scala 练习一 将Mysql表数据导入HBase

news2025/1/21 9:25:25

Scala 练习一 将Mysql表数据导入HBase

续第一篇:Java代码将Mysql表数据导入HBase表

源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase

  • 一、整体介绍
  • 二、依赖
  • 三、测试结果
  • 四、源码

一、整体介绍

在这里插入图片描述

  1. HBase特质

    连接HBase, 创建HBase执行对象

    1. 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
      Configuration conf = HBaseConfiguration.create()
      conf.set(String, String)
    2. 创建连接:多个连接(池化)
      Connection con = ConnectionFactory.createConnection()
    3. 创建数据表:表名: String
      Table table = con.getTable(TableName)
    def build(): HBase		// 初始化配置信息
    def initPool(): HBase	// 初始化连接池
    def finish(): Executor	// 完成 返回执行对象
    
  2. Executor特质

    对HBase进行操作的方法: 包含如下函数

    def exists(tableName: String): Boolean	// 验证数据表是否存在
    def create(tableName: String, columnFamilies: Seq[String]): Boolean	// 创建数据表
    def drop(tableName: String): Boolean	// 删除数据表
    def put(tableName: String, data: util.List[Put]): Boolean	// 批量插入数据
    
  3. Jdbc 封装

    Jdbc封装

    1. 初始化连接
      driver : com.mysql.cj.jdbc.Driver
      参数:url, username, password
      创建连接
    2. 初始化执行器
      sql, parameters
      创建执行器【初始化参数】
    3. 执行操作并返回【结果】
      DML: 返回影响数据库表行数
      DQL: 返回查询的数据集合
      EX: 出现异常结果
  4. MyHBase用于实现HBaseExecutor特质

  5. 测试数据格式

    mysql表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    DROP TABLE IF EXISTS `test_table_for_hbase`;
    CREATE TABLE `test_table_for_hbase`  (
      `test_id` int NULL DEFAULT NULL,
      `test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `test_age` int NULL DEFAULT NULL,
      `test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
    
    INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112');
    INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113');
    INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114');
    INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115');
    -- .... 省略以下数据部分
    

    hbase表

    # 创建表  库名:表名, 列族1, 列族2
    create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo"	
    truncate 'hbase_test:tranfer_from_mysql'  # 清空hbase_test命名空间下的tranfer_from_mysql表
    scan 'hbase_test:tranfer_from_mysql'	  # 查看表
    

二、依赖

<dependencies>
    <!-- HBase 驱动 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.3.5</version>
    </dependency>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.0.33</version>
    </dependency>

    <!-- zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.3</version>
    </dependency>
</dependencies>

三、测试结果

终端有个日志的小警告(无伤大雅hh),输出为 true
在这里插入图片描述

查看hbase表,发现数据正常导入

在这里插入图片描述

四、源码

scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载

Executor

package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {
  def exists(tableName: String): Boolean
  def create(tableName: String, columnFamilies: Seq[String]): Boolean
  def drop(tableName: String): Boolean
  def put(tableName: String, data: util.List[Put]): Boolean
}

HBase

package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {
  protected var statusCode: Int = -1
  def build(): HBase
  case class PoolCon(var available: Boolean, con: Connection) {
    def out = {
      available = false
      this
    }
    def in = available = true
  }
  def initPool(): HBase
  def finish(): Executor
}

MyHBase

package hbase.impl

import hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

import java.util
import scala.collection.mutable.ArrayBuffer

class MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{
  private lazy val config: Configuration = HBaseConfiguration.create()
  private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()
  
  override def build(): HBase = {
    if(statusCode == -1){
      conf.foreach(t => config.set(t._1, t._2))
      statusCode = 0
      this
    }else{
      throw new HBaseException("build() function must be invoked first")
    }
  }
  
  override def initPool(): HBase = {
    if(statusCode == 0){
      val POOL_SIZE = if (pooled) {
        if (poolSize <= 0) 3 else poolSize
      } else 1
      for (i <- 1 to POOL_SIZE) {
        pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))
      }
      statusCode = 1
      this
    }else{
      throw new HBaseException("initPool() function must be invoked only after build()")
    }

  }
  
  override def finish(): Executor = {
    if (statusCode == 1) {
      statusCode = 2
      new Executor {
        override def exists(tableName: String): Boolean = {
          var pc: PoolCon = null
          try{
            pc = getCon
            val exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))
            pc.in
            exists
          }catch {
            case e: Exception => e.printStackTrace()
              false
          }finally {
            close(pc)
          }
        }

        override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {
          if (exists(tableName)) {
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            val builder: TableDescriptorBuilder = TableDescriptorBuilder
              .newBuilder(TableName.valueOf(tableName))

            columnFamilies.foreach(
              cf => builder.setColumnFamily(
                ColumnFamilyDescriptorBuilder.of(cf)
              )
            )
            pc.con.getAdmin.createTable(builder.build())
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
              close(pc)
          }
        }
        override def drop(tableName: String): Boolean = {
          if(!exists(tableName)){
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            pc.con.getAdmin.deleteTable(TableName.valueOf(tableName))
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
            close(pc)
          }
        }

        override def put(tableName: String, data: util.List[Put]): Boolean = {
          if(!exists(tableName)){
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            pc.con.getTable(TableName.valueOf(tableName)).put(data)
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
            close(pc)
          }
        }
      }
    }
    else {
      throw new HBaseException("finish() function must be invoked only after initPool()")
    }
  }
  private def getCon = {
    val left: ArrayBuffer[PoolCon] = pool.filter(_.available)
    if (left.isEmpty) {
      throw new HBaseException("no available connection")
    }
    left.apply(0).out
  }

  private def close(con: PoolCon) = {
    if (null != con) {
      con.in
    }
  }
}

object MyHBase{
  def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}

Jdbc

package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {
  object Result extends Enumeration {
    val EX = Value(0) 
    val DML = Value(1) 
    val DQL = Value(2) 
  }
  // 3种结果(异常,DML,DQL)封装
  case class ResThree(rst: Result.Value) {
    def to[T <: ResThree]: T = this.asInstanceOf[T]
  }
  class Ex(throwable: Throwable) extends ResThree(Result.EX)
  object Ex {
    def apply(throwable: Throwable): Ex = new Ex(throwable)
  }

  class Dml(affectedRows: Int) extends ResThree(Result.DML) {
    def update = affectedRows
  }
  object Dml {
    def apply(affectedRows: Int): Dml = new Dml(affectedRows)
  }

  class Dql(set: ResultSet) extends ResThree(Result.DQL) {
    def generate[T](f: ResultSet => T) = {
      val list: util.List[T] = new util.ArrayList()
      while (set.next()) {
        list.add(f(set))
      }
      list
    }
  }
  object Dql {
    def apply(set: ResultSet): Dql = new Dql(set)
  }
  // JDBC 函数封装
  def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {
    def con() = {
      // 1.1 显式加载 JDBC 驱动程序(只需要一次)
      Class.forName("com.mysql.cj.jdbc.Driver")
      // 1.2 创建连接对象
      DriverManager.getConnection(url, user, password)
    }
    def pst(con: Connection) = {
      // 2.1 创建执行对象
      val pst = con.prepareStatement(sql)
      // 2.2 初始化 SQL 参数
      if (null != params && params.nonEmpty) {
        params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))
      }
      pst
    }
    try {
      val connect = con()
      val prepared = pst(connect)
      sql match {
        case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")
        => Dml(prepared.executeUpdate())
        case sql if sql.matches("^(select|SELECT) .*")
        => Dql(prepared.executeQuery())
        case _ => Ex(new SQLException(s"illegal sql command : $sql"))
      }

    } catch {
      case e: Exception => Ex(e)
    }

  }

}

Test

import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.util

object Test {
  def main(args: Array[String]): Unit = {
    // 初始化MySQL JDBC操作函数
    val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(
      user = "root",
      url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",
      password = "123456"
    )

    // 执行SQL查询,并将结果封装在ResThree对象中
    val toEntity: ResThree = jdbcOpr(
      "select * from test_table_for_hbase where test_id between ? and ?",
      Seq(2, 4)
    )

    // 判断ResThree对象中的结果是否为异常
    if (toEntity.rst == Result.EX) {
      // 如果异常,执行异常结果处理
      toEntity.to[Ex]
      println("出现异常结果处理")
    } else {
      // 如果没有异常,将查询结果转换为HBase的Put对象列表
      val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {
        // 创建一个Put对象,表示HBase中的一行
        val put = new Put(
          Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_id
          System.currentTimeMillis() // 设置时间戳
        )
        // 向Put对象中添加列值
        // baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),
          Bytes.toBytes(rst.getString("test_name"))
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),
          Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),
          Bytes.toBytes(rst.getString("test_gender"))
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),
          Bytes.toBytes(rst.getString("test_phone"))
        )
        // 返回构建好的Put对象
        put
      })

      // 如果有数据需要插入HBase
      if (puts.size() > 0) {
        // 初始化HBase连接池并执行Put操作
        val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1)
          .build()
          .initPool()
          .finish()

        // 执行Put操作,并返回是否成功
        val bool = exe.put("hbase_test:tranfer_from_mysql", puts)

        // 打印操作结果
        println(bool)
      } else {
        // 如果没有数据需要插入
        println("查无数据")
      }
    }
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1801429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目-基于LangChain的ChatPDF系统

问答系统需求文档 一、项目概述 本项目旨在开发一个能够上传 PDF 文件&#xff0c;并基于 PDF 内容进行问答互动的系统。用户可以上传 PDF 文件&#xff0c;系统将解析 PDF 内容&#xff0c;并允许用户通过对话框进行问答互动&#xff0c;获取有关 PDF 文件内容的信息。 二、…

java自动化测试之03-08java基础之条件判断

java基础之条件判断 java中表示判断语句有三个&#xff0c;分别为if语句、switch语句和三元运算 if语句 1.1 只含有一个if if(布尔表达式){ //如果布尔表达式为true将执行的语句 } 代码举例如下 public class ConditionStudy {public static void main(String[] args) …

Java现在还适合入门吗?

计算机技术在当今的社会&#xff0c;已经变得越来越热&#xff0c;充斥着我们生活的方方面面。人们的工作或是休闲&#xff0c;离不开互联网和电脑&#xff0c;这既受益于各类软件的诞生&#xff0c;也与时下的技术息息相关。Java作为编程界赫赫有名的语言&#xff0c;在最近几…

Java——JVM

前言 JVM.即Java虚拟机.用来解释执行Java字节码. 一、JVM中的内存区域划分 JVM其实也是一个进程,进程运行过程中,要从操作系统这里申请一些资源(内存就是其中的典型资源) 这些内存空间,就支撑了后续Java程序的执行. JVM从系统中申请了一大块内存,这一大块内存给Java程序使…

数据结构笔记2 栈和队列

为什么在循环队列中&#xff0c;判断队满的条件是&#xff08;Q.rear1&#xff09;模maxqsize? 取模运算&#xff08;%&#xff09;在循环队列中起到关键作用&#xff0c;主要是因为它能确保索引值在数组的有效范围内循环。具体来说&#xff0c;取模运算有以下几个重要作用&am…

Linux进程间通信之System V

目录 认识system V&#xff1a; system V共享内存&#xff1a; 共享内存的基本原理&#xff1a; 共享内存的数据结构&#xff1a; 共享内存的建立与释放&#xff1a; 共享内存的建立&#xff1a; 共享内存的释放&#xff1a; 共享内存的关联&#xff1a; 共享内存的去关联…

驱动开发之 input 子系统

1.input 子系统介绍 input 就是输入的意思&#xff0c;input 子系统就是管理输入的子系统&#xff0c;和 pinctrl、gpio 子系统 一样&#xff0c;都是 Linux 内核针对某一类设备而创建的框架。比如按键输入、键盘、鼠标、触摸屏等 等这些都属于输入设备&#xff0c;不同的输入…

区块链的基本原理和优势

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

数据结构_手撕八大排序(计数,快排,归并,堆排,希尔,选择,插入,冒泡)

✨✨所属专栏&#xff1a;数据结构✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ 排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a;假定在待排序的记录序…

Docker:认识镜像仓库及其命令

文章目录 Docker Registry什么是Docker Registry 镜像仓库工作机制使用流程实际使用方法仓库的拉取机制 常用的镜像仓库---DockerHub什么是DockerHub私有仓库 镜像仓库命令docker logindocker pulldocker pushdocker searchdocker logout Docker Registry 什么是Docker Regist…

[线程与网络] 网络编程与通信原理(六):深入理解应用层http与https协议(网络编程与通信原理完结)

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏:&#x1f355; Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm1001.2014.3001.5482 &#x1f9c0;Java …

【java】速度搭建一个springboot项目

使用软件&#xff1a;IDEA&#xff0c;mysql 使用框架&#xff1a;springboot mybatis-plus druid 坑点 使用IDEA搭建一个springboot项目的时候&#xff0c;需要考虑一下IDEA版本支持的JDK版本以及maven版本。否则再构建项目&#xff0c;引入pom的时候就会报错。 需要检查…

C++全栈聊天项目(21) 滚动聊天布局设计

滚动聊天布局设计 我们的聊天布局如下图 最外层的是一个chatview&#xff08;黑色&#xff09;&#xff0c; chatview内部在添加一个MainLayout&#xff08;蓝色&#xff09;&#xff0c;MainLayout内部添加一个scrollarea(红色)&#xff0c;scrollarea内部包含一个widget&…

Linux shell编程学习笔记57:lshw命令 获取cpu设备信息

0 前言 在Linux中&#xff0c;获取cpu信息的命令很多&#xff0c;除了我们已经研究的 cat /proc/cpuinfo、lscpu、nproc、hwinfo --cpu 命令&#xff0c;还有 lshw命令。 1 lshw命令的功能 lshw命令源自英文list hardware&#xff0c;即列出系统的硬件信息&#xff0c;这些硬…

UI 自动化分布式测试 -Docker Selenium Grid

分布式测试Selenium Grid 对于大型项目或者有大量测试用例的项目,单机的测试环境往往无法快速完成所有测试用例的执行,此时自动化测试执行效率将会成为最大的瓶颈,Selenium Grid 可以通过多机的分布式架构允许测试用例并行运行,大大缩短了测试时间。 Selenium Grid 提供了多…

限时限量!6.18云服务器大促盘点,错过一次,再等一年!

随着云计算技术的飞速发展&#xff0c;云服务器已成为企业和个人构建和扩展在线业务的首选平台。特别是在大型促销活动如618年中大促期间&#xff0c;云服务提供商纷纷推出极具吸引力的优惠&#xff0c;以降低用户上云的门槛。以下是对当前市场上几个主流云服务提供商的优惠活动…

JavaScript入门宝典:核心知识全攻略(下)

文章目录 前言一、获取标签元素二、操作标签元素属性1. 属性的操作2. innerHTML 三、数组及操作方法1. 数组的定义2. 数组的操作 四、循环语句五、字符串拼接六、定时器1. 定时器的使用3. 清除定时器 七、ajax1. ajax的介绍2. ajax的使用 前言 JavaScript是前端开发不可或缺的技…

C++| 一维线性插值、imadjust函数

前言&#xff1a;最近要从Matlab代码改C代码&#xff0c;不能直接用Matlab生成的C代码&#xff0c;因为需要嵌入到已有项目中。Matlab本身有很多很方便的数学公式&#xff0c;但是在C里没有相关的库的话&#xff0c;需要自己实现。 一维线性插值、imadjust函数 一维线性插值原理…

常见八大排序(纯C语言版)

目录 基本排序 一.冒泡排序 二.选择排序 三.插入排序 进阶排序&#xff08;递归实现&#xff09; 一.快排hoare排序 1.单趟排序 快排步凑 快排的优化 &#xff08;1&#xff09;三数取中 &#xff08;2&#xff09;小区间优化 二.前后指针法(递归实现) 三.快排的非…

【爬虫】使用Python爬取百度学术页面的标题、作者、摘要和关键词

目录 安装所需库编写爬虫代码解释运行脚本结果 在本文中&#xff0c;我将介绍如何使用Python编写一个网络爬虫&#xff0c;从百度学术页面提取研究论文的标题、作者、摘要和关键词。我们将使用 requests和 BeautifulSoup库来实现这一目标。 安装所需库 首先&#xff0c;确保…