Scala數(shù)據(jù)庫(kù)連接池的簡(jiǎn)單實(shí)現(xiàn)
在使用JDBC的時(shí)候,數(shù)據(jù)庫(kù)據(jù)連接是非常寶貴的資源。為了復(fù)用這些資源,可以將連接保存在一個(gè)隊(duì)列中。當(dāng)需要的時(shí)候可以從隊(duì)列中取出未使用的連接。如果沒(méi)有可用連接,則可以在一定時(shí)間內(nèi)等待,直到隊(duì)列中有可用的連接,否則將拋出異常。
以下是DataSoucre的代碼,DataSoucre負(fù)責(zé)對(duì)連接的管理以及分發(fā),同時(shí)設(shè)置隊(duì)列的大小,等待時(shí)間,連接的賬號(hào)、密碼等。
核心方法為getConenction()方法。且實(shí)現(xiàn)AutoCloseable接口,以便后面可以使用using方法自動(dòng)關(guān)閉資源。隊(duì)列中的連接為封裝了conenction的DbConnection類(lèi)。
package pool import scala.util.control.Breaks._ import scala.collection.mutable.ArrayBuffer import java.{util => ju} import scala.collection.mutable.Buffer import scala.util.control.Breaks class DataSource( val driverName: String, val url: String, val user: String, val password: String, val minSize: Integer = 1, val maxSize: Integer = 10, val keepAliveTimeout: Long = 1000 ) extends AutoCloseable { if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) { throw new IllegalArgumentException("These arguments are Illegal") } Class.forName(driverName) private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]() private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true) for (i <- 0 until minSize) { pool += new DbConnection(url, user, password) } def getConenction(): DbConnection = { val starEntry = System.currentTimeMillis() Breaks.breakable { while (true) { lock.lock() try { for (con <- pool) { if (!con.used) { con.used = true return con; } } if (pool.size < maxSize) { var con = new DbConnection(url, user, password) { used = true } pool.append(con) return con } } finally { lock.unlock() } if (System.currentTimeMillis() - starEntry > keepAliveTimeout) { break() } } } throw new IllegalArgumentException("Connection Pool is empty") } def close(): Unit = { lock.lock() try { if (pool != null) { pool.foreach(t => t.innerConnection.close()) pool.clear() } } finally { lock.unlock() } } }
以下是Dbconnection類(lèi),該類(lèi)提供了三個(gè)方法且實(shí)現(xiàn)了AutoCloseable接口
BeginTransaction:開(kāi)啟事務(wù),并返回封裝了的DbTransaction類(lèi)
close:將連接釋放
CreateCommand:創(chuàng)建DbCommand類(lèi),該類(lèi)是負(fù)責(zé)操作連接的類(lèi),比如提交sql,讀取數(shù)據(jù)等
package pool import java.sql.Connection import java.sql.DriverAction import java.sql.DriverManager class DbConnection( val url: String, val user: String, val password: String ) extends AutoCloseable { private[pool] var used: Boolean = false private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password) def close(): Unit = { if (used) { used = false } } def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = { if (innerConnection.getAutoCommit()) { innerConnection.setAutoCommit(false) } innerConnection.setTransactionIsolation(isolationLevel) new DbTransaction(this) } def CreateCommand(): DbCommand = { new DbCommand(this) } }
以下是DbCommand類(lèi)的代碼,該類(lèi)負(fù)責(zé)操作數(shù)據(jù)庫(kù)。如ExecuteResultSet,ExecuteScalar等。
ExecuteScalar:查詢數(shù)據(jù)庫(kù)并返回第一行第一個(gè)值的方法。
ExecuteResultSet:該方法有兩個(gè)重載方法。
參數(shù)為callBack: ResultSet => Unit的方法,提供了一個(gè)回調(diào)函數(shù),解析數(shù)據(jù)的操作可以在回調(diào)中實(shí)現(xiàn)。
無(wú)參的版本則通過(guò)反射直接將ResultSet通過(guò)字段位置映射,轉(zhuǎn)換成你需要的類(lèi)型。
package pool import java.sql.CallableStatement import java.sql.ResultSet import java.sql.SQLType import java.sql.Statement import java.sql.Types import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.implicitConversions import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import Dispose.using import java.{util => ju} class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable { if (queryTimeout < 0) { throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.") } val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]() private val mirror = ru.runtimeMirror(getClass().getClassLoader()) private var statement: CallableStatement = null /** @author:qingchuan * * @return */ def ExecuteScalar(): Any = { var obj: Any = None ExecuteResultSet(t => { if (t.next()) { if (t.getMetaData().getColumnCount() > 0) obj = t.getObject(1) } }) obj } /** @author * qingchuan * @version 1.0 * * @param callBack */ def ExecuteResultSet(callBack: ResultSet => Unit): Unit = { if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.") statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() using(statement.executeQuery()) { t => callBack(t) if (!t.isClosed()) getOutParameterValue() } } def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = { val classSymbol = mirror.symbolOf[T].asClass val classMirror = mirror.reflectClass(classSymbol) val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod) val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) val result = new ArrayBuffer[T]() ExecuteResultSet(t => { while (t.next()) { var i = 1 val values: Buffer[Any] = ArrayBuffer() for (f <- fields) { values += t.getObject(i) i += 1 } result += consMethodMirror.apply(values: _*).asInstanceOf[T] } }) result } def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = { statement = connection.innerConnection.prepareCall(commandText) var trans: DbTransaction = null val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) for (t <- values) { var i = 1 val filedMirror = mirror.reflect(t) for (f <- fields) { val instance = filedMirror.reflectField(f) statement.setObject(i, instance.get) i += 1 } statement.addBatch() } try { trans = connection.BeginTransaction() val obj = statement.executeBatch() trans.Commit() statement.clearBatch() obj.sum } catch { case e: Exception => { if (trans != null) trans.RollBack() throw e } } } def ExecuteNoneQuery(): Integer = { statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() val obj = statement.executeUpdate() getOutParameterValue() obj } def CreateParameter(): DbParameter = { new DbParameter(); } private def getOutParameterValue(): Unit = { for (i <- 1 to Parameters.size) { val parameter: DbParameter = Parameters(i - 1); if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) { parameter.value = statement.getObject(i); } } } private def addParatemetrs(): Unit = { statement.clearParameters() for (i <- 1 to Parameters.size) { val p = Parameters(i - 1); if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) { statement.setObject(i, p.value) } if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) { statement.registerOutParameter(p.parameterName, p.sqlType, p.scale) } } } def close() { if (statement != null) { statement.close() } } } case class DbParameter( var parameterName: String = null, var value: Any = null, var parameterDirection: Integer = ParameterDirection.Input, var scale: Integer = 0, var sqlType: Integer = null ) {} object ParameterDirection { val Input = 1 val InputOutput = 2 val Output = 3 }
以下代碼是DbTransaction,該類(lèi)提供了事務(wù)的操作如提交、回滾。
package pool class DbTransaction(private val connection: DbConnection) { def Commit(): Unit = { connection.innerConnection.commit() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true); } } def RollBack(): Unit = { connection.innerConnection.rollback() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true) } } def getConnection(): DbConnection = { connection } def getTransactionIsolation(): Int = { connection.innerConnection.getTransactionIsolation() } } object IsolationLevel { val TRANSACTION_NONE = 0 val TRANSACTION_READ_UNCOMMITTED = 1; val TRANSACTION_READ_COMMITTED = 2; val TRANSACTION_REPEATABLE_READ = 4; val TRANSACTION_SERIALIZABLE = 8; }
最后是using的方法。通過(guò)柯里化以及Try-catch-finally的方式 自動(dòng)關(guān)閉實(shí)現(xiàn)了AutoCloseable接口的資源。
package pool object Dispose { def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = { try { op(cls) } catch { case e: Exception => throw e } finally { cls.close() } } }
以下是客戶端調(diào)用,代碼模擬了15個(gè)線程并發(fā)訪問(wèn)數(shù)據(jù)庫(kù),連接池最多3個(gè)資源,從而說(shuō)明連接池是可以復(fù)用這些連接的。
import pool.DataSource import pool.DbCommand import pool.DbParameter import pool.DbTransaction import pool.Dispose.using import pool.IsolationLevel import pool.ParameterDirection import java.sql.Date import java.sql.ResultSet import java.sql.Types import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime import javax.xml.crypto.Data import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.experimental.macros import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import com.nimbusds.oauth2.sdk.util.date.SimpleDate import java.text.SimpleDateFormat object App { def main(args: Array[String]): Unit = { val pool = new DataSource( "com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true", "賬號(hào)", "密碼", minSize = 1, maxSize = 3, keepAliveTimeout = 3000 ) val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); for (i <- 1 to 15) { val thread: Thread = new Thread(() => { val date = new Date(System.currentTimeMillis()) using(pool.getConenction()) { con => { using(new DbCommand(con)) { cmd => { cmd.commandText = "{call p_get_out(?,?)}" val p1 = new DbParameter("@id", i) val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20) cmd.Parameters.append(p1) cmd.Parameters.append(p2) val result = cmd.ExecuteScalar() println(s"result=${result},output=${p2.value},parameter=${i}") } } } } }) thread.start() } } }
開(kāi)發(fā)環(huán)境VsCode,SQL Server數(shù)據(jù)庫(kù)。以下是引用的第三方庫(kù)。
version := "1.0" libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8" libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"
以下是執(zhí)行結(jié)果。
到此這篇關(guān)于Scala數(shù)據(jù)庫(kù)連接池的簡(jiǎn)單實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Scala數(shù)據(jù)庫(kù)連接池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- IDEA中scala生成變量后自動(dòng)顯示變量類(lèi)型問(wèn)題
- Idea中添加Maven項(xiàng)目支持scala的詳細(xì)步驟
- Java和Scala集合間的相互轉(zhuǎn)換方式
- C# ExecuteScalar()方法案例講解
- idea中如何創(chuàng)建scala項(xiàng)目
- Scala函數(shù)式編程專(zhuān)題--scala集合和函數(shù)
- Scala函數(shù)式編程專(zhuān)題--scala基礎(chǔ)語(yǔ)法介紹
- Scala入門(mén)教程詳解
- scala中常用特殊符號(hào)詳解
- Scala基礎(chǔ)語(yǔ)法總結(jié)
相關(guān)文章
使用Java應(yīng)用程序添加或刪除 PDF 中的附件
當(dāng)我們?cè)谥谱鱌DF文件或者PPT演示文稿的時(shí)候,為了讓自己的文件更全面詳細(xì),就會(huì)在文件中添加附件,那么如何添加或刪除PDF中的附件呢,今天通過(guò)本文給大家詳細(xì)講解,需要的朋友參考下吧2023-01-01Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)折半查找的算法過(guò)程解析
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)折半查找的算法過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03基于Java編寫(xiě)一個(gè)粽子大作戰(zhàn)小游戲
端午節(jié),又稱(chēng)龍舟節(jié)、重午節(jié),是中國(guó)的傳統(tǒng)節(jié)日之一,每年農(nóng)歷五月初五慶祝,雖然端午假期已經(jīng)過(guò)去了,小編還是用Java編寫(xiě)了一個(gè)粽子大作戰(zhàn)小游戲,感興趣的可以了解一下2023-06-06Maven3種打包方式中maven-assembly-plugin的使用詳解
這篇文章主要介紹了Maven3種打包方式中maven-assembly-plugin的使用,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07java8實(shí)現(xiàn)List中對(duì)象屬性的去重方法
這篇文章主要介紹了java8實(shí)現(xiàn)List中對(duì)象屬性的去重方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03解決JavaMail附件名字過(guò)長(zhǎng)導(dǎo)致的亂碼問(wèn)題
這篇文章主要介紹了解決JavaMail附件名字過(guò)長(zhǎng)導(dǎo)致的亂碼問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10