Scala 简单实现数据库连接池

news2025/1/11 20:00:45

在使用JDBC的时候,连接池是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。

以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。

核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。

package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks

class DataSource(
    val driverName: String,
    val url: String,
    val user: String,
    val password: String,
    val minSize: Integer = 1,
    val maxSize: Integer = 10,
    val keepAliveTimeout: Long = 1000
) extends AutoCloseable {

  if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
    throw new IllegalArgumentException("These arguments are Illegal")
  }

  Class.forName(driverName)
  private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
  private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)

  for (i <- 0 until minSize) {
    pool += new DbConnection(url, user, password)
  }

  def getConenction(): DbConnection = {
    val starEntry = System.currentTimeMillis()
    Breaks.breakable {
      while (true) {
        lock.lock()
        try {
          for (con <- pool) {
            if (!con.used) {
              con.used = true
              return con;
            }
          }
          if (pool.size < maxSize) {
            var con = new DbConnection(url, user, password) { used = true }
            pool.append(con)
            return con
          }
        } finally {
          lock.unlock()
        }
        if (System.currentTimeMillis() - starEntry > keepAliveTimeout) {
          break()
        }
      }
    }
    throw new IllegalArgumentException("Connection Pool is empty")
  }
  def close(): Unit = {
    lock.lock()
    try {
      if (pool != null) {
        pool.foreach(t => t.innerConnection.close())
        pool.clear()
      }
    } finally {
      lock.unlock()
    }
  }
}

以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口

BeginTransaction:开启事务,并返回封装了的DbTransaction类

close:将连接释放

CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等

package pool

import java.sql.Connection
import java.sql.DriverAction
import java.sql.DriverManager

class DbConnection(
    val url: String,
    val user: String,
    val password: String
) extends AutoCloseable {

  private[pool] var used: Boolean = false
  private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)

  def close(): Unit = {
    if (used) {
      used = false
    }
  }

  def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
    if (innerConnection.getAutoCommit()) {
      innerConnection.setAutoCommit(false)
    }
    innerConnection.setTransactionIsolation(isolationLevel)
    new DbTransaction(this)
  }

  def CreateCommand(): DbCommand = {
    new DbCommand(this)
  }
}

以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。

ExecuteScalar:查询数据库并返回第一行第一个值的方法。

ExecuteResultSet:该方法有两个重载方法。

参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。

无参的版本则通过反射直接将ResultSet通过名字映射为你需要的类型。

package pool

import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}

import Dispose.using
import java.{util => ju}

class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
  if (queryTimeout < 0) {
    throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.")
  }
  val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
  private val mirror = ru.runtimeMirror(getClass().getClassLoader())
  private var statement: CallableStatement = null

  /** @author:qingchuan
    *
    * @return
    */
  def ExecuteScalar(): Any = {
    var obj: Any = None
    ExecuteResultSet(t => {
      if (t.next()) {
        if (t.getMetaData().getColumnCount() > 0)
          obj = t.getObject(1)
      }
    })
    obj
  }

  /** @author
    *   qingchuan
    * @version 1.0
    *
    * @param callBack
    */
  def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
    if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    using(statement.executeQuery()) { t =>
      callBack(t)
      if (!t.isClosed())
        getOutParameterValue()
    }
  }

  def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
    val classSymbol = mirror.symbolOf[T].asClass
    val classMirror = mirror.reflectClass(classSymbol)
    val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    val result = new ArrayBuffer[T]()
    ExecuteResultSet(t => {
      while (t.next()) {
        var i = 1
        val values: Buffer[Any] = ArrayBuffer()
        for (f <- fields) {
          values += t.getObject(i)
          i += 1
        }
        result += consMethodMirror.apply(values: _*).asInstanceOf[T]
      }
    })
    result
  }

  def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
    statement = connection.innerConnection.prepareCall(commandText)
    var trans: DbTransaction = null
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    for (t <- values) {
      var i = 1
      val filedMirror = mirror.reflect(t)
      for (f <- fields) {
        val instance = filedMirror.reflectField(f)
        statement.setObject(i, instance.get)
        i += 1
      }
      statement.addBatch()
    }

    try {
      trans = connection.BeginTransaction()
      val obj = statement.executeBatch()
      trans.Commit()
      statement.clearBatch()
      obj.sum
    } catch {
      case e: Exception => {
        if (trans != null)
          trans.RollBack()
        throw e
      }
    }
  }

  def ExecuteNoneQuery(): Integer = {
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    val obj = statement.executeUpdate()
    getOutParameterValue()
    obj
  }

  def CreateParameter(): DbParameter = {
    new DbParameter();
  }

  private def getOutParameterValue(): Unit = {
    for (i <- 1 to Parameters.size) {
      val parameter: DbParameter = Parameters(i - 1);
      if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
        parameter.value = statement.getObject(i);
      }
    }
  }

  private def addParatemetrs(): Unit = {
    statement.clearParameters()
    for (i <- 1 to Parameters.size) {
      val p = Parameters(i - 1);
      if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.setObject(i, p.value)
      }
      if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.registerOutParameter(p.parameterName, p.sqlType, p.scale)
      }
    }
  }
  def close() {
    if (statement != null) {
      statement.close()
    }
  }
}

