Skip to content

Commit 18d1fd9

Browse files
committed
feat: add onConnectionError callback option
Add optional callback for handling terminal reconnection failures that occur asynchronously after a connection drop. The callback is only invoked for async errors — initial connect() failures are handled via the rejected promise directly. Also fixes Error cause being passed incorrectly (now uses { cause: err }) and ensures _reconnect() promise rejections are properly caught.
1 parent 8a78129 commit 18d1fd9

6 files changed

Lines changed: 250 additions & 8 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import PGPubSub from 'pg-notify'
7171
- `options` (`object`) Configuration options for pg-notify pubsub instance. Accepts same options as [pg](https://github.com/brianc/node-postgres) with few custom ones described below.
7272
- reconnectMaxRetries (`number`) Maximum number of reconnect attempts after losing connection. Pass `0` to disable reconnecting. Default: `10`.
7373
- maxPayloadSize (`number`) Maximum payload size, exceeding given size will throw an error. Default: `7999` ([In the default configuration it must be shorter than 8000 bytes.](https://www.postgresql.org/docs/current/sql-notify.html)).
74+
- onConnectionError (`function`) Optional callback invoked with the error when reconnection fails after exhausting all retries. Only called for errors that occur asynchronously after a connection drop, preventing the process from crashing with an uncaught exception. Not called during the initial `connect()` — handle that via the rejected promise directly.
7475

7576
### emit(channel, payload)
7677
- `channel` (`string`)

cjs/src/pg-notify.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class PGPubSub {
2121

2222
this.reconnectMaxRetries = typeof opts.reconnectMaxRetries !== 'undefined' ? opts.reconnectMaxRetries : 10
2323
this.maxPayloadSize = opts.maxPayloadSize || 7999 // default on a standard pg installation
24+
if (opts.onConnectionError && typeof opts.onConnectionError !== 'function') {
25+
throw new TypeError('onConnectionError must be a function')
26+
}
27+
this.onConnectionError = opts.onConnectionError || null
2428

2529
this.state = states.init
2630
this.reconnectRetries = 0
@@ -127,7 +131,7 @@ class PGPubSub {
127131
} catch (err) {
128132
if (this.reconnectRetries >= this.reconnectMaxRetries) {
129133
await this.close()
130-
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
134+
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', { cause: err })
131135
}
132136
if (![states.closing, states.connected].includes((this.state))) {
133137
await sleep(10)
@@ -155,10 +159,21 @@ class PGPubSub {
155159

156160
if (this.reconnectRetries > this.reconnectMaxRetries) {
157161
this.close()
158-
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
162+
const error = new Error('[PGPubSub]: max reconnect attempts reached, aborting', { cause: err })
163+
if (this.onConnectionError) {
164+
try { this.onConnectionError(error) } catch (e) { this._debug('[onConnectionError] callback error', e) }
165+
return
166+
}
167+
throw error
159168
}
160169

161-
this._reconnect()
170+
this._reconnect().catch(reconnectError => {
171+
if (this.onConnectionError) {
172+
try { this.onConnectionError(reconnectError) } catch (e) { this._debug('[onConnectionError] callback error', e) }
173+
return
174+
}
175+
process.nextTick(() => { throw reconnectError })
176+
})
162177
})
163178

164179
this._debug('[_setupClient] init listeners')

cjs/test/pg-notify.js

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ const opts = {
1616
ssl: process.env.DB_SSL === 'true' ? { rejectUnauthorized: false } : false
1717
}
1818

19-
async function waitUntilTrue (cb) {
19+
async function waitUntilTrue (cb, timeout = 5000) {
20+
const start = Date.now()
2021
while (true) {
2122
const result = cb()
2223
if (result) {
2324
break
2425
}
26+
if (Date.now() - start > timeout) {
27+
throw new Error('waitUntilTrue timed out')
28+
}
2529
await sleep(1)
2630
}
2731
}
@@ -122,6 +126,13 @@ test('works with concurrent emits', async (t) => {
122126
await waitUntilTrue(() => emitCount === expected)
123127
})
124128

129+
test('throws TypeError when onConnectionError is not a function', (t) => {
130+
t.throws(() => new PGPubSub({ ...opts, onConnectionError: 'not-a-function' }), {
131+
instanceOf: TypeError,
132+
message: 'onConnectionError must be a function'
133+
})
134+
})
135+
125136
test('retries and throws when initial connection fails', async (t) => {
126137
const pubsub = new PGPubSub({
127138
...opts,
@@ -142,6 +153,28 @@ test('retries and throws when initial connection fails', async (t) => {
142153
})
143154
})
144155

156+
test('onConnectionError is not called during initial connect failure', async (t) => {
157+
const errors = []
158+
const pubsub = new PGPubSub({
159+
...opts,
160+
reconnectMaxRetries: 2,
161+
host: 'xxx',
162+
onConnectionError: (err) => {
163+
errors.push(err)
164+
}
165+
})
166+
167+
const err = await t.throwsAsync(() => pubsub.connect())
168+
t.is(err.message, '[PGPubSub]: max reconnect attempts reached, aborting')
169+
170+
t.is(errors.length, 0)
171+
t.is(pubsub.state, 'closing')
172+
173+
t.teardown(() => {
174+
pubsub.close()
175+
})
176+
})
177+
145178
test('connection can be re-established', async (t) => {
146179
const channel = getChannel()
147180
const pubsub = new PGPubSub({
@@ -240,6 +273,78 @@ test('connection cannot be re-established', async (t) => {
240273
})
241274
})
242275

276+
test('onConnectionError callback receives error when retries exhausted', async (t) => {
277+
const channel = getChannel()
278+
const errors = []
279+
const pubsub = new PGPubSub({
280+
...opts,
281+
reconnectMaxRetries: 1,
282+
onConnectionError: (err) => {
283+
errors.push(err)
284+
}
285+
})
286+
287+
await pubsub.connect()
288+
t.is(pubsub.state, 'connected')
289+
290+
await new Promise(resolve => {
291+
pubsub.on(channel, (payload) => {
292+
t.deepEqual(payload, 'this-is-the-payload')
293+
resolve()
294+
})
295+
pubsub.emit(channel, 'this-is-the-payload')
296+
})
297+
298+
// emulate pg client emitting an error after retries exceeded
299+
pubsub.reconnectRetries = 2
300+
pubsub.client.emit('error', new Error('connection reset'))
301+
302+
await sleep(10)
303+
304+
await waitUntilTrue(() => pubsub.state === 'closing')
305+
t.is(errors.length, 1)
306+
t.is(errors[0].message, '[PGPubSub]: max reconnect attempts reached, aborting')
307+
308+
t.teardown(() => {
309+
pubsub.close()
310+
})
311+
})
312+
313+
test('onConnectionError callback receives error when reconnect exhausts retries', async (t) => {
314+
const channel = getChannel()
315+
const errors = []
316+
const pubsub = new PGPubSub({
317+
...opts,
318+
reconnectMaxRetries: 1,
319+
onConnectionError: (err) => {
320+
errors.push(err)
321+
}
322+
})
323+
324+
await pubsub.connect()
325+
t.is(pubsub.state, 'connected')
326+
327+
await new Promise(resolve => {
328+
pubsub.on(channel, (payload) => {
329+
t.deepEqual(payload, 'this-is-the-payload')
330+
resolve()
331+
})
332+
pubsub.emit(channel, 'this-is-the-payload')
333+
})
334+
335+
// point host to invalid address so _reconnect() will fail and exhaust retries
336+
pubsub.opts.host = 'xxx'
337+
pubsub.client.emit('error', new Error('connection reset'))
338+
339+
await waitUntilTrue(() => errors.length === 1)
340+
t.is(errors[0].message, '[PGPubSub]: max reconnect attempts reached, aborting')
341+
t.is(pubsub.state, 'closing')
342+
343+
t.teardown(() => {
344+
pubsub.close()
345+
})
346+
})
347+
243348
test('disabled reconnection with zero', async (t) => {
244349
const channel = getChannel()
245350
const pubsub = new PGPubSub({

src/pg-notify.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class PGPubSub {
2121

2222
this.reconnectMaxRetries = typeof opts.reconnectMaxRetries !== 'undefined' ? opts.reconnectMaxRetries : 10
2323
this.maxPayloadSize = opts.maxPayloadSize || 7999 // default on a standard pg installation
24+
if (opts.onConnectionError && typeof opts.onConnectionError !== 'function') {
25+
throw new TypeError('onConnectionError must be a function')
26+
}
27+
this.onConnectionError = opts.onConnectionError || null
2428

2529
this.state = states.init
2630
this.reconnectRetries = 0
@@ -127,7 +131,7 @@ class PGPubSub {
127131
} catch (err) {
128132
if (this.reconnectRetries >= this.reconnectMaxRetries) {
129133
await this.close()
130-
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
134+
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', { cause: err })
131135
}
132136
if (![states.closing, states.connected].includes((this.state))) {
133137
await sleep(10)
@@ -155,10 +159,21 @@ class PGPubSub {
155159

156160
if (this.reconnectRetries > this.reconnectMaxRetries) {
157161
this.close()
158-
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
162+
const error = new Error('[PGPubSub]: max reconnect attempts reached, aborting', { cause: err })
163+
if (this.onConnectionError) {
164+
try { this.onConnectionError(error) } catch (e) { this._debug('[onConnectionError] callback error', e) }
165+
return
166+
}
167+
throw error
159168
}
160169

161-
this._reconnect()
170+
this._reconnect().catch(reconnectError => {
171+
if (this.onConnectionError) {
172+
try { this.onConnectionError(reconnectError) } catch (e) { this._debug('[onConnectionError] callback error', e) }
173+
return
174+
}
175+
process.nextTick(() => { throw reconnectError })
176+
})
162177
})
163178

164179
this._debug('[_setupClient] init listeners')

test/pg-notify.js

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ const opts = {
1616
ssl: process.env.DB_SSL === 'true' ? { rejectUnauthorized: false } : false
1717
}
1818

19-
async function waitUntilTrue (cb) {
19+
async function waitUntilTrue (cb, timeout = 5000) {
20+
const start = Date.now()
2021
while (true) {
2122
const result = cb()
2223
if (result) {
2324
break
2425
}
26+
if (Date.now() - start > timeout) {
27+
throw new Error('waitUntilTrue timed out')
28+
}
2529
await sleep(1)
2630
}
2731
}
@@ -122,6 +126,13 @@ test('works with concurrent emits', async (t) => {
122126
await waitUntilTrue(() => emitCount === expected)
123127
})
124128

129+
test('throws TypeError when onConnectionError is not a function', (t) => {
130+
t.throws(() => new PGPubSub({ ...opts, onConnectionError: 'not-a-function' }), {
131+
instanceOf: TypeError,
132+
message: 'onConnectionError must be a function'
133+
})
134+
})
135+
125136
test('retries and throws when initial connection fails', async (t) => {
126137
const pubsub = new PGPubSub({
127138
...opts,
@@ -142,6 +153,28 @@ test('retries and throws when initial connection fails', async (t) => {
142153
})
143154
})
144155

156+
test('onConnectionError is not called during initial connect failure', async (t) => {
157+
const errors = []
158+
const pubsub = new PGPubSub({
159+
...opts,
160+
reconnectMaxRetries: 2,
161+
host: 'xxx',
162+
onConnectionError: (err) => {
163+
errors.push(err)
164+
}
165+
})
166+
167+
const err = await t.throwsAsync(() => pubsub.connect())
168+
t.is(err.message, '[PGPubSub]: max reconnect attempts reached, aborting')
169+
170+
t.is(errors.length, 0)
171+
t.is(pubsub.state, 'closing')
172+
173+
t.teardown(() => {
174+
pubsub.close()
175+
})
176+
})
177+
145178
test('connection can be re-established', async (t) => {
146179
const channel = getChannel()
147180
const pubsub = new PGPubSub({
@@ -240,6 +273,78 @@ test('connection cannot be re-established', async (t) => {
240273
})
241274
})
242275

276+
test('onConnectionError callback receives error when retries exhausted', async (t) => {
277+
const channel = getChannel()
278+
const errors = []
279+
const pubsub = new PGPubSub({
280+
...opts,
281+
reconnectMaxRetries: 1,
282+
onConnectionError: (err) => {
283+
errors.push(err)
284+
}
285+
})
286+
287+
await pubsub.connect()
288+
t.is(pubsub.state, 'connected')
289+
290+
await new Promise(resolve => {
291+
pubsub.on(channel, (payload) => {
292+
t.deepEqual(payload, 'this-is-the-payload')
293+
resolve()
294+
})
295+
pubsub.emit(channel, 'this-is-the-payload')
296+
})
297+
298+
// emulate pg client emitting an error after retries exceeded
299+
pubsub.reconnectRetries = 2
300+
pubsub.client.emit('error', new Error('connection reset'))
301+
302+
await sleep(10)
303+
304+
await waitUntilTrue(() => pubsub.state === 'closing')
305+
t.is(errors.length, 1)
306+
t.is(errors[0].message, '[PGPubSub]: max reconnect attempts reached, aborting')
307+
308+
t.teardown(() => {
309+
pubsub.close()
310+
})
311+
})
312+
313+
test('onConnectionError callback receives error when reconnect exhausts retries', async (t) => {
314+
const channel = getChannel()
315+
const errors = []
316+
const pubsub = new PGPubSub({
317+
...opts,
318+
reconnectMaxRetries: 1,
319+
onConnectionError: (err) => {
320+
errors.push(err)
321+
}
322+
})
323+
324+
await pubsub.connect()
325+
t.is(pubsub.state, 'connected')
326+
327+
await new Promise(resolve => {
328+
pubsub.on(channel, (payload) => {
329+
t.deepEqual(payload, 'this-is-the-payload')
330+
resolve()
331+
})
332+
pubsub.emit(channel, 'this-is-the-payload')
333+
})
334+
335+
// point host to invalid address so _reconnect() will fail and exhaust retries
336+
pubsub.opts.host = 'xxx'
337+
pubsub.client.emit('error', new Error('connection reset'))
338+
339+
await waitUntilTrue(() => errors.length === 1)
340+
t.is(errors[0].message, '[PGPubSub]: max reconnect attempts reached, aborting')
341+
t.is(pubsub.state, 'closing')
342+
343+
t.teardown(() => {
344+
pubsub.close()
345+
})
346+
})
347+
243348
test('disabled reconnection with zero', async (t) => {
244349
const channel = getChannel()
245350
const pubsub = new PGPubSub({

types/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { ClientConfig } from 'pg'
33
export interface Options extends ClientConfig {
44
reconnectMaxRetries?: number;
55
maxPayloadSize?: number;
6+
onConnectionError?: (error: Error) => void;
67
}
78

89
declare class PGPubSub {

0 commit comments

Comments
 (0)