Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/internal/database/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async function getDbSettings(
isMultitenant,
databasePoolURL,
databaseURL,
databaseEngine,
databaseMaxConnections,
requestXForwardedHostRegExp,
} = getConfig()
Expand Down Expand Up @@ -76,6 +77,7 @@ async function getDbSettings(

return {
dbUrl,
databaseEngine,
isExternalPool,
maxConnections,
}
Expand Down
282 changes: 282 additions & 0 deletions src/internal/database/pg-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import PgConnection from 'pg/lib/connection'
import { vi } from 'vitest'
import {
getPgCancelConnectionTarget,
type PgExecutor,
PgPoolExecutor,
PgPoolStrategy,
PgTenantConnection,
Expand Down Expand Up @@ -533,6 +534,84 @@ describe('PgPoolExecutor', () => {
})

describe('PgTransaction', () => {
it('applies a pending statement timeout before the first direct query', async () => {
const client = {
query: vi.fn().mockResolvedValue({ rows: [] }),
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)

transaction.setPendingStatementTimeout(4321)

await transaction.query('SELECT 1')
await transaction.query('SELECT 2')

expect(client.query).toHaveBeenCalledTimes(3)
expect(client.query).toHaveBeenNthCalledWith(
1,
"SELECT set_config('statement_timeout', $1, true)",
['4321ms']
)
expect(client.query).toHaveBeenNthCalledWith(2, 'SELECT 1', undefined)
expect(client.query).toHaveBeenNthCalledWith(3, 'SELECT 2', undefined)
})

it('rejects a pre-aborted direct query before applying a pending statement timeout', async () => {
const client = {
query: vi.fn().mockResolvedValue({ rows: [] }),
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)

transaction.setPendingStatementTimeout(4321)

await expect(
transaction.query('SELECT 1', { signal: AbortSignal.abort() })
).rejects.toMatchObject({
name: 'AbortError',
code: 'ABORT_ERR',
})
expect(client.query).not.toHaveBeenCalled()
expect(client.release).toHaveBeenCalledWith(expect.objectContaining({ name: 'AbortError' }))

await expect(transaction.query('SELECT 2')).rejects.toThrow(
'Cannot query a completed transaction'
)
expect(client.query).not.toHaveBeenCalled()
})

it('honors abort signals while applying a pending statement timeout', async () => {
const controller = new AbortController()
const client = Object.assign(new EventEmitter(), {
query: vi.fn(() => {
controller.abort()
return new Promise(() => undefined)
}),
release: vi.fn(),
}) as unknown as PoolClient & EventEmitter
const transaction = new PgTransaction(client)

transaction.setPendingStatementTimeout(4321)

await expect(
transaction.query('SELECT 1', { signal: controller.signal })
).rejects.toMatchObject({
name: 'AbortError',
code: 'ABORT_ERR',
})

expect(client.query).toHaveBeenCalledTimes(1)
expect(client.query).toHaveBeenNthCalledWith(
1,
"SELECT set_config('statement_timeout', $1, true)",
['4321ms']
)
expect(client.release).toHaveBeenCalledWith(expect.objectContaining({ name: 'AbortError' }))

await transaction.rollback()
expect(client.query).toHaveBeenCalledTimes(1)
})

it('rejects queries after commit releases the client', async () => {
const client = {
query: vi.fn().mockResolvedValue({ rows: [] }),
Expand Down Expand Up @@ -688,6 +767,209 @@ describe('PgTenantConnection', () => {
})
})

it('defers statement_timeout setup until the first scope application', async () => {
const query = vi.fn().mockResolvedValue({ rows: [] })
const client = {
query,
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)
const pool = {
acquire: vi.fn().mockReturnValue({
beginTransaction: vi.fn().mockResolvedValue(transaction),
}),
} as unknown as PgPoolStrategy
const connection = new PgTenantConnection(
pool,
createPoolStrategySettings({
isExternalPool: false,
})
)

await expect(connection.transaction({ timeout: 4321 })).resolves.toBe(transaction)
expect(query).not.toHaveBeenCalled()

await connection.setScope(transaction)

expect(query).toHaveBeenCalledTimes(1)
const [statement, values] = query.mock.calls[0]
expect(statement).toContain("set_config('role', $1, true)")
expect(statement).toContain("set_config('statement_timeout', $10, true)")
expect(values).toEqual([
'authenticated',
'authenticated',
'jwt',
'',
JSON.stringify({ role: 'authenticated' }),
'{}',
'',
'',
'',
'4321ms',
])
})

it('keeps external-pool search_path setup before deferring statement_timeout', async () => {
const query = vi.fn().mockResolvedValue({ rows: [] })
const client = {
query,
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)
const pool = {
acquire: vi.fn().mockReturnValue({
beginTransaction: vi.fn().mockResolvedValue(transaction),
}),
} as unknown as PgPoolStrategy
const connection = new PgTenantConnection(
pool,
createPoolStrategySettings({
isExternalPool: true,
})
)

await expect(connection.transaction({ timeout: 4321 })).resolves.toBe(transaction)

expect(query).toHaveBeenCalledTimes(1)
expect(query).toHaveBeenNthCalledWith(
1,
"SELECT set_config('search_path', $1, true)",
expect.any(Array)
)

await connection.setScope(transaction)

expect(query).toHaveBeenCalledTimes(2)
const [scopeStatement, scopeValues] = query.mock.calls[1]
expect(scopeStatement).toContain("set_config('role', $1, true)")
expect(scopeStatement).toContain("set_config('statement_timeout', $10, true)")
expect(scopeValues).toEqual([
'authenticated',
'authenticated',
'jwt',
'',
JSON.stringify({ role: 'authenticated' }),
'{}',
'',
'',
'',
'4321ms',
])
})

it('uses a standalone statement_timeout setup for Multigres transactions', async () => {
const query = vi.fn().mockResolvedValue({ rows: [] })
const client = {
query,
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)
const pool = {
acquire: vi.fn().mockReturnValue({
beginTransaction: vi.fn().mockResolvedValue(transaction),
}),
} as unknown as PgPoolStrategy
const connection = new PgTenantConnection(
pool,
createPoolStrategySettings({
isExternalPool: true,
databaseEngine: 'multigres',
})
)

await expect(connection.transaction({ timeout: 4321 })).resolves.toBe(transaction)

expect(query).toHaveBeenCalledTimes(2)
expect(query).toHaveBeenNthCalledWith(
1,
"SELECT set_config('search_path', $1, true)",
expect.any(Array)
)
expect(query).toHaveBeenNthCalledWith(2, "SET LOCAL statement_timeout = '4321ms'", undefined)

await connection.setScope(transaction)

expect(query).toHaveBeenCalledTimes(4)
const [scopeStatement, scopeValues] = query.mock.calls[2]
expect(scopeStatement).not.toContain("set_config('statement_timeout'")
expect(scopeValues).toEqual([
'authenticated',
'authenticated',
'jwt',
'',
JSON.stringify({ role: 'authenticated' }),
'{}',
'',
'',
'',
])
expect(query).toHaveBeenNthCalledWith(4, "SET LOCAL statement_timeout = '4321ms'", undefined)
})

it('does not re-apply statement_timeout after setScope consumes it', async () => {
const query = vi.fn().mockResolvedValue({ rows: [] })
const client = {
query,
release: vi.fn(),
} as unknown as PoolClient
const transaction = new PgTransaction(client)
const pool = {
acquire: vi.fn().mockReturnValue({
beginTransaction: vi.fn().mockResolvedValue(transaction),
}),
} as unknown as PgPoolStrategy
const connection = new PgTenantConnection(
pool,
createPoolStrategySettings({
isExternalPool: false,
})
)

await connection.transaction({ timeout: 4321 })
await connection.setScope(transaction)
await transaction.query('SELECT 1')

expect(query).toHaveBeenCalledTimes(2)
expect(query).toHaveBeenNthCalledWith(2, 'SELECT 1', undefined)
expect(
query.mock.calls.filter(([statement]) => String(statement).includes('statement_timeout'))
).toHaveLength(1)
})

it('reuses precomputed scope JSON payloads across repeated scope applications', async () => {
const pool = {
acquire: vi.fn(),
} as unknown as PgPoolStrategy
const connection = new PgTenantConnection(
pool,
createPoolStrategySettings({
headers: {
'x-test-header': 'test-value',
},
user: {
jwt: 'jwt',
payload: {
role: 'authenticated',
sub: 'user-id',
},
},
})
)
const executor = {
query: vi.fn().mockResolvedValue({ rows: [] }),
} as unknown as PgExecutor
const stringifySpy = vi.spyOn(JSON, 'stringify')

try {
await connection.setScope(executor)
await connection.setScope(executor)

expect(stringifySpy).not.toHaveBeenCalled()
} finally {
stringifySpy.mockRestore()
}
})

it('preserves setup errors when external-pool rollback fails', async () => {
const setupError = new Error('search_path setup failed')
const rollbackError = new Error('rollback failed')
Expand Down
Loading