case class DbParameter(
    var parameterName: String = null,
    var value: Any = null,
    var parameterDirection: Integer = ParameterDirection.Input,
    var scale: Integer = 0,
    var sqlType: Integer = null
) {}

object ParameterDirection {
  val Input = 1
  val InputOutput = 2
  val Output = 3
}

以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。

package pool

class DbTransaction(private val connection: DbConnection) {

  def Commit(): Unit = {
    connection.innerConnection.commit()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true);
    }
  }
  def RollBack(): Unit = {
    connection.innerConnection.rollback()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true)
    }
  }

  def getConnection(): DbConnection = {
    connection
  }

  def getTransactionIsolation(): Int = {
    connection.innerConnection.getTransactionIsolation()
  }

}

object IsolationLevel {

  val TRANSACTION_NONE = 0

  val TRANSACTION_READ_UNCOMMITTED = 1;

  val TRANSACTION_READ_COMMITTED = 2;

  val TRANSACTION_REPEATABLE_READ = 4;

  val TRANSACTION_SERIALIZABLE = 8;

}

最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。

package pool

object Dispose {
  def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
    try {
      op(cls)
    } catch {
      case e: Exception => throw e
    } finally {
      cls.close()
    }
  }
}

以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。

import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.DbTransaction
import pool.Dispose.using
import pool.IsolationLevel
import pool.ParameterDirection

import java.sql.Date
import java.sql.ResultSet
import java.sql.Types
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import javax.xml.crypto.Data
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import com.nimbusds.oauth2.sdk.util.date.SimpleDate
import java.text.SimpleDateFormat
object App {
  def main(args: Array[String]): Unit = {
    val pool = new DataSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
      "账号",
      "密码",
      minSize = 1,
      maxSize = 3,
      keepAliveTimeout = 3000
    )

    val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    for (i <- 1 to 15) {
      val thread: Thread = new Thread(() => {
        val date = new Date(System.currentTimeMillis())
        using(pool.getConenction()) { con =>
          {
            using(new DbCommand(con)) { cmd =>
              {
                cmd.commandText = "{call p_get_out(?,?)}"
                val p1 = new DbParameter("@id", i)
                val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
                cmd.Parameters.append(p1)
                cmd.Parameters.append(p2)
                val result = cmd.ExecuteScalar()
                println(s"result=${result},output=${p2.value},parameter=${i}")
              }
            }
          }
        }
      })
      thread.start()
    }
  }
}

开发环境VsCode,SQL Server数据库。以下是引用的第三方库。

version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"

以下是执行结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/195236.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈估值模型:PB指标与剩余收益估值

摘要及声明 1&#xff1a;本文简单介绍PB指标的推导以及剩余收益的估值方式&#xff1b; 2&#xff1a;本文主要为理念的讲解&#xff0c;模型也是笔者自建&#xff0c;文中假设与观点是基于笔者对模型及数据的一孔之见&#xff0c;若有不同见解欢迎随时留言交流&#xff1b…

