3 Commits
3.1.1 ... 4.0.0

Author SHA1 Message Date
e1fa2480d0 Major update to provide thread-safe, robust connection pooling.
Taking inspiration from the waterpark library, the connection pooling
mechanism has been refactored to be thread-safe. Additionally, the
pooling logic detects and handles stale connections in the pool. When a
connection is requested from the pool, the pool first validates that it
is healthy and replaces it with a fresh connection if necessary. This is
transparent to the requester.

Additionally we refactored the internal logging implementation to make
it more conventient to access logging infrastructure and log from
various sub-scopes within fiber_orm (query, pool, etc.)
2025-07-27 17:47:07 -05:00
b8c64cc693 Migrate to namespaced_logging v2. 2025-07-12 07:54:13 -05:00
aa02f9f5b1 Add support for records associated via join tables. 2025-05-19 17:56:40 -05:00
6 changed files with 305 additions and 128 deletions

2
.gitignore vendored
View File

@@ -1,2 +1,4 @@
*.sw? *.sw?
nimcache/ nimcache/
nimble.develop
nimble.paths

View File

@@ -1,6 +1,6 @@
# Package # Package
version = "3.1.1" version = "4.0.0"
author = "Jonathan Bernard" author = "Jonathan Bernard"
description = "Lightweight Postgres ORM for Nim." description = "Lightweight Postgres ORM for Nim."
license = "GPL-3.0" license = "GPL-3.0"
@@ -11,4 +11,4 @@ srcDir = "src"
# Dependencies # Dependencies
requires @["nim >= 1.4.0", "uuids"] requires @["nim >= 1.4.0", "uuids"]
requires "namespaced_logging >= 1.0.0" requires "namespaced_logging >= 2.0.2"

View File

