Major update to provide thread-safe, robust connection pooling.
Taking inspiration from the waterpark library, the connection pooling mechanism has been refactored to be thread-safe. Additionally, the pooling logic detects and handles stale connections in the pool. When a connection is requested from the pool, the pool first validates that it is healthy and replaces it with a fresh connection if necessary. This is transparent to the requester. Additionally we refactored the internal logging implementation to make it more conventient to access logging infrastructure and log from various sub-scopes within fiber_orm (query, pool, etc.)
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "3.2.0"
|
version = "4.0.0"
|
||||||
author = "Jonathan Bernard"
|
author = "Jonathan Bernard"
|
||||||
description = "Lightweight Postgres ORM for Nim."
|
description = "Lightweight Postgres ORM for Nim."
|
||||||
license = "GPL-3.0"
|
license = "GPL-3.0"
|
||||||
|
@@ -286,15 +286,16 @@
|
|||||||
##
|
##
|
||||||
import std/[json, macros, options, sequtils, strutils]
|
import std/[json, macros, options, sequtils, strutils]
|
||||||
import db_connector/db_common
|
import db_connector/db_common
|
||||||
import namespaced_logging, uuids
|
import uuids
|
||||||
|
|
||||||
from std/unicode import capitalize
|
from std/unicode import capitalize
|
||||||
|
|
||||||
import ./fiber_orm/db_common as fiber_db_common
|
import ./fiber_orm/db_common as fiber_db_common
|
||||||
import ./fiber_orm/pool
|
import ./fiber_orm/[pool, util]
|
||||||
import ./fiber_orm/util
|
import ./fiber_orm/private/logging
|
||||||
|
|
||||||
export pool, util
|
export pool, util
|
||||||
|
export logging.enableDbLogging
|
||||||
|
|
||||||
type
|
type
|
||||||
PagedRecords*[T] = object
|
PagedRecords*[T] = object
|
||||||
@@ -308,26 +309,6 @@ type
|
|||||||
NotFoundError* = object of CatchableError
|
NotFoundError* = object of CatchableError
|
||||||
## Error type raised when no record matches a given ID
|
## Error type raised when no record matches a given ID
|
||||||
|
|
||||||
var logService {.threadvar.}: Option[ThreadLocalLogService]
|
|
||||||
var logger {.threadvar.}: Option[Logger]
|
|
||||||
|
|
||||||
proc makeQueryLogEntry(
|
|
||||||
m: string,
|
|
||||||
sql: string,
|
|
||||||
args: openArray[(string, string)] = []): JsonNode =
|
|
||||||
result = %*{ "method": m, "sql": sql }
|
|
||||||
for (k, v) in args: result[k] = %v
|
|
||||||
|
|
||||||
proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) =
|
|
||||||
# namespaced_logging would do this check for us, but we don't want to even
|
|
||||||
# build the log object if we're not actually logging
|
|
||||||
if logService.isNone: return
|
|
||||||
if logger.isNone: logger = logService.getLogger("fiber_orm/query")
|
|
||||||
logger.debug(makeQueryLogEntry(methodName, sqlStmt, args))
|
|
||||||
|
|
||||||
proc enableDbLogging*(svc: ThreadLocalLogService) =
|
|
||||||
logService = some(svc)
|
|
||||||
|
|
||||||
proc newMutateClauses(): MutateClauses =
|
proc newMutateClauses(): MutateClauses =
|
||||||
return MutateClauses(
|
return MutateClauses(
|
||||||
columns: @[],
|
columns: @[],
|
||||||
@@ -353,7 +334,7 @@ proc createRecord*[D: DbConnType, T](db: D, rec: T): T =
|
|||||||
" RETURNING " & columnNamesForModel(rec).join(",")
|
" RETURNING " & columnNamesForModel(rec).join(",")
|
||||||
|
|
||||||
logQuery("createRecord", sqlStmt)
|
logQuery("createRecord", sqlStmt)
|
||||||
debug(logService.getLogger("fiber_orm/query"), %*{ "values": mc.values })
|
debug(getLogger("query"), %*{ "values": mc.values })
|
||||||
|
|
||||||
let newRow = db.getRow(sql(sqlStmt), mc.values)
|
let newRow = db.getRow(sql(sqlStmt), mc.values)
|
||||||
|
|
||||||
@@ -845,35 +826,6 @@ macro generateJoinTableProcs*(
|
|||||||
db.withConnection conn:
|
db.withConnection conn:
|
||||||
associate(conn, `joinTableNameNode`, rec1, rec2)
|
associate(conn, `joinTableNameNode`, rec1, rec2)
|
||||||
|
|
||||||
proc initPool*[D: DbConnType](
|
|
||||||
connect: proc(): D,
|
|
||||||
poolSize = 10,
|
|
||||||
hardCap = false,
|
|
||||||
healthCheckQuery = "SELECT 'true' AS alive"): DbConnPool[D] =
|
|
||||||
|
|
||||||
## Initialize a new DbConnPool. See the `initDb` procedure in the `Example
|
|
||||||
## Fiber ORM Usage`_ for an example
|
|
||||||
##
|
|
||||||
## * `connect` must be a factory which creates a new `DbConn`.
|
|
||||||
## * `poolSize` sets the desired capacity of the connection pool.
|
|
||||||
## * `hardCap` defaults to `false`.
|
|
||||||
## When `false`, the pool can grow beyond the configured capacity, but will
|
|
||||||
## release connections down to the its capacity (no less than `poolSize`).
|
|
||||||
##
|
|
||||||
## When `true` the pool will not create more than its configured capacity.
|
|
||||||
## It a connection is requested, none are free, and the pool is at
|
|
||||||
## capacity, this will result in an Error being raised.
|
|
||||||
## * `healthCheckQuery` should be a simple and fast SQL query that the pool
|
|
||||||
## can use to test the liveliness of pooled connections.
|
|
||||||
##
|
|
||||||
## .. _Example Fiber ORM Usage: #basic-usage-example-fiber-orm-usage
|
|
||||||
|
|
||||||
initDbConnPool(DbConnPoolConfig[D](
|
|
||||||
connect: connect,
|
|
||||||
poolSize: poolSize,
|
|
||||||
hardCap: hardCap,
|
|
||||||
healthCheckQuery: healthCheckQuery))
|
|
||||||
|
|
||||||
template inTransaction*(db, body: untyped) =
|
template inTransaction*(db, body: untyped) =
|
||||||
db.withConnection conn:
|
db.withConnection conn:
|
||||||
conn.exec(sql"BEGIN TRANSACTION")
|
conn.exec(sql"BEGIN TRANSACTION")
|
||||||
|
@@ -4,85 +4,77 @@
|
|||||||
|
|
||||||
## Simple database connection pooling implementation compatible with Fiber ORM.
|
## Simple database connection pooling implementation compatible with Fiber ORM.
|
||||||
|
|
||||||
import std/[sequtils, sugar]
|
when (NimMajor, NimMinor, NimPatch) < (2, 0, 0):
|
||||||
|
when not defined(gcArc) and not defined (gcOrc):
|
||||||
|
{.error: "fiber_orm requires either --mm:arc or --mm:orc.".}
|
||||||
|
|
||||||
|
import std/[deques, locks, sequtils, sugar]
|
||||||
import db_connector/db_common
|
import db_connector/db_common
|
||||||
|
|
||||||
from db_connector/db_sqlite import getRow, close
|
from db_connector/db_sqlite import getRow, close
|
||||||
from db_connector/db_postgres import getRow, close
|
from db_connector/db_postgres import getRow, close
|
||||||
|
|
||||||
import ./db_common as fiber_db_common
|
import ./db_common as fiber_db_common
|
||||||
|
import ./private/logging
|
||||||
|
|
||||||
type
|
type
|
||||||
DbConnPoolConfig*[D: DbConnType] = object
|
DbConnPool*[D: DbConnType] = ptr DbConnPoolObj[D]
|
||||||
connect*: () -> D ## Factory procedure to create a new DBConn
|
|
||||||
poolSize*: int ## The pool capacity.
|
|
||||||
|
|
||||||
hardCap*: bool ## Is the pool capacity a hard cap?
|
DbConnPoolObj[D: DbConnType] = object
|
||||||
##
|
|
||||||
## When `false`, the pool can grow beyond the
|
|
||||||
## configured capacity, but will release connections
|
|
||||||
## down to the its capacity (no less than `poolSize`).
|
|
||||||
##
|
|
||||||
## When `true` the pool will not create more than its
|
|
||||||
## configured capacity. It a connection is requested,
|
|
||||||
## none are free, and the pool is at capacity, this
|
|
||||||
## will result in an Error being raised.
|
|
||||||
|
|
||||||
healthCheckQuery*: string ## Should be a simple and fast SQL query that the
|
|
||||||
## pool can use to test the liveliness of pooled
|
|
||||||
## connections.
|
|
||||||
|
|
||||||
PooledDbConn[D: DbConnType] = ref object
|
|
||||||
conn: D
|
|
||||||
id: int
|
|
||||||
free: bool
|
|
||||||
|
|
||||||
DbConnPool*[D: DbConnType] = ref object
|
|
||||||
## Database connection pool
|
## Database connection pool
|
||||||
conns: seq[PooledDbConn[D]]
|
connect: proc (): D {.raises: [DbError].}
|
||||||
cfg: DbConnPoolConfig[D]
|
healthCheckQuery: SqlQuery
|
||||||
lastId: int
|
entries: Deque[D]
|
||||||
|
cond: Cond
|
||||||
|
lock: Lock
|
||||||
|
|
||||||
proc initDbConnPool*[D: DbConnType](cfg: DbConnPoolConfig[D]): DbConnPool[D] =
|
|
||||||
result = DbConnPool[D](
|
|
||||||
conns: @[],
|
|
||||||
cfg: cfg)
|
|
||||||
|
|
||||||
proc newConn[D: DbConnType](pool: DbConnPool[D]): PooledDbConn[D] =
|
proc close*[D: DbConnType](pool: DbConnPool[D]) =
|
||||||
pool.lastId += 1
|
## Safely close all connections and release resources for the given pool.
|
||||||
{.gcsafe.}:
|
getLogger("pool").debug("closing connection pool")
|
||||||
let conn = pool.cfg.connect()
|
withLock(pool.lock):
|
||||||
result = PooledDbConn[D](
|
while pool.entries.len > 0: close(pool.entries.popFirst())
|
||||||
conn: conn,
|
|
||||||
id: pool.lastId,
|
|
||||||
free: true)
|
|
||||||
pool.conns.add(result)
|
|
||||||
|
|
||||||
proc maintain[D: DbConnType](pool: DbConnPool[D]): void =
|
deinitLock(pool.lock)
|
||||||
pool.conns.keepIf(proc (pc: PooledDbConn[D]): bool =
|
deinitCond(pool.cond)
|
||||||
if not pc.free: return true
|
`=destroy`(pool[])
|
||||||
|
deallocShared(pool)
|
||||||
|
|
||||||
try:
|
|
||||||
discard getRow(pc.conn, sql(pool.cfg.healthCheckQuery), [])
|
|
||||||
return true
|
|
||||||
except:
|
|
||||||
try: pc.conn.close() # try to close the connection
|
|
||||||
except: discard ""
|
|
||||||
return false
|
|
||||||
)
|
|
||||||
|
|
||||||
let freeConns = pool.conns.filterIt(it.free)
|
proc newDbConnPool*[D: DbConnType](
|
||||||
if pool.conns.len > pool.cfg.poolSize and freeConns.len > 0:
|
poolSize: int,
|
||||||
let numToCull = min(freeConns.len, pool.conns.len - pool.cfg.poolSize)
|
connectFunc: proc(): D {.raises: [DbError].},
|
||||||
|
healthCheckQuery = "SELECT 1;"): DbConnPool[D] =
|
||||||
|
## Initialize a new DbConnPool. See the `initDb` procedure in the `Example
|
||||||
|
## Fiber ORM Usage`_ for an example
|
||||||
|
##
|
||||||
|
## * `connect` must be a factory which creates a new `DbConn`.
|
||||||
|
## * `poolSize` sets the desired capacity of the connection pool.
|
||||||
|
## * `healthCheckQuery` should be a simple and fast SQL query that the pool
|
||||||
|
## can use to test the liveliness of pooled connections. By default it uses
|
||||||
|
## `SELECT 1;`
|
||||||
|
##
|
||||||
|
## .. _Example Fiber ORM Usage: ../fiber_orm.html#basic-usage-example-fiber-orm-usage
|
||||||
|
|
||||||
if numToCull > 0:
|
result = cast[DbConnPool[D]](allocShared0(sizeof(DbConnPoolObj[D])))
|
||||||
let toCull = freeConns[0..numToCull]
|
initCond(result.cond)
|
||||||
pool.conns.keepIf((pc) => toCull.allIt(it.id != pc.id))
|
initLock(result.lock)
|
||||||
for culled in toCull:
|
result.entries = initDeque[D](poolSize)
|
||||||
try: culled.conn.close()
|
result.connect = connectFunc
|
||||||
except: discard ""
|
result.healthCheckQuery = sql(healthCheckQuery)
|
||||||
|
|
||||||
proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] =
|
try:
|
||||||
|
for _ in 0 ..< poolSize: result.entries.addLast(connectFunc())
|
||||||
|
except DbError as ex:
|
||||||
|
try: result.close()
|
||||||
|
except: discard
|
||||||
|
getLogger("pool").error(
|
||||||
|
msg = "unable to initialize connection pool",
|
||||||
|
err = ex)
|
||||||
|
raise ex
|
||||||
|
|
||||||
|
|
||||||
|
proc take*[D: DbConnType](pool: DbConnPool[D]): D {.raises: [DbError], gcsafe.} =
|
||||||
## Request a connection from the pool. Returns a DbConn if the pool has free
|
## Request a connection from the pool. Returns a DbConn if the pool has free
|
||||||
## connections, or if it has the capacity to create a new connection. If the
|
## connections, or if it has the capacity to create a new connection. If the
|
||||||
## pool is configured with a hard capacity limit and is out of free
|
## pool is configured with a hard capacity limit and is out of free
|
||||||
@@ -90,25 +82,33 @@ proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] =
|
|||||||
##
|
##
|
||||||
## Connections taken must be returned via `release` when the caller is
|
## Connections taken must be returned via `release` when the caller is
|
||||||
## finished using them in order for them to be released back to the pool.
|
## finished using them in order for them to be released back to the pool.
|
||||||
pool.maintain
|
withLock(pool.lock):
|
||||||
let freeConns = pool.conns.filterIt(it.free)
|
while pool.entries.len == 0: wait(pool.cond, pool.lock)
|
||||||
|
result = pool.entries.popFirst()
|
||||||
|
|
||||||
let reserved =
|
# check that the connection is healthy
|
||||||
if freeConns.len > 0: freeConns[0]
|
try: discard getRow(result, pool.healthCheckQuery, [])
|
||||||
else: pool.newConn()
|
except DbError:
|
||||||
|
{.gcsafe.}:
|
||||||
|
# if it's not, let's try to close it and create a new connection
|
||||||
|
try:
|
||||||
|
getLogger("pool").info(
|
||||||
|
"pooled connection failed health check, opening a new connection")
|
||||||
|
close(result)
|
||||||
|
except: discard
|
||||||
|
result = pool.connect()
|
||||||
|
|
||||||
reserved.free = false
|
|
||||||
return (id: reserved.id, conn: reserved.conn)
|
|
||||||
|
|
||||||
proc release*[D: DbConnType](pool: DbConnPool[D], connId: int): void =
|
proc release*[D: DbConnType](pool: DbConnPool[D], conn: D) {.raises: [], gcsafe.} =
|
||||||
## Release a connection back to the pool.
|
## Release a connection back to the pool.
|
||||||
let foundConn = pool.conns.filterIt(it.id == connId)
|
withLock(pool.lock):
|
||||||
if foundConn.len > 0: foundConn[0].free = true
|
pool.entries.addLast(conn)
|
||||||
|
signal(pool.cond)
|
||||||
|
|
||||||
template withConnection*[D: DbConnType](pool: DbConnPool[D], conn, stmt: untyped): untyped =
|
template withConnection*[D: DbConnType](pool: DbConnPool[D], conn, stmt: untyped): untyped =
|
||||||
## Convenience template to provide a connection from the pool for use in a
|
## Convenience template to provide a connection from the pool for use in a
|
||||||
## statement block, automatically releasing that connnection when done.
|
## statement block, automatically releasing that connnection when done.
|
||||||
block:
|
block:
|
||||||
let (connId, conn) = take(pool)
|
let conn = take(pool)
|
||||||
try: stmt
|
try: stmt
|
||||||
finally: release(pool, connId)
|
finally: release(pool, conn)
|
||||||
|
34
src/fiber_orm/private/logging.nim
Normal file
34
src/fiber_orm/private/logging.nim
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
import std/[json, options]
|
||||||
|
|
||||||
|
import namespaced_logging
|
||||||
|
|
||||||
|
export namespaced_logging.log
|
||||||
|
export namespaced_logging.debug
|
||||||
|
export namespaced_logging.info
|
||||||
|
export namespaced_logging.notice
|
||||||
|
export namespaced_logging.warn
|
||||||
|
export namespaced_logging.error
|
||||||
|
export namespaced_logging.fatal
|
||||||
|
|
||||||
|
var logService {.threadvar.}: Option[ThreadLocalLogService]
|
||||||
|
var logger {.threadvar.}: Option[Logger]
|
||||||
|
|
||||||
|
proc makeQueryLogEntry(
|
||||||
|
m: string,
|
||||||
|
sql: string,
|
||||||
|
args: openArray[(string, string)] = []): JsonNode =
|
||||||
|
result = %*{ "method": m, "sql": sql }
|
||||||
|
for (k, v) in args: result[k] = %v
|
||||||
|
|
||||||
|
proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) =
|
||||||
|
# namespaced_logging would do this check for us, but we don't want to even
|
||||||
|
# build the log object if we're not actually logging
|
||||||
|
if logService.isNone: return
|
||||||
|
if logger.isNone: logger = logService.getLogger("fiber_orm/query")
|
||||||
|
logger.debug(makeQueryLogEntry(methodName, sqlStmt, args))
|
||||||
|
|
||||||
|
proc enableDbLogging*(svc: ThreadLocalLogService) =
|
||||||
|
logService = some(svc)
|
||||||
|
|
||||||
|
proc getLogger*(scope: string): Option[Logger] =
|
||||||
|
logService.getLogger("fiber_orm/" & scope)
|
Reference in New Issue
Block a user