Scala数据库连接池的简单实现

在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。

以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。

核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。

package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks

class DataSource(
    val driverName: String,
    val url: String,
    val user: String,
    val password: String,
    val minSize: Integer = 1,
    val maxSize: Integer = 10,
    val keepAliveTimeout: Long = 1000
) extends AutoCloseable {

  if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
    throw new IllegalArgumentException("These arguments are Illegal")
  }

  Class.forName(driverName)
  private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
  private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)

  for (i <- 0 until minSize) {
    pool += new DbConnection(url, user, password)
  }

  def getConenction(): DbConnection = {
    val starEntry = System.currentTimeMillis()
    Breaks.breakable {
      while (true) {
        lock.lock()
        try {
          for (con <- pool) {
            if (!con.used) {
              con.used = true
              return con;
            }
          }
          if (pool.size < maxSize) {
            var con = new DbConnection(url, user, password) { used = true }
            pool.append(con)
            return con
          }
        } finally {
          lock.unlock()
        }
        if (System.currentTimeMillis() - starEntry > keepAliveTimeout) {
          break()
        }
      }
    }
    throw new IllegalArgumentException("Connection Pool is empty")
  }
  def close(): Unit = {
    lock.lock()
    try {
      if (pool != null) {
        pool.foreach(t => t.innerConnection.close())
        pool.clear()
      }
    } finally {
      lock.unlock()
    }
  }
}

以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口

BeginTransaction:开启事务,并返回封装了的DbTransaction类

close:将连接释放

CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等

package pool

import java.sql.Connection
import java.sql.DriverAction
import java.sql.DriverManager

class DbConnection(
    val url: String,
    val user: String,
    val password: String
) extends AutoCloseable {

  private[pool] var used: Boolean = false
  private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)

  def close(): Unit = {
    if (used) {
      used = false
    }
  }

  def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
    if (innerConnection.getAutoCommit()) {
      innerConnection.setAutoCommit(false)
    }
    innerConnection.setTransactionIsolation(isolationLevel)
    new DbTransaction(this)
  }

  def CreateCommand(): DbCommand = {
    new DbCommand(this)
  }
}

以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。

ExecuteScalar:查询数据库并返回第一行第一个值的方法。

ExecuteResultSet:该方法有两个重载方法。

参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。

无参的版本则通过反射直接将ResultSet通过字段位置映射,转换成你需要的类型。

package pool

import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}

import Dispose.using
import java.{util => ju}

class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
  if (queryTimeout < 0) {
    throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.")
  }
  val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
  private val mirror = ru.runtimeMirror(getClass().getClassLoader())
  private var statement: CallableStatement = null

  /** @author:qingchuan
    *
    * @return
    */
  def ExecuteScalar(): Any = {
    var obj: Any = None
    ExecuteResultSet(t => {
      if (t.next()) {
        if (t.getMetaData().getColumnCount() > 0)
          obj = t.getObject(1)
      }
    })
    obj
  }

  /** @author
    *   qingchuan
    * @version 1.0
    *
    * @param callBack
    */
  def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
    if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    using(statement.executeQuery()) { t =>
      callBack(t)
      if (!t.isClosed())
        getOutParameterValue()
    }
  }

  def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
    val classSymbol = mirror.symbolOf[T].asClass
    val classMirror = mirror.reflectClass(classSymbol)
    val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    val result = new ArrayBuffer[T]()
    ExecuteResultSet(t => {
      while (t.next()) {
        var i = 1
        val values: Buffer[Any] = ArrayBuffer()
        for (f <- fields) {
          values += t.getObject(i)
          i += 1
        }
        result += consMethodMirror.apply(values: _*).asInstanceOf[T]
      }
    })
    result
  }

  def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
    statement = connection.innerConnection.prepareCall(commandText)
    var trans: DbTransaction = null
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    for (t <- values) {
      var i = 1
      val filedMirror = mirror.reflect(t)
      for (f <- fields) {
        val instance = filedMirror.reflectField(f)
        statement.setObject(i, instance.get)
        i += 1
      }
      statement.addBatch()
    }

    try {
      trans = connection.BeginTransaction()
      val obj = statement.executeBatch()
      trans.Commit()
      statement.clearBatch()
      obj.sum
    } catch {
      case e: Exception => {
        if (trans != null)
          trans.RollBack()
        throw e
      }
    }
  }

  def ExecuteNoneQuery(): Integer = {
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    val obj = statement.executeUpdate()
    getOutParameterValue()
    obj
  }

  def CreateParameter(): DbParameter = {
    new DbParameter();
  }

  private def getOutParameterValue(): Unit = {
    for (i <- 1 to Parameters.size) {
      val parameter: DbParameter = Parameters(i - 1);
      if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
        parameter.value = statement.getObject(i);
      }
    }
  }

  private def addParatemetrs(): Unit = {
    statement.clearParameters()
    for (i <- 1 to Parameters.size) {
      val p = Parameters(i - 1);
      if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.setObject(i, p.value)
      }
      if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.registerOutParameter(p.parameterName, p.sqlType, p.scale)
      }
    }
  }
  def close() {
    if (statement != null) {
      statement.close()
    }
  }
}