@@ -286,15 +286,16 @@
## ##
import std/[json, macros, options, sequtils, strutils] import std/[json, macros, options, sequtils, strutils]
import db_connector/db_common import db_connector/db_common
import namespaced_logging, uuids import uuids
from std/unicode import capitalize from std/unicode import capitalize
import ./fiber_orm/db_common as fiber_db_common import ./fiber_orm/db_common as fiber_db_common
import ./fiber_orm/pool import ./fiber_orm/[pool, util]
import ./fiber_orm/util import ./fiber_orm/private/logging
export pool, util export pool, util
export logging.enableDbLogging
type type
PagedRecords*[T] = object PagedRecords*[T] = object
@@ -302,27 +303,12 @@ type
records*: seq[T] records*: seq[T]
totalRecords*: int totalRecords*: int
DbUpdateError* = object of CatchableError ##\ DbUpdateError* = object of CatchableError
## Error types raised when a DB modification fails. ## Error types raised when a DB modification fails.
NotFoundError* = object of CatchableError ##\ NotFoundError* = object of CatchableError
## Error type raised when no record matches a given ID ## Error type raised when no record matches a given ID
var logService {.threadvar.}: Option[LogService]
var logger {.threadvar.}: Option[Logger]
proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) =
# namespaced_logging would do this check for us, but we don't want to even
# build the log object if we're not actually logging
if logService.isNone: return
if logger.isNone: logger = logService.getLogger("fiber_orm/query")
var log = %*{ "method": methodName, "sql": sqlStmt }
for (k, v) in args: log[k] = %v
logger.debug(log)
proc enableDbLogging*(svc: LogService) =
logService = some(svc)
proc newMutateClauses(): MutateClauses = proc newMutateClauses(): MutateClauses =
return MutateClauses( return MutateClauses(
columns: @[], columns: @[],
@@ -348,6 +334,8 @@ proc createRecord*[D: DbConnType, T](db: D, rec: T): T =
" RETURNING " & columnNamesForModel(rec).join(",") " RETURNING " & columnNamesForModel(rec).join(",")
logQuery("createRecord", sqlStmt) logQuery("createRecord", sqlStmt)
debug(getLogger("query"), %*{ "values": mc.values })
let newRow = db.getRow(sql(sqlStmt), mc.values) let newRow = db.getRow(sql(sqlStmt), mc.values)
result = rowToModel(T, newRow) result = rowToModel(T, newRow)
@@ -500,6 +488,91 @@ template findRecordsBy*[D: DbConnType](
else: db.getRow(sql(countStmt), values)[0].parseInt) else: db.getRow(sql(countStmt), values)[0].parseInt)
template associate*[D: DbConnType, I, J](
db: D,
joinTableName: string,
rec1: I,
rec2: J): void =
## Associate two records via a join table.
let insertStmt =
"INSERT INTO " & joinTableName &
" (" & tableName(I) & "_id, " & tableName(J) & "_id) " &
" VALUES (?, ?)"
logQuery("associate", insertStmt, [("id1", $rec1.id), ("id2", $rec2.id)])
db.exec(sql(insertStmt), [$rec1.id, $rec2.id])
template findViaJoinTable*[D: DbConnType, L](
db: D,
joinTableName: string,
targetType: type,
rec: L,
page: Option[PaginationParams]): untyped =
## Find all records of `targetType` that are associated with `rec` via a
## join table.
let columns = columnNamesForModel(targetType).mapIt("t." & it).join(",")
var fetchStmt =
"SELECT " & columns &
" FROM " & tableName(targetType) & " AS t " &
" JOIN " & joinTableName & " AS j " &
" ON t.id = jt." & tableName(targetType) & "_id " &
" WHERE jt." & tableName(rec) & "_id = ?"
var countStmt =
"SELECT COUNT(*) FROM " & joinTableName &
" WHERE " & tableName(rec) & "_id = ?"
if page.isSome: fetchStmt &= getPagingClause(page.get)
logQuery("findViaJoinTable", fetchStmt, [("id", $rec.id)])
let records = db.getAllRows(sql(fetchStmt), $rec.id)
.mapIt(rowToModel(targetType, it))
PagedRecords[targetType](
pagination: page,
records: records,
totalRecords:
if page.isNone: records.len
else: db.getRow(sql(countStmt))[0].parseInt)
template findViaJoinTable*[D: DbConnType](
db: D,
joinTableName: string,
targetType: type,
lookupType: type,
id: typed,
page: Option[PaginationParams]): untyped =
## Find all records of `targetType` that are associated with a record of
## `lookupType` via a join table.
let columns = columnNamesForModel(targetType).mapIt("t." & it).join(",")
var fetchStmt =
"SELECT " & columns &
" FROM " & tableName(targetType) & " AS t " &
" JOIN " & joinTableName & " AS j " &
" ON t.id = jt." & tableName(targetType) & "_id " &
" WHERE jt." & tableName(lookupType) & "_id = ?"
var countStmt =
"SELECT COUNT(*) FROM " & joinTableName &
" WHERE " & tableName(lookupType) & "_id = ?"
if page.isSome: fetchStmt &= getPagingClause(page.get)
logQuery("findViaJoinTable", fetchStmt, [("id", $id)])
let records = db.getAllRows(sql(fetchStmt), $id)
.mapIt(rowToModel(targetType, it))
PagedRecords[targetType](
pagination: page,
records: records,
totalRecords:
if page.isNone: records.len
else: db.getRow(sql(countStmt))[0].parseInt)
macro generateProcsForModels*(dbType: type, modelTypes: openarray[type]): untyped = macro generateProcsForModels*(dbType: type, modelTypes: openarray[type]): untyped =
## Generate all standard access procedures for the given model types. For a ## Generate all standard access procedures for the given model types. For a
## `model class`_ named `TodoItem`, this will generate the following ## `model class`_ named `TodoItem`, this will generate the following
@@ -656,34 +729,102 @@ macro generateProcsForFieldLookups*(dbType: type, modelsAndFields: openarray[tup
result.add procDefAST result.add procDefAST
proc initPool*[D: DbConnType]( macro generateJoinTableProcs*(
connect: proc(): D, dbType, model1Type, model2Type: type,
poolSize = 10, joinTableName: string): untyped =
hardCap = false, ## Generate lookup procedures for a pair of models with a join table. For
healthCheckQuery = "SELECT 'true' AS alive"): DbConnPool[D] = ## example, given the TODO database demonstrated above, where `TodoItem` and
## `TimeEntry` have a many-to-many relationship, you might have a join table
## `todo_items_time_entries` with columns `todo_item_id` and `time_entry_id`.
## This macro will generate the following procedures:
##
## .. code-block:: Nim
## proc findTodoItemsByTimeEntry*(db: SampleDB, timeEntry: TimeEntry): seq[TodoItem]
## proc findTimeEntriesByTodoItem*(db: SampleDB, todoItem: TodoItem): seq[TimeEntry]
##
## `dbType` is expected to be some type that has a defined `withConnection`
## procedure (see `Database Object`_ for details).
##
## .. _Database Object: #database-object
result = newStmtList()
## Initialize a new DbConnPool. See the `initDb` procedure in the `Example if model1Type.getType[1].typeKind == ntyRef or
## Fiber ORM Usage`_ for an example model2Type.getType[1].typeKind == ntyRef:
## raise newException(ValueError,
## * `connect` must be a factory which creates a new `DbConn`. "fiber_orm model object must be objects, not refs")
## * `poolSize` sets the desired capacity of the connection pool.
## * `hardCap` defaults to `false`.
## When `false`, the pool can grow beyond the configured capacity, but will
## release connections down to the its capacity (no less than `poolSize`).
##
## When `true` the pool will not create more than its configured capacity.
## It a connection is requested, none are free, and the pool is at
## capacity, this will result in an Error being raised.
## * `healthCheckQuery` should be a simple and fast SQL query that the pool
## can use to test the liveliness of pooled connections.
##
## .. _Example Fiber ORM Usage: #basic-usage-example-fiber-orm-usage
initDbConnPool(DbConnPoolConfig[D]( let model1Name = $(model1Type.getType[1])
connect: connect, let model2Name = $(model2Type.getType[1])
poolSize: poolSize, let getModel1Name = ident("get" & pluralize(model1Name) & "By" & model2Name)
hardCap: hardCap, let getModel2Name = ident("get" & pluralize(model2Name) & "By" & model1Name)
healthCheckQuery: healthCheckQuery)) let id1Type = typeOfColumn(model1Type, "id")
let id2Type = typeOfColumn(model2Type, "id")
let joinTableNameNode = newStrLitNode($joinTableName)
result.add quote do:
proc `getModel1Name`*(
db: `dbType`,
id: `id2Type`,
pagination = none[PaginationParams]()): PagedRecords[`model1Type`] =
db.withConnection conn:
result = findViaJoinTable(
conn,
`joinTableNameNode`,
`model1Type`,
`model2Type`,
id,
pagination)
proc `getModel1Name`*(
db: `dbType`,
rec: `model2Type`,
pagination = none[PaginationParams]()): PagedRecords[`model1Type`] =
db.withConnection conn:
result = findViaJoinTable(
conn,
`joinTableNameNode`,
`model1Type`,
rec,
pagination)
proc `getModel2Name`*(
db: `dbType`,
id: `id1Type`,
pagination = none[PaginationParams]()): Pagedrecords[`model2Type`] =
db.withConnection conn:
result = findViaJoinTable(
conn,
`joinTableNameNode`,
`model2Type`,
`model1Type`,
id,
pagination)
proc `getModel2Name`*(
db: `dbType`,
rec: `model1Type`,
pagination = none[PaginationParams]()): Pagedrecords[`model2Type`] =
db.withConnection conn:
result = findViaJoinTable(
conn,
`joinTableNameNode`,
`model2Type`,
rec,
pagination)
proc associate*(
db: `dbType`,
rec1: `model1Type`,
rec2: `model2Type`): void =
db.withConnection conn:
associate(conn, `joinTableNameNode`, rec1, rec2)
proc associate*(
db: `dbType`,
rec2: `model2Type`,
rec1: `model1Type`): void =
db.withConnection conn:
associate(conn, `joinTableNameNode`, rec1, rec2)
template inTransaction*(db, body: untyped) = template inTransaction*(db, body: untyped) =
db.withConnection conn: db.withConnection conn:

View File

@@ -4,85 +4,77 @@
## Simple database connection pooling implementation compatible with Fiber ORM. ## Simple database connection pooling implementation compatible with Fiber ORM.
import std/[sequtils, strutils, sugar] when (NimMajor, NimMinor, NimPatch) < (2, 0, 0):
when not defined(gcArc) and not defined (gcOrc):
{.error: "fiber_orm requires either --mm:arc or --mm:orc.".}
import std/[deques, locks, sequtils, sugar]
import db_connector/db_common import db_connector/db_common
from db_connector/db_sqlite import getRow, close from db_connector/db_sqlite import getRow, close
from db_connector/db_postgres import getRow, close from db_connector/db_postgres import getRow, close
import ./db_common as fiber_db_common import ./db_common as fiber_db_common
import ./private/logging
type type
DbConnPoolConfig*[D: DbConnType] = object DbConnPool*[D: DbConnType] = ptr DbConnPoolObj[D]
connect*: () -> D ## Factory procedure to create a new DBConn
poolSize*: int ## The pool capacity.
hardCap*: bool ## Is the pool capacity a hard cap? DbConnPoolObj[D: DbConnType] = object
##
## When `false`, the pool can grow beyond the
## configured capacity, but will release connections
## down to the its capacity (no less than `poolSize`).
##
## When `true` the pool will not create more than its
## configured capacity. It a connection is requested,
## none are free, and the pool is at capacity, this
## will result in an Error being raised.
healthCheckQuery*: string ## Should be a simple and fast SQL query that the
## pool can use to test the liveliness of pooled
## connections.
PooledDbConn[D: DbConnType] = ref object
conn: D
id: int
free: bool
DbConnPool*[D: DbConnType] = ref object
## Database connection pool ## Database connection pool
conns: seq[PooledDbConn[D]] connect: proc (): D {.raises: [DbError].}
cfg: DbConnPoolConfig[D] healthCheckQuery: SqlQuery
lastId: int entries: Deque[D]
cond: Cond
lock: Lock
proc initDbConnPool*[D: DbConnType](cfg: DbConnPoolConfig[D]): DbConnPool[D] =
result = DbConnPool[D](
conns: @[],
cfg: cfg)
proc newConn[D: DbConnType](pool: DbConnPool[D]): PooledDbConn[D] = proc close*[D: DbConnType](pool: DbConnPool[D]) =
pool.lastId += 1 ## Safely close all connections and release resources for the given pool.
{.gcsafe.}: getLogger("pool").debug("closing connection pool")
let conn = pool.cfg.connect() withLock(pool.lock):
result = PooledDbConn[D]( while pool.entries.len > 0: close(pool.entries.popFirst())
conn: conn,
id: pool.lastId,
free: true)
pool.conns.add(result)
proc maintain[D: DbConnType](pool: DbConnPool[D]): void = deinitLock(pool.lock)
pool.conns.keepIf(proc (pc: PooledDbConn[D]): bool = deinitCond(pool.cond)
if not pc.free: return true `=destroy`(pool[])
deallocShared(pool)
proc newDbConnPool*[D: DbConnType](
poolSize: int,
connectFunc: proc(): D {.raises: [DbError].},
healthCheckQuery = "SELECT 1;"): DbConnPool[D] =
## Initialize a new DbConnPool. See the `initDb` procedure in the `Example
## Fiber ORM Usage`_ for an example
##
## * `connect` must be a factory which creates a new `DbConn`.
## * `poolSize` sets the desired capacity of the connection pool.
## * `healthCheckQuery` should be a simple and fast SQL query that the pool
## can use to test the liveliness of pooled connections. By default it uses
## `SELECT 1;`
##
## .. _Example Fiber ORM Usage: ../fiber_orm.html#basic-usage-example-fiber-orm-usage
result = cast[DbConnPool[D]](allocShared0(sizeof(DbConnPoolObj[D])))
initCond(result.cond)
initLock(result.lock)
result.entries = initDeque[D](poolSize)
result.connect = connectFunc
result.healthCheckQuery = sql(healthCheckQuery)
try: try:
discard getRow(pc.conn, sql(pool.cfg.healthCheckQuery), []) for _ in 0 ..< poolSize: result.entries.addLast(connectFunc())
return true except DbError as ex:
except: try: result.close()
try: pc.conn.close() # try to close the connection except: discard
except: discard "" getLogger("pool").error(
return false msg = "unable to initialize connection pool",
) err = ex)
raise ex
let freeConns = pool.conns.filterIt(it.free)
if pool.conns.len > pool.cfg.poolSize and freeConns.len > 0:
let numToCull = min(freeConns.len, pool.conns.len - pool.cfg.poolSize)
if numToCull > 0: proc take*[D: DbConnType](pool: DbConnPool[D]): D {.raises: [DbError], gcsafe.} =
let toCull = freeConns[0..numToCull]
pool.conns.keepIf((pc) => toCull.allIt(it.id != pc.id))
for culled in toCull:
try: culled.conn.close()
except: discard ""
proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] =
## Request a connection from the pool. Returns a DbConn if the pool has free ## Request a connection from the pool. Returns a DbConn if the pool has free
## connections, or if it has the capacity to create a new connection. If the ## connections, or if it has the capacity to create a new connection. If the
## pool is configured with a hard capacity limit and is out of free ## pool is configured with a hard capacity limit and is out of free
@@ -90,25 +82,33 @@ proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] =
## ##
## Connections taken must be returned via `release` when the caller is ## Connections taken must be returned via `release` when the caller is
## finished using them in order for them to be released back to the pool. ## finished using them in order for them to be released back to the pool.
pool.maintain withLock(pool.lock):
let freeConns = pool.conns.filterIt(it.free) while pool.entries.len == 0: wait(pool.cond, pool.lock)
result = pool.entries.popFirst()
let reserved = # check that the connection is healthy
if freeConns.len > 0: freeConns[0] try: discard getRow(result, pool.healthCheckQuery, [])
else: pool.newConn() except DbError:
{.gcsafe.}:
# if it's not, let's try to close it and create a new connection
try:
getLogger("pool").info(
"pooled connection failed health check, opening a new connection")
close(result)
except: discard
result = pool.connect()
reserved.free = false
return (id: reserved.id, conn: reserved.conn)
proc release*[D: DbConnType](pool: DbConnPool[D], connId: int): void = proc release*[D: DbConnType](pool: DbConnPool[D], conn: D) {.raises: [], gcsafe.} =
## Release a connection back to the pool. ## Release a connection back to the pool.
let foundConn = pool.conns.filterIt(it.id == connId) withLock(pool.lock):
if foundConn.len > 0: foundConn[0].free = true pool.entries.addLast(conn)
signal(pool.cond)
template withConnection*[D: DbConnType](pool: DbConnPool[D], conn, stmt: untyped): untyped = template withConnection*[D: DbConnType](pool: DbConnPool[D], conn, stmt: untyped): untyped =
## Convenience template to provide a connection from the pool for use in a ## Convenience template to provide a connection from the pool for use in a
## statement block, automatically releasing that connnection when done. ## statement block, automatically releasing that connnection when done.
block: block:
let (connId, conn) = take(pool) let conn = take(pool)
try: stmt try: stmt
finally: release(pool, connId) finally: release(pool, conn)

View File

@@ -0,0 +1,34 @@
import std/[json, options]
import namespaced_logging
export namespaced_logging.log
export namespaced_logging.debug
export namespaced_logging.info
export namespaced_logging.notice
export namespaced_logging.warn
export namespaced_logging.error
export namespaced_logging.fatal
var logService {.threadvar.}: Option[ThreadLocalLogService]
var logger {.threadvar.}: Option[Logger]
proc makeQueryLogEntry(
m: string,
sql: string,
args: openArray[(string, string)] = []): JsonNode =
result = %*{ "method": m, "sql": sql }
for (k, v) in args: result[k] = %v
proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) =
# namespaced_logging would do this check for us, but we don't want to even
# build the log object if we're not actually logging
if logService.isNone: return
if logger.isNone: logger = logService.getLogger("fiber_orm/query")
logger.debug(makeQueryLogEntry(methodName, sqlStmt, args))
proc enableDbLogging*(svc: ThreadLocalLogService) =
logService = some(svc)
proc getLogger*(scope: string): Option[Logger] =
logService.getLogger("fiber_orm/" & scope)

