Skip to content

Commit 8bfa170

Browse files
committed
fix(realtime): throttle connections based on subscription attempts
1 parent d86463f commit 8bfa170

12 files changed

Lines changed: 527 additions & 30 deletions

File tree

packages/core/realtime-js/src/RealtimeChannel.ts

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ export default class RealtimeChannel {
266266
if (!this.socket.isConnected()) {
267267
this.socket.connect()
268268
}
269+
const delay = this.socket._recordChannelJoin()
269270
if (this.channelAdapter.isClosed()) {
270271
const {
271272
config: { broadcast, presence, private: isPrivate },
@@ -299,30 +300,38 @@ export default class RealtimeChannel {
299300

300301
this._updateFilterMessage()
301302

302-
this.channelAdapter
303-
.subscribe(timeout)
304-
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
305-
// Only refresh auth if using callback-based tokens
306-
if (!this.socket._isManualToken()) {
307-
this.socket.setAuth()
308-
}
309-
if (postgres_changes === undefined) {
310-
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
311-
return
312-
}
313-
314-
this._updatePostgresBindings(postgres_changes, callback)
315-
})
316-
.receive('error', (error: { [key: string]: any }) => {
317-
this.state = CHANNEL_STATES.errored
318-
callback?.(
319-
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
320-
new Error(JSON.stringify(Object.values(error).join(', ') || 'error'))
321-
)
322-
})
323-
.receive('timeout', () => {
324-
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
325-
})
303+
const doSubscribe = () => {
304+
this.channelAdapter
305+
.subscribe(timeout)
306+
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
307+
// Only refresh auth if using callback-based tokens
308+
if (!this.socket._isManualToken()) {
309+
this.socket.setAuth()
310+
}
311+
if (postgres_changes === undefined) {
312+
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
313+
return
314+
}
315+
316+
this._updatePostgresBindings(postgres_changes, callback)
317+
})
318+
.receive('error', (error: { [key: string]: any }) => {
319+
this.state = CHANNEL_STATES.errored
320+
callback?.(
321+
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
322+
new Error(JSON.stringify(Object.values(error).join(', ') || 'error'))
323+
)
324+
})
325+
.receive('timeout', () => {
326+
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
327+
})
328+
}
329+
330+
if (delay > 0) {
331+
setTimeout(doSubscribe, delay)
332+
} else {
333+
doSubscribe()
334+
}
326335
}
327336
return this
328337
}

packages/core/realtime-js/src/RealtimeClient.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import {
1111
} from './lib/constants'
1212

1313
import Serializer from './lib/serializer'
14+
import {
15+
RateLimiter,
16+
DEFAULT_SUBSCRIPTION_WARNING_CONFIG,
17+
type SubscriptionWarningConfig,
18+
} from './lib/rate-limiter'
1419
import { httpEndpointURL } from './lib/transformers'
1520
import RealtimeChannel from './RealtimeChannel'
1621
import type { RealtimeChannelOptions } from './RealtimeChannel'
@@ -72,6 +77,7 @@ export type RealtimeClientOptions = {
7277
worker?: boolean
7378
workerUrl?: string
7479
accessToken?: () => Promise<string | null>
80+
subscriptionWarnings?: Partial<SubscriptionWarningConfig> | false
7581
}
7682