【HTML】HTML 标签 ① ( 骨架标签 | 双标签和单标签 | 嵌套关系和并列关系 | 文档类型 | 页面语言 | 编码字符集 )

文章目录一、HTML 标签简介二、HTML 骨架标签三、双标签和单标签四、嵌套关系和并列关系五、文档类型六、页面语言七、编码字符集一、HTML 标签简介 HTML 英文全称 " HyperText Mark-up Language " , 中文名称是 " 超文本标记语言 " ; 多媒体 : 超文本 指…

小 C 爱观察(observe)

小 C 爱观察&#xff08;observe&#xff09;题目描述输入格式输出格式样例输入数据#1输出数据#1解释#1输入数据#2输出数据#2输入数据#3输出数据#3题目描述 小 C 非常喜欢树。上次后院的蚂蚁看腻了&#xff0c;这次准备来观察树。 小 C 每天起得早早的&#xff0c;给小树浇水…

shell 流程控制之条件判断及案例

目录 流程控制之条件判断 一&#xff0c;if条件语句的语法及案例 1&#xff0c;单分支结构 2&#xff0c;双分支结构 3&#xff0c;多分支结构 二&#xff0c;复合指令 三&#xff0c; exit退出程序 四&#xff0c; 多条件判断语句case 流程控制之条件判断 条件判断语句…

【SpringCloud】Sentinel 之隔离与降级

一、上集回顾上级文章地址&#xff1a;【SpringCloud】Sentinel 之流量控制_面向架构编程的博客-CSDN博客上一篇文章我们讲解了Sentinel 流量控制、流控效果、热点参数限流的用法&#xff0c;统称为限流&#xff0c;它是一种预防措施&#xff0c;可以尽量避免因高并发而引起的服…

根据官方文档详细说明 Kubernetes 网络流量走向,包含详细的图文说明和介绍

根据官方文档详细说明 Kubernetes 网络流量走向&#xff0c;包含详细的图文说明和介绍。 阅读本文&#xff0c;你可以了解在 Kubernetes 内外&#xff0c;数据包是如何转发的&#xff0c;从原始的 Web 请求开始&#xff0c;到托管应用程序的容器。 Kubernetes 网络要求 在深入…

【three.js】本地搭建Threejs官方文档网站 解决threejs官方文档打开过慢得到问题

本文主要为了解决three.js 官方文档 打开过慢的问题 因为Three.js官网是国外的服务器&#xff0c;所以为了方便学习和快速的查阅文档&#xff0c;我们可以自己搭建Three.js官网和文档&#xff0c;方便随时查看案例和文档内容进行学习。 1、首先进入threejs库GitHub地址&#xf…

如何成为一名FPGA工程师?需要掌握哪些知识?

我国每年对于FPGA设计人才的需求缺口很大。在需求缺口巨大的情形下&#xff0c;发展前景相对可观。那么如何成为一名FPGA工程师&#xff1f; 什么是FPGA&#xff1f; FPGA&#xff08;FieldProgrammable Gate Array&#xff09;&#xff0c;即现场可编程门阵列&#xff0c;它…

新手入门Pinia

什么是Pinia Pinia 是 Vue 的专属状态管理库&#xff0c;它允许你跨组件或页面共享状态。它和Vuex作用类似(发音为 /piːnjʌ/),官网 为什么要使用Pinia 与 Vuex 相比&#xff0c;Pinia 不仅提供了一个更简单的 API&#xff0c;也提供了符合组合式 API 风格的 API&#xff0…

vue项目创建

前提&#xff1a;node安装&#xff1a;02node安装_哔哩哔哩_bilibili 1. 查看node版本 查看node版本 node -v 查看npm 版本 npm -v 2. 输入cmd 用管理员打开控制台 3. 设置淘宝镜像 npm config set registry https://registry.npm.taobao.org 4. 安装vue脚手架 npm install -g …