View File

@@ -217,7 +217,7 @@ proc parseDbArray*(val: string): seq[string] =
result.add(curStr) result.add(curStr)
func createParseStmt*(t, value: NimNode): NimNode = func createParseStmt*(t, value: NimNode): NimNode =
## Utility method to create the Nim cod required to parse a value coming from ## Utility method to create the Nim code required to parse a value coming from
## the a database query. This is used by functions like `rowToModel` to parse ## the a database query. This is used by functions like `rowToModel` to parse
## the dataabase columns into the Nim object fields. ## the dataabase columns into the Nim object fields.
@@ -240,7 +240,7 @@ func createParseStmt*(t, value: NimNode): NimNode =
elif t.getType == DateTime.getType: elif t.getType == DateTime.getType:
result = quote do: parsePGDatetime(`value`) result = quote do: parsePGDatetime(`value`)
else: error "Unknown value object type: " & $t.getTypeInst else: error "Cannot parse column with unknown object type: " & $t.getTypeInst
elif t.typeKind == ntyGenericInst: elif t.typeKind == ntyGenericInst:
@@ -254,7 +254,7 @@ func createParseStmt*(t, value: NimNode): NimNode =
if `value`.len == 0: none[`innerType`]() if `value`.len == 0: none[`innerType`]()
else: some(`parseStmt`) else: some(`parseStmt`)
else: error "Unknown generic instance type: " & $t.getTypeInst else: error "Cannot parse column with unknown generic instance type: " & $t.getTypeInst
elif t.typeKind == ntyRef: elif t.typeKind == ntyRef:
@@ -262,7 +262,7 @@ func createParseStmt*(t, value: NimNode): NimNode =
result = quote do: parseJson(`value`) result = quote do: parseJson(`value`)
else: else:
error "Unknown ref type: " & $t.getTypeInst error "Cannot parse column with unknown ref type: " & $t.getTypeInst
elif t.typeKind == ntySequence: elif t.typeKind == ntySequence:
let innerType = t[1] let innerType = t[1]
@@ -281,14 +281,14 @@ func createParseStmt*(t, value: NimNode): NimNode =
result = quote do: parseFloat(`value`) result = quote do: parseFloat(`value`)
elif t.typeKind == ntyBool: elif t.typeKind == ntyBool:
result = quote do: "true".startsWith(`value`.toLower) result = quote do: "true".startsWith(`value`.toLower) or `value` == "1"
elif t.typeKind == ntyEnum: elif t.typeKind == ntyEnum:
let innerType = t.getTypeInst let innerType = t.getTypeInst
result = quote do: parseEnum[`innerType`](`value`) result = quote do: parseEnum[`innerType`](`value`)
else: else:
error "Unknown value type: " & $t.typeKind error "Cannot parse column with unknown value type: " & $t.typeKind
func fields(t: NimNode): seq[tuple[fieldIdent: NimNode, fieldType: NimNode]] = func fields(t: NimNode): seq[tuple[fieldIdent: NimNode, fieldType: NimNode]] =
#[ #[