7783
const WORKER_SCRIPT = `
@@ -177,6 +183,7 @@ export default class RealtimeClient {
177183
private _authPromise: Promise<void> | null = null
178184
private _workerHeartbeatTimer: HeartbeatTimer = undefined
179185
private _pendingWorkerHeartbeatRef: string | null = null
186+
private _rateLimiter: RateLimiter | null = null
180187

181188
/**
182189
* Initializes the Socket.
@@ -223,6 +230,13 @@ export default class RealtimeClient {
223230
this.httpEndpoint = httpEndpointURL(endPoint)
224231

225232
this.fetch = this._resolveFetch(options?.fetch)
233+
234+
if (options?.subscriptionWarnings !== false) {
235+
this._rateLimiter = new RateLimiter({
236+
...DEFAULT_SUBSCRIPTION_WARNING_CONFIG,
237+
...options?.subscriptionWarnings,
238+
})
239+
}
226240
}
227241

228242
/**
@@ -412,13 +426,28 @@ export default class RealtimeClient {
412426
if (!exists) {
413427
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
414428
this.channels.push(chan)
415-
416429
return chan
417430
} else {
418431
return exists
419432
}
420433
}
421434

435+
/**
436+
* Records a channel join attempt for rate limit tracking.
437+
* Called by RealtimeChannel.subscribe() before initiating the join.
438+
* @internal
439+
*/
440+
_recordChannelJoin(): number {
441+
if (!this._rateLimiter) return 0
442+
443+
this._rateLimiter.recordJoin()
444+
const warning = this._rateLimiter.check()
445+
if (warning) {
446+
this.log('warn', warning.message)
447+
}
448+
return this._rateLimiter.calculateDelay()
449+
}
450+
422451
/**
423452
* Push out a message if the socket is connected.
424453
*
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
export type SubscriptionWarningConfig = {
2+
readonly joinRatePerSecond: number
3+
readonly joinDelayMs: number
4+
}
5+
6+
export const DEFAULT_SUBSCRIPTION_WARNING_CONFIG = {
7+
joinRatePerSecond: 20,
8+
joinDelayMs: 100,
9+
} as const satisfies SubscriptionWarningConfig
10+
11+
export type SubscriptionWarning = {
12+
readonly current: number
13+
readonly threshold: number
14+
readonly message: string
15+
}
16+
17+
const WARN_COOLDOWN_MS = 10_000
18+
const TROUBLESHOOTING_URL =
19+
'https://supabase.com/docs/guides/troubleshooting/realtime-too-many-channels-error'
20+
21+
class RingBuffer {
22+
private readonly slots: Float64Array
23+
private head = 0
24+
private count = 0
25+
26+
constructor(capacity: number) {
27+
this.slots = new Float64Array(capacity)
28+
}
29+
30+
push(ts: number): void {
31+
this.slots[this.head] = ts
32+
this.head = (this.head + 1) % this.slots.length
33+
if (this.count < this.slots.length) this.count++
34+
}
35+
36+
countWithin(ms: number): number {
37+
const cutoff = Date.now() - ms
38+
let n = 0
39+
for (let i = 0; i < this.count; i++) {
40+
const idx = (this.head - 1 - i + this.slots.length) % this.slots.length
41+
if (this.slots[idx] > cutoff) n++
42+
else break
43+
}
44+
return n
45+
}
46+
47+
purge(): void {
48+
this.head = 0
49+
this.count = 0
50+
}
51+
}
52+
53+
export class RateLimiter {
54+
private readonly joins: RingBuffer
55+
private lastWarnedAt = 0
56+
57+
constructor(private readonly config: SubscriptionWarningConfig) {
58+
// capacity: 3× the per-second threshold — enough to accurately track one burst
59+
this.joins = new RingBuffer(config.joinRatePerSecond * 3)
60+
}
61+
62+
recordJoin(): void {
63+
this.joins.push(Date.now())
64+
}
65+
66+
check(): SubscriptionWarning | null {
67+
const now = Date.now()
68+
const { joinRatePerSecond } = this.config
69+
70+
const recentJoins = this.joins.countWithin(1000)
71+
if (recentJoins >= joinRatePerSecond) {
72+
if (now - this.lastWarnedAt > WARN_COOLDOWN_MS) {
73+
this.lastWarnedAt = now
74+
return {
75+
current: recentJoins,
76+
threshold: joinRatePerSecond,
77+
message:
78+
`Realtime: ${recentJoins} channel joins in the last second (threshold: ${joinRatePerSecond}). ` +
79+
`You may be creating channels too rapidly — this often indicates a missing cleanup or channels being created in a render loop. ` +
80+
`See: ${TROUBLESHOOTING_URL}`,
81+
}
82+
}
83+
} else {
84+
// back under the rate limit — purge so stale burst data doesn't carry over
85+
this.joins.purge()
86+
}
87+
88+
return null
89+
}
90+
91+
calculateDelay(): number {
92+
const excess = this.joins.countWithin(1000) - this.config.joinRatePerSecond
93+
return excess > 0 ? excess * this.config.joinDelayMs : 0
94+
}
95+
}

0 commit comments

Comments
 (0)