北大青鸟昌平校区:2023年云计算发展趋势!

云计算的大规模应用一直是许多最具变革性技术——如人工智能、物联网等的关键驱动力&#xff0c;未来也将进一步推动虚拟现实和增强现实(VR/AR)、元宇宙、甚至量子计算等技术的发展。近日&#xff0c;在美国《福布斯》网站报道中&#xff0c;列出了2023年云计算领域的五大主要趋…

56.Isaac教程--ROS

ROS ISAAC教程合集地址文章目录ROS安装ROS创建和使用自定义 ROS 包创建 ROS BridgeRos节点时间同步器消息转换器基地姿势同步自定义小码示例&#xff1a;将 ROS 导航堆栈与 Isaac 结合使用在此示例上构建将 Isaac 映射转换为 ROS 映射Isaac 和机器人操作系统 (ROS) 都使用消息传…

other-chatGPT记录

title: other-chatGPT记录 categories: Others tags: [人工智能, ai, 机器人, chatGPT] date: 2023-02-02 10:04:33 comments: false mathjax: true toc: true other-chatGPT记录 前篇 官网 https://openai.com/api - https://openai.com/api/测试 - https://platform.openai.…

DSP_CCS7实现变量的导出与MatLAB读取

前言 最近在做基于dsp平台的无通信接口系统辨识&#xff0c;因此需要直接利用CCS将数据导出&#xff0c;然后再利用MatLAB解析读取后的数据。MatLAB的代码参考了以下这篇链接: -/导出CCS3.3数据及使用matlab处理的方法.md at master dailai/- GitHub 高版本的CCS&#xff…

GitHub访问问题与 Steam++下载及使用(适合小白)

前言 &#x1f4dc; “ 作者 久绊A ” 专注记录自己所整理的Java、web、sql等&#xff0c;IT技术干货、学习经验、面试资料、刷题记录&#xff0c;以及遇到的问题和解决方案&#xff0c;记录自己成长的点滴 ​ 目录 前言 一、Steam的介绍 1、大概介绍 2、详细介绍 二、Ste…

Unity与Android交互(双端通信)

前言 最近小编开始做关于手部康复的项目&#xff0c;需要Android集成Unity&#xff0c;以Android为主&#xff0c;Unity为辅的开发&#xff1b;上一篇给大家分享了Unity嵌入Android的操作过程&#xff0c;所以今天想给大家分享一下双端通信的知识&#xff1b; 一. Android与Un…

安装OpenResty

安装OpenResty 1.安装 首先你的Linux虚拟机必须联网 1&#xff09;安装开发库 首先要安装OpenResty的依赖开发库&#xff0c;执行命令&#xff1a; yum install -y pcre-devel openssl-devel gcc --skip-broken2&#xff09;安装OpenResty仓库 你可以在你的 CentOS 系统中…

物联网平台+业务平台基本架构设计与优化想法

前言 目前的交付底座有点老&#xff0c;而且集成的有点杂&#xff0c;计划是要升级下&#xff0c;先说想法&#xff0c;看领导做不做。 1 业务平台定位 我们的愿景&#xff1a;通过物联平台赋能&#xff0c;让数据产生价值。 为客户提供可视化的平台&#xff08;数据价值…

【王道数据结构】第二章 | 线性表

目录 2.1线性表的定义 2.2线性表的基础操作 2.3顺序表的定义 2.4顺序表的基本操作 2.5 线性表的链式表示 2.1线性表的定义 线性表是具有相同数据类型的n(n>0)个数据元素的有限序列&#xff0c;其中n为表长&#xff0c;当n0时线性表是一个空表。若用L命名线性表&#xf…

Block底层原理读书笔记-《高级编程- iOS与OS多线程和内存管理》(更新中)

1 一个Block 真正的底层都有些什么&#xff1f; Block会被解析成一个结构体&#xff08;这里成为Block结构体&#xff09;&#xff0c;这个结构体里有&#xff1a; &#xff08;1&#xff09;isa指针&#xff08;说明Block的本质是一个对象&#xff09;&#xff1a;指向Stack…