文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Scala数据库连接池的简单实现

2023-02-09 12:00

关注

在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。

以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。

核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。

package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks
 
class DataSource(
    val driverName: String,
    val url: String,
    val user: String,
    val password: String,
    val minSize: Integer = 1,
    val maxSize: Integer = 10,
    val keepAliveTimeout: Long = 1000
) extends AutoCloseable {
 
  if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
    throw new IllegalArgumentException("These arguments are Illegal")
  }
 
  Class.forName(driverName)
  private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
  private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)
 
  for (i <- 0 until minSize) {
    pool += new DbConnection(url, user, password)
  }
 
  def getConenction(): DbConnection = {
    val starEntry = System.currentTimeMillis()
    Breaks.breakable {
      while (true) {
        lock.lock()
        try {
          for (con <- pool) {
            if (!con.used) {
              con.used = true
              return con;
            }
          }
          if (pool.size < maxSize) {
            var con = new DbConnection(url, user, password) { used = true }
            pool.append(con)
            return con
          }
        } finally {
          lock.unlock()
        }
        if (System.currentTimeMillis() - starEntry > keepAliveTimeout) {
          break()
        }
      }
    }
    throw new IllegalArgumentException("Connection Pool is empty")
  }
  def close(): Unit = {
    lock.lock()
    try {
      if (pool != null) {
        pool.foreach(t => t.innerConnection.close())
        pool.clear()
      }
    } finally {
      lock.unlock()
    }
  }
}

以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口

BeginTransaction:开启事务,并返回封装了的DbTransaction类

close:将连接释放

CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等

package pool
 
import java.sql.Connection
import java.sql.DriverAction
import java.sql.DriverManager
 
class DbConnection(
    val url: String,
    val user: String,
    val password: String
) extends AutoCloseable {
 
  private[pool] var used: Boolean = false
  private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)
 
  def close(): Unit = {
    if (used) {
      used = false
    }
  }
 
  def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
    if (innerConnection.getAutoCommit()) {
      innerConnection.setAutoCommit(false)
    }
    innerConnection.setTransactionIsolation(isolationLevel)
    new DbTransaction(this)
  }
 
  def CreateCommand(): DbCommand = {
    new DbCommand(this)
  }
}

以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。

ExecuteScalar:查询数据库并返回第一行第一个值的方法。

ExecuteResultSet:该方法有两个重载方法。

参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。

无参的版本则通过反射直接将ResultSet通过字段位置映射,转换成你需要的类型。

package pool
 
import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
 
import Dispose.using
import java.{util => ju}
 
class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
  if (queryTimeout < 0) {
    throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.")
  }
  val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
  private val mirror = ru.runtimeMirror(getClass().getClassLoader())
  private var statement: CallableStatement = null
 
  
  def ExecuteScalar(): Any = {
    var obj: Any = None
    ExecuteResultSet(t => {
      if (t.next()) {
        if (t.getMetaData().getColumnCount() > 0)
          obj = t.getObject(1)
      }
    })
    obj
  }
 
  
  def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
    if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    using(statement.executeQuery()) { t =>
      callBack(t)
      if (!t.isClosed())
        getOutParameterValue()
    }
  }
 
  def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
    val classSymbol = mirror.symbolOf[T].asClass
    val classMirror = mirror.reflectClass(classSymbol)
    val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    val result = new ArrayBuffer[T]()
    ExecuteResultSet(t => {
      while (t.next()) {
        var i = 1
        val values: Buffer[Any] = ArrayBuffer()
        for (f <- fields) {
          values += t.getObject(i)
          i += 1
        }
        result += consMethodMirror.apply(values: _*).asInstanceOf[T]
      }
    })
    result
  }
 
  def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
    statement = connection.innerConnection.prepareCall(commandText)
    var trans: DbTransaction = null
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    for (t <- values) {
      var i = 1
      val filedMirror = mirror.reflect(t)
      for (f <- fields) {
        val instance = filedMirror.reflectField(f)
        statement.setObject(i, instance.get)
        i += 1
      }
      statement.addBatch()
    }
 
    try {
      trans = connection.BeginTransaction()
      val obj = statement.executeBatch()
      trans.Commit()
      statement.clearBatch()
      obj.sum
    } catch {
      case e: Exception => {
        if (trans != null)
          trans.RollBack()
        throw e
      }
    }
  }
 
  def ExecuteNoneQuery(): Integer = {
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    val obj = statement.executeUpdate()
    getOutParameterValue()
    obj
  }
 
  def CreateParameter(): DbParameter = {
    new DbParameter();
  }
 
  private def getOutParameterValue(): Unit = {
    for (i <- 1 to Parameters.size) {
      val parameter: DbParameter = Parameters(i - 1);
      if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
        parameter.value = statement.getObject(i);
      }
    }
  }
 
  private def addParatemetrs(): Unit = {
    statement.clearParameters()
    for (i <- 1 to Parameters.size) {
      val p = Parameters(i - 1);
      if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.setObject(i, p.value)
      }
      if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.registerOutParameter(p.parameterName, p.sqlType, p.scale)
      }
    }
  }
  def close() {
    if (statement != null) {
      statement.close()
    }
  }
}
 