case class DbParameter(
    var parameterName: String = null,
    var value: Any = null,
    var parameterDirection: Integer = ParameterDirection.Input,
    var scale: Integer = 0,
    var sqlType: Integer = null
) {}

object ParameterDirection {
  val Input = 1
  val InputOutput = 2
  val Output = 3
}

以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。

package pool

class DbTransaction(private val connection: DbConnection) {

  def Commit(): Unit = {
    connection.innerConnection.commit()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true);
    }
  }
  def RollBack(): Unit = {
    connection.innerConnection.rollback()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true)
    }
  }

  def getConnection(): DbConnection = {
    connection
  }

  def getTransactionIsolation(): Int = {
    connection.innerConnection.getTransactionIsolation()
  }

}

object IsolationLevel {

  val TRANSACTION_NONE = 0

  val TRANSACTION_READ_UNCOMMITTED = 1;

  val TRANSACTION_READ_COMMITTED = 2;

  val TRANSACTION_REPEATABLE_READ = 4;

  val TRANSACTION_SERIALIZABLE = 8;

}

最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。

package pool

object Dispose {
  def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
    try {
      op(cls)
    } catch {
      case e: Exception => throw e
    } finally {
      cls.close()
    }
  }
}

以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。

import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.DbTransaction
import pool.Dispose.using
import pool.IsolationLevel
import pool.ParameterDirection

import java.sql.Date
import java.sql.ResultSet
import java.sql.Types
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import javax.xml.crypto.Data
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import com.nimbusds.oauth2.sdk.util.date.SimpleDate
import java.text.SimpleDateFormat
object App {
  def main(args: Array[String]): Unit = {
    val pool = new DataSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
      "账号",
      "密码",
      minSize = 1,
      maxSize = 3,
      keepAliveTimeout = 3000
    )

    val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    for (i <- 1 to 15) {
      val thread: Thread = new Thread(() => {
        val date = new Date(System.currentTimeMillis())
        using(pool.getConenction()) { con =>
          {
            using(new DbCommand(con)) { cmd =>
              {
                cmd.commandText = "{call p_get_out(?,?)}"
                val p1 = new DbParameter("@id", i)
                val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
                cmd.Parameters.append(p1)
                cmd.Parameters.append(p2)
                val result = cmd.ExecuteScalar()
                println(s"result=${result},output=${p2.value},parameter=${i}")
              }
            }
          }
        }
      })
      thread.start()
    }
  }
}

开发环境VsCode,SQL Server数据库。以下是引用的第三方库。

version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"

以下是执行结果。

