diff --git a/fiber_orm.nimble b/fiber_orm.nimble index d383787..3b52251 100644 --- a/fiber_orm.nimble +++ b/fiber_orm.nimble @@ -1,6 +1,6 @@ # Package -version = "3.2.0" +version = "4.0.0" author = "Jonathan Bernard" description = "Lightweight Postgres ORM for Nim." license = "GPL-3.0" diff --git a/src/fiber_orm.nim b/src/fiber_orm.nim index 33e53eb..549ad14 100644 --- a/src/fiber_orm.nim +++ b/src/fiber_orm.nim @@ -286,15 +286,16 @@ ## import std/[json, macros, options, sequtils, strutils] import db_connector/db_common -import namespaced_logging, uuids +import uuids from std/unicode import capitalize import ./fiber_orm/db_common as fiber_db_common -import ./fiber_orm/pool -import ./fiber_orm/util +import ./fiber_orm/[pool, util] +import ./fiber_orm/private/logging export pool, util +export logging.enableDbLogging type PagedRecords*[T] = object @@ -308,26 +309,6 @@ type NotFoundError* = object of CatchableError ## Error type raised when no record matches a given ID -var logService {.threadvar.}: Option[ThreadLocalLogService] -var logger {.threadvar.}: Option[Logger] - -proc makeQueryLogEntry( - m: string, - sql: string, - args: openArray[(string, string)] = []): JsonNode = - result = %*{ "method": m, "sql": sql } - for (k, v) in args: result[k] = %v - -proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) = - # namespaced_logging would do this check for us, but we don't want to even - # build the log object if we're not actually logging - if logService.isNone: return - if logger.isNone: logger = logService.getLogger("fiber_orm/query") - logger.debug(makeQueryLogEntry(methodName, sqlStmt, args)) - -proc enableDbLogging*(svc: ThreadLocalLogService) = - logService = some(svc) - proc newMutateClauses(): MutateClauses = return MutateClauses( columns: @[], @@ -353,7 +334,7 @@ proc createRecord*[D: DbConnType, T](db: D, rec: T): T = " RETURNING " & columnNamesForModel(rec).join(",") logQuery("createRecord", sqlStmt) - debug(logService.getLogger("fiber_orm/query"), %*{ "values": mc.values }) + debug(getLogger("query"), %*{ "values": mc.values }) let newRow = db.getRow(sql(sqlStmt), mc.values) @@ -845,35 +826,6 @@ macro generateJoinTableProcs*( db.withConnection conn: associate(conn, `joinTableNameNode`, rec1, rec2) -proc initPool*[D: DbConnType]( - connect: proc(): D, - poolSize = 10, - hardCap = false, - healthCheckQuery = "SELECT 'true' AS alive"): DbConnPool[D] = - - ## Initialize a new DbConnPool. See the `initDb` procedure in the `Example - ## Fiber ORM Usage`_ for an example - ## - ## * `connect` must be a factory which creates a new `DbConn`. - ## * `poolSize` sets the desired capacity of the connection pool. - ## * `hardCap` defaults to `false`. - ## When `false`, the pool can grow beyond the configured capacity, but will - ## release connections down to the its capacity (no less than `poolSize`). - ## - ## When `true` the pool will not create more than its configured capacity. - ## It a connection is requested, none are free, and the pool is at - ## capacity, this will result in an Error being raised. - ## * `healthCheckQuery` should be a simple and fast SQL query that the pool - ## can use to test the liveliness of pooled connections. - ## - ## .. _Example Fiber ORM Usage: #basic-usage-example-fiber-orm-usage - - initDbConnPool(DbConnPoolConfig[D]( - connect: connect, - poolSize: poolSize, - hardCap: hardCap, - healthCheckQuery: healthCheckQuery)) - template inTransaction*(db, body: untyped) = db.withConnection conn: conn.exec(sql"BEGIN TRANSACTION") diff --git a/src/fiber_orm/pool.nim b/src/fiber_orm/pool.nim index ed4e1c4..205c923 100644 --- a/src/fiber_orm/pool.nim +++ b/src/fiber_orm/pool.nim @@ -4,85 +4,77 @@ ## Simple database connection pooling implementation compatible with Fiber ORM. -import std/[sequtils, sugar] +when (NimMajor, NimMinor, NimPatch) < (2, 0, 0): + when not defined(gcArc) and not defined (gcOrc): + {.error: "fiber_orm requires either --mm:arc or --mm:orc.".} + +import std/[deques, locks, sequtils, sugar] import db_connector/db_common from db_connector/db_sqlite import getRow, close from db_connector/db_postgres import getRow, close import ./db_common as fiber_db_common +import ./private/logging type - DbConnPoolConfig*[D: DbConnType] = object - connect*: () -> D ## Factory procedure to create a new DBConn - poolSize*: int ## The pool capacity. + DbConnPool*[D: DbConnType] = ptr DbConnPoolObj[D] - hardCap*: bool ## Is the pool capacity a hard cap? - ## - ## When `false`, the pool can grow beyond the - ## configured capacity, but will release connections - ## down to the its capacity (no less than `poolSize`). - ## - ## When `true` the pool will not create more than its - ## configured capacity. It a connection is requested, - ## none are free, and the pool is at capacity, this - ## will result in an Error being raised. - - healthCheckQuery*: string ## Should be a simple and fast SQL query that the - ## pool can use to test the liveliness of pooled - ## connections. - - PooledDbConn[D: DbConnType] = ref object - conn: D - id: int - free: bool - - DbConnPool*[D: DbConnType] = ref object + DbConnPoolObj[D: DbConnType] = object ## Database connection pool - conns: seq[PooledDbConn[D]] - cfg: DbConnPoolConfig[D] - lastId: int + connect: proc (): D {.raises: [DbError].} + healthCheckQuery: SqlQuery + entries: Deque[D] + cond: Cond + lock: Lock -proc initDbConnPool*[D: DbConnType](cfg: DbConnPoolConfig[D]): DbConnPool[D] = - result = DbConnPool[D]( - conns: @[], - cfg: cfg) -proc newConn[D: DbConnType](pool: DbConnPool[D]): PooledDbConn[D] = - pool.lastId += 1 - {.gcsafe.}: - let conn = pool.cfg.connect() - result = PooledDbConn[D]( - conn: conn, - id: pool.lastId, - free: true) - pool.conns.add(result) +proc close*[D: DbConnType](pool: DbConnPool[D]) = + ## Safely close all connections and release resources for the given pool. + getLogger("pool").debug("closing connection pool") + withLock(pool.lock): + while pool.entries.len > 0: close(pool.entries.popFirst()) -proc maintain[D: DbConnType](pool: DbConnPool[D]): void = - pool.conns.keepIf(proc (pc: PooledDbConn[D]): bool = - if not pc.free: return true + deinitLock(pool.lock) + deinitCond(pool.cond) + `=destroy`(pool[]) + deallocShared(pool) - try: - discard getRow(pc.conn, sql(pool.cfg.healthCheckQuery), []) - return true - except: - try: pc.conn.close() # try to close the connection - except: discard "" - return false - ) - let freeConns = pool.conns.filterIt(it.free) - if pool.conns.len > pool.cfg.poolSize and freeConns.len > 0: - let numToCull = min(freeConns.len, pool.conns.len - pool.cfg.poolSize) +proc newDbConnPool*[D: DbConnType]( + poolSize: int, + connectFunc: proc(): D {.raises: [DbError].}, + healthCheckQuery = "SELECT 1;"): DbConnPool[D] = + ## Initialize a new DbConnPool. See the `initDb` procedure in the `Example + ## Fiber ORM Usage`_ for an example + ## + ## * `connect` must be a factory which creates a new `DbConn`. + ## * `poolSize` sets the desired capacity of the connection pool. + ## * `healthCheckQuery` should be a simple and fast SQL query that the pool + ## can use to test the liveliness of pooled connections. By default it uses + ## `SELECT 1;` + ## + ## .. _Example Fiber ORM Usage: ../fiber_orm.html#basic-usage-example-fiber-orm-usage - if numToCull > 0: - let toCull = freeConns[0..numToCull] - pool.conns.keepIf((pc) => toCull.allIt(it.id != pc.id)) - for culled in toCull: - try: culled.conn.close() - except: discard "" + result = cast[DbConnPool[D]](allocShared0(sizeof(DbConnPoolObj[D]))) + initCond(result.cond) + initLock(result.lock) + result.entries = initDeque[D](poolSize) + result.connect = connectFunc + result.healthCheckQuery = sql(healthCheckQuery) -proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] = + try: + for _ in 0 ..< poolSize: result.entries.addLast(connectFunc()) + except DbError as ex: + try: result.close() + except: discard + getLogger("pool").error( + msg = "unable to initialize connection pool", + err = ex) + raise ex + + +proc take*[D: DbConnType](pool: DbConnPool[D]): D {.raises: [DbError], gcsafe.} = ## Request a connection from the pool. Returns a DbConn if the pool has free ## connections, or if it has the capacity to create a new connection. If the ## pool is configured with a hard capacity limit and is out of free @@ -90,25 +82,33 @@ proc take*[D: DbConnType](pool: DbConnPool[D]): tuple[id: int, conn: D] = ## ## Connections taken must be returned via `release` when the caller is ## finished using them in order for them to be released back to the pool. - pool.maintain - let freeConns = pool.conns.filterIt(it.free) + withLock(pool.lock): + while pool.entries.len == 0: wait(pool.cond, pool.lock) + result = pool.entries.popFirst() - let reserved = - if freeConns.len > 0: freeConns[0] - else: pool.newConn() + # check that the connection is healthy + try: discard getRow(result, pool.healthCheckQuery, []) + except DbError: + {.gcsafe.}: + # if it's not, let's try to close it and create a new connection + try: + getLogger("pool").info( + "pooled connection failed health check, opening a new connection") + close(result) + except: discard + result = pool.connect() - reserved.free = false - return (id: reserved.id, conn: reserved.conn) -proc release*[D: DbConnType](pool: DbConnPool[D], connId: int): void = +proc release*[D: DbConnType](pool: DbConnPool[D], conn: D) {.raises: [], gcsafe.} = ## Release a connection back to the pool. - let foundConn = pool.conns.filterIt(it.id == connId) - if foundConn.len > 0: foundConn[0].free = true + withLock(pool.lock): + pool.entries.addLast(conn) + signal(pool.cond) template withConnection*[D: DbConnType](pool: DbConnPool[D], conn, stmt: untyped): untyped = ## Convenience template to provide a connection from the pool for use in a ## statement block, automatically releasing that connnection when done. block: - let (connId, conn) = take(pool) + let conn = take(pool) try: stmt - finally: release(pool, connId) + finally: release(pool, conn) diff --git a/src/fiber_orm/private/logging.nim b/src/fiber_orm/private/logging.nim new file mode 100644 index 0000000..0225965 --- /dev/null +++ b/src/fiber_orm/private/logging.nim @@ -0,0 +1,34 @@ +import std/[json, options] + +import namespaced_logging + +export namespaced_logging.log +export namespaced_logging.debug +export namespaced_logging.info +export namespaced_logging.notice +export namespaced_logging.warn +export namespaced_logging.error +export namespaced_logging.fatal + +var logService {.threadvar.}: Option[ThreadLocalLogService] +var logger {.threadvar.}: Option[Logger] + +proc makeQueryLogEntry( + m: string, + sql: string, + args: openArray[(string, string)] = []): JsonNode = + result = %*{ "method": m, "sql": sql } + for (k, v) in args: result[k] = %v + +proc logQuery*(methodName: string, sqlStmt: string, args: openArray[(string, string)] = []) = + # namespaced_logging would do this check for us, but we don't want to even + # build the log object if we're not actually logging + if logService.isNone: return + if logger.isNone: logger = logService.getLogger("fiber_orm/query") + logger.debug(makeQueryLogEntry(methodName, sqlStmt, args)) + +proc enableDbLogging*(svc: ThreadLocalLogService) = + logService = some(svc) + +proc getLogger*(scope: string): Option[Logger] = + logService.getLogger("fiber_orm/" & scope)