Skip to content

Commit 8ac9bbb

Browse files
authored
Merge pull request #465 from ali-ince/1.7-connection-leak
Fix a connection leak issue
2 parents 17f6de3 + 3d7de3f commit 8ac9bbb

File tree

6 files changed

+294
-111
lines changed

6 files changed

+294
-111
lines changed

src/v1/driver.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ class Driver {
8080
this._authToken = authToken
8181
this._config = config
8282
this._log = Logger.create(config)
83-
this._pool = new Pool(
84-
this._createConnection.bind(this),
85-
this._destroyConnection.bind(this),
86-
this._validateConnection.bind(this),
87-
PoolConfig.fromDriverConfig(config),
88-
this._log
89-
)
83+
this._pool = new Pool({
84+
create: this._createConnection.bind(this),
85+
destroy: this._destroyConnection.bind(this),
86+
validate: this._validateConnection.bind(this),
87+
installIdleObserver: this._installIdleObserverOnConnection.bind(this),
88+
removeIdleObserver: this._removeIdleObserverOnConnection.bind(this),
89+
config: PoolConfig.fromDriverConfig(config),
90+
log: this._log
91+
})
9092

9193
/**
9294
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
@@ -177,6 +179,14 @@ class Driver {
177179
return lifetime <= maxConnectionLifetime
178180
}
179181

182+
_installIdleObserverOnConnection (conn, observer) {
183+
conn._queueObserver(observer)
184+
}
185+
186+
_removeIdleObserverOnConnection (conn) {
187+
conn._updateCurrentObserver()
188+
}
189+
180190
/**
181191
* Dispose of a connection.
182192
* @return {Connection} the connection to dispose.

src/v1/internal/pool.js

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,25 @@ class Pool {
3131
* @param {function} validate called at various times (like when an instance is acquired and
3232
* when it is returned). If this returns false, the resource will
3333
* be evicted
34+
* @param {function} installIdleObserver called when the resource is released back to pool
35+
* @param {function} removeIdleObserver called when the resource is acquired from the pool
3436
* @param {PoolConfig} config configuration for the new driver.
3537
* @param {Logger} log the driver logger.
3638
*/
37-
constructor (
38-
create,
39-
destroy = () => true,
40-
validate = () => true,
39+
constructor ({
40+
create = (address, release) => {},
41+
destroy = conn => true,
42+
validate = conn => true,
43+
installIdleObserver = (conn, observer) => {},
44+
removeIdleObserver = conn => {},
4145
config = PoolConfig.defaultConfig(),
4246
log = Logger.noOp()
43-
) {
47+
} = {}) {
4448
this._create = create
4549
this._destroy = destroy
4650
this._validate = validate
51+
this._installIdleObserver = installIdleObserver
52+
this._removeIdleObserver = removeIdleObserver
4753
this._maxSize = config.maxSize
4854
this._acquisitionTimeout = config.acquisitionTimeout
4955
this._pools = {}
@@ -165,6 +171,10 @@ class Pool {
165171
const resource = pool.pop()
166172

167173
if (this._validate(resource)) {
174+
if (this._removeIdleObserver) {
175+
this._removeIdleObserver(resource)
176+
}
177+
168178
// idle resource is valid and can be acquired
169179
return Promise.resolve(resource)
170180
} else {
@@ -197,6 +207,14 @@ class Pool {
197207
if (this._log.isDebugEnabled()) {
198208
this._log.debug(`${resource} released to the pool ${key}`)
199209
}
210+
if (this._installIdleObserver) {
211+
this._installIdleObserver(resource, {
212+
onError: () => {
213+
this._pools[key] = this._pools[key].filter(r => r !== resource)
214+
this._destroy(resource)
215+
}
216+
})
217+
}
200218
pool.push(resource)
201219
}
202220
} else {

test/internal/connection-providers.test.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,9 +1210,10 @@ function setupLoadBalancerToRememberRouters (loadBalancer, routersArray) {
12101210
}
12111211

12121212
function newPool () {
1213-
return new Pool((address, release) =>
1214-
Promise.resolve(new FakeConnection(address, release))
1215-
)
1213+
return new Pool({
1214+
create: (address, release) =>
1215+
Promise.resolve(new FakeConnection(address, release))
1216+
})
12161217
}
12171218

12181219
function expectRoutingTable (loadBalancer, routers, readers, writers) {

test/internal/least-connected-load-balancing-strategy.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ describe('LeastConnectedLoadBalancingStrategy', () => {
140140

141141
class DummyPool extends Pool {
142142
constructor (activeConnections) {
143-
super(() => 42)
143+
super({ create: () => 42 })
144144
this._activeConnections = activeConnections
145145
}
146146

test/internal/node/direct.driver.boltkit.test.js

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import neo4j from '../../../src/v1'
2121
import { READ, WRITE } from '../../../src/v1/driver'
2222
import boltStub from '../bolt-stub'
23-
import { SERVICE_UNAVAILABLE } from '../../../src/v1/error'
23+
import { newError, SERVICE_UNAVAILABLE } from '../../../src/v1/error'
2424

2525
describe('direct driver with stub server', () => {
2626
let originalTimeout
@@ -423,6 +423,58 @@ describe('direct driver with stub server', () => {
423423
})
424424
})
425425

426+
it('should close connection if it dies sitting idle in connection pool', done => {
427+
if (!boltStub.supported) {
428+
done()
429+
return
430+
}
431+
432+
const server = boltStub.start(
433+
'./test/resources/boltstub/read_server_v3_read.script',
434+
9001
435+
)
436+
437+
boltStub.run(() => {
438+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
439+
const session = driver.session(READ)
440+
441+
session
442+
.run('MATCH (n) RETURN n.name')
443+
.then(result => {
444+
const records = result.records
445+
expect(records.length).toEqual(3)
446+
expect(records[0].get(0)).toBe('Bob')
447+
expect(records[1].get(0)).toBe('Alice')
448+
expect(records[2].get(0)).toBe('Tina')
449+
450+
const connectionKey = Object.keys(driver._openConnections)[0]
451+
expect(connectionKey).toBeTruthy()
452+
453+
const connection = driver._openConnections[connectionKey]
454+
session.close(() => {
455+
// generate a fake fatal error
456+
connection._handleFatalError(
457+
newError('connection reset', SERVICE_UNAVAILABLE)
458+
)
459+
460+
// expect that the connection to be removed from the pool
461+
expect(driver._pool._pools['127.0.0.1:9001'].length).toEqual(0)
462+
expect(
463+
driver._pool._activeResourceCounts['127.0.0.1:9001']
464+
).toBeFalsy()
465+
// expect that the connection to be unregistered from the open connections registry
466+
expect(driver._openConnections[connectionKey]).toBeFalsy()
467+
driver.close()
468+
server.exit(code => {
469+
expect(code).toEqual(0)
470+
done()
471+
})
472+
})
473+
})
474+
.catch(error => done.fail(error))
475+
})
476+
})
477+
426478
describe('should fail if commit fails due to broken connection', () => {
427479
it('v1', done => {
428480
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted(

0 commit comments

Comments
 (0)