到此这篇关于Scala数据库连接池的简单实现的文章就介绍到这了,更多相关Scala数据库连接池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Scala 操作Redis使用连接池工具类RedisUtil

    本文介绍了Scala 操作Redis,分享给大家,具体如下: package com.zjw.util import java.util import org.apache.commons.pool2.impl.GenericObjectPoolConfig import org.apache.logging.log4j.scala.Logging import redis.clients.jedis.{Jedis, JedisPool, Response} import redis.clien

  • Scala数据库连接池的简单实现

    在使用JDBC的时候,数据库据连接是非常宝贵的资源.为了复用这些资源,可以将连接保存在一个队列中.当需要的时候可以从队列中取出未使用的连接.如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常. 以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号.密码等. 核心方法为getConenction()方法.且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源.队列中的连接为封

  • Java 数据库连接池详解及简单实例

    Java 数据库连接池详解 数据库连接池的原理是: 连接池基本的思想是在系统初始化的时候,将数据库连接作为对象存储在内存中,当用户需要访问数据库时,并非建立一个新的连接,而是从连接池中取出一个已建立的空闲连接对象.使用完毕后,用户也并非将连接关闭,而是将连接放回连接池中,以供下一个请求访问使用.而连接的建立.断开都由连接池自身来管理.同时,还可以通过设置连接池的参数来控制连接池中的初始连接数.连接的上下限数以及每个连接的最大使用次数.最大空闲时间等等.也可以通过其自身的管理机制来监视数据库连接的

  • java数据库连接池新手入门一篇就够了,太简单了!

    1.什么是数据库连接池 就是一个容器持有多个数据库连接,当程序需要操作数据库的时候直接从池中取出连接,使用完之后再还回去,和线程池一个道理. 2.为什么需要连接池,好处是什么? 1.节省资源,如果每次访问数据库都创建新的连接,创建和销毁都浪费系统资源 2.响应性更好,省去了创建的时间,响应性更好. 3.统一管理数据库连接,避免因为业务的膨胀导致数据库连接的无限增多. 4.便于监控. 3.都有哪些连接池方案 数据库连接池的方案有不少,我接触过的连接池方案有: 1.C3p0 这个连接池我很久之前看到

  • Spring 数据库连接池(JDBC)详解

    数据库连接池 对一个简单的数据库应用,由于对数据库的访问不是很频繁,这时可以简单地在需要访问数据库时,就新创建一个连接,就完后就关闭它,这样做也不会带来什么性能上的开销.但是对于一个复杂的数据库应用,情况就完全不同而,频繁的建立.关闭连接,会极大地减低系统的性能,因为对于连接的使用成了系统性能的瓶颈. 通过建立一个数据库连接池以及一套连接使用管理策略,可以达到连接复用的效果,使得一个数据库连接可以得到安全.高效的复用,避免了数据库连接频繁建立.关闭的开销. 数据库连接池的基本原理是在内部对象池中

  • Java中常用的数据库连接池_动力节点Java学院整理

    定义 数据库连接是一种关键的有限的昂贵的资源,这一点在多用户的网页应用程序中体现得尤为突出.对数据库连接的管理能显著影响到整个应用程序的伸缩性和健壮性,影响到程序的性能指标.数据库连接池正是针对这个问题提出来的. 数据库连接池负责分配.管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个:释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏.这项技术能明显提高对数据库操作的性能. 参考资料 DBCP 下载地址:http://

  • java使用dbcp2数据库连接池

    在开发中中我们经常会使用到数据库连接池,比如dbcp数据库连接池,本章将讲解java连接dbcp数据库库连接池的简单使用. 开发工具myeclipse2014 1.首先创建一个web项目,我把项目名取名为testjdbc,需要带有web.xml的配置文件,进行servlet的配置,创建完成以后的项目结构如下: 2.创建包,我创建的包名是com.szkingdom.db 3.创建帮助类CastUtil,代码如下: package com.szkingdom.db; /** * Created by

  • Java实现数据库连接池简易教程

    一.引言 池化技术在Java中应用的很广泛,简而论之,使用对象池存储某个实例数受限制的实例,开发者从对象池中获取实例,使用完之后再换回对象池,从而在一定程度上减少了系统频繁创建对象销毁对象的开销.Java线程池和数据库连接池就是典型的应用,但并非所有的对象都适合拿来池化,对于创建开销比较小的对象拿来池化反而会影响性能,因为维护对象池也需要一定的资源开销,对于创建开销较大,又频繁创建使用的对象,采用池化技术会极大提高性能. 业界有很多成熟的数据库连接池,比如C3P0,DBCP,Proxool以及阿

  • Java数据库连接池的几种配置方法(以MySQL数据库为例)

    一.Tomcat配置数据源: 前提:需要将连接MySQL数据库驱动jar包放进Tomcat安装目录中common文件夹下的lib目录中 1.方法一:在WebRoot下面建文件夹META-INF,里面建一个文件context.xml,如下: <?xml version="1.0" encoding="UTF-8"?> <Context> <Resource name="jdbc/chaoshi" auth="

  • Java数据库连接池之DBCP浅析_动力节点Java学院整理

    一. 为何要使用数据库连接池 假设网站一天有很大的访问量,数据库服务器就需要为每次连接创建一次数据库连接,极大的浪费数据库的资源,并且极易造成数据库服务器内存溢出.拓机. 数据库连接是一种关键的有限的昂贵的资源,这一点在多用户的网页应用程序中体现的尤为突出.对数据库连接的管理能显著影响到整个应用程序的伸缩性和健壮性,影响到程序的性能指标.数据库连接池正式针对这个问题提出来的.数据库连接池负责分配,管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是重新建立一个. 数据库连接池

  • java配置数据库连接池的方法步骤

    先来了解下什么是数据库连接池数据库连接池技术的思想非常简单,将数据库连接作为对象存储在一个Vector对象中,一旦数据库连接建立后,不同的数据库访问请求就可以共享这些连接,这样,通过复用这些已经建立的数据库连接,可以克服上述缺点,极大地节省系统资源和时间. 在实际应用开发中,特别是在WEB应用系统中,如果JSP.Servlet或EJB使用JDBC直接访问数据库中的数据,每一次数据访问请求都必须经历建立数据库连接.打开数据库.存取数据和关闭数据库连接等步骤,而连接并打开数据库是一件既消耗资源又费时

随机推荐