case class DbParameter(
    var parameterName: String = null,
    var value: Any = null,
    var parameterDirection: Integer = ParameterDirection.Input,
    var scale: Integer = 0,
    var sqlType: Integer = null
) {}
 
object ParameterDirection {
  val Input = 1
  val InputOutput = 2
  val Output = 3
}

以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。

package pool
 
class DbTransaction(private val connection: DbConnection) {
 
  def Commit(): Unit = {
    connection.innerConnection.commit()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true);
    }
  }
  def RollBack(): Unit = {
    connection.innerConnection.rollback()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true)
    }
  }
 
  def getConnection(): DbConnection = {
    connection
  }
 
  def getTransactionIsolation(): Int = {
    connection.innerConnection.getTransactionIsolation()
  }
 
}
 
object IsolationLevel {
 
  val TRANSACTION_NONE = 0
 
  val TRANSACTION_READ_UNCOMMITTED = 1;
 
  val TRANSACTION_READ_COMMITTED = 2;
 
  val TRANSACTION_REPEATABLE_READ = 4;
 
  val TRANSACTION_SERIALIZABLE = 8;
 
}

最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。

package pool
 
object Dispose {
  def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
    try {
      op(cls)
    } catch {
      case e: Exception => throw e
    } finally {
      cls.close()
    }
  }
}

以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。

import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.DbTransaction
import pool.Dispose.using
import pool.IsolationLevel
import pool.ParameterDirection
 
import java.sql.Date
import java.sql.ResultSet
import java.sql.Types
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import javax.xml.crypto.Data
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import com.nimbusds.oauth2.sdk.util.date.SimpleDate
import java.text.SimpleDateFormat
object App {
  def main(args: Array[String]): Unit = {
    val pool = new DataSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
      "账号",
      "密码",
      minSize = 1,
      maxSize = 3,
      keepAliveTimeout = 3000
    )
 
    val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    for (i <- 1 to 15) {
      val thread: Thread = new Thread(() => {
        val date = new Date(System.currentTimeMillis())
        using(pool.getConenction()) { con =>
          {
            using(new DbCommand(con)) { cmd =>
              {
                cmd.commandText = "{call p_get_out(?,?)}"
                val p1 = new DbParameter("@id", i)
                val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
                cmd.Parameters.append(p1)
                cmd.Parameters.append(p2)
                val result = cmd.ExecuteScalar()
                println(s"result=${result},output=${p2.value},parameter=${i}")
              }
            }
          }
        }
      })
      thread.start()
    }
  }
}

开发环境VsCode,SQL Server数据库。以下是引用的第三方库。

version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"

以下是执行结果。

到此这篇关于Scala数据库连接池的简单实现的文章就介绍到这了,更多相关Scala数据库连接池内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