Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions packages/core/realtime-js/src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ export default class RealtimeChannel {
if (!this.socket.isConnected()) {
this.socket.connect()
}
const delay = this.socket._recordChannelJoin()
if (this.channelAdapter.isClosed()) {
const {
config: { broadcast, presence, private: isPrivate },
Expand Down Expand Up @@ -299,30 +300,38 @@ export default class RealtimeChannel {

this._updateFilterMessage()

this.channelAdapter
.subscribe(timeout)
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
// Only refresh auth if using callback-based tokens
if (!this.socket._isManualToken()) {
this.socket.setAuth()
}
if (postgres_changes === undefined) {
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
}

this._updatePostgresBindings(postgres_changes, callback)
})
.receive('error', (error: { [key: string]: any }) => {
this.state = CHANNEL_STATES.errored
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(JSON.stringify(Object.values(error).join(', ') || 'error'))
)
})
.receive('timeout', () => {
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
})
const doSubscribe = () => {
this.channelAdapter
.subscribe(timeout)
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
// Only refresh auth if using callback-based tokens
if (!this.socket._isManualToken()) {
this.socket.setAuth()
}
if (postgres_changes === undefined) {
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
}

this._updatePostgresBindings(postgres_changes, callback)
})
.receive('error', (error: { [key: string]: any }) => {
this.state = CHANNEL_STATES.errored
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(JSON.stringify(Object.values(error).join(', ') || 'error'))
)
})
.receive('timeout', () => {
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
})
}

if (delay > 0) {
setTimeout(doSubscribe, delay)
} else {
doSubscribe()
}
}
return this
}
Expand Down
31 changes: 30 additions & 1 deletion packages/core/realtime-js/src/RealtimeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import {
} from './lib/constants'

import Serializer from './lib/serializer'
import {
RateLimiter,
DEFAULT_SUBSCRIPTION_WARNING_CONFIG,
type SubscriptionWarningConfig,
} from './lib/rate-limiter'
import { httpEndpointURL } from './lib/transformers'
import RealtimeChannel from './RealtimeChannel'
import type { RealtimeChannelOptions } from './RealtimeChannel'
Expand Down Expand Up @@ -72,6 +77,7 @@ export type RealtimeClientOptions = {
worker?: boolean
workerUrl?: string
accessToken?: () => Promise<string | null>
subscriptionWarnings?: Partial<SubscriptionWarningConfig> | false
}

const WORKER_SCRIPT = `
Expand Down Expand Up @@ -177,6 +183,7 @@ export default class RealtimeClient {
private _authPromise: Promise<void> | null = null
private _workerHeartbeatTimer: HeartbeatTimer = undefined
private _pendingWorkerHeartbeatRef: string | null = null
private _rateLimiter: RateLimiter | null = null

/**
* Initializes the Socket.
Expand Down Expand Up @@ -223,6 +230,13 @@ export default class RealtimeClient {
this.httpEndpoint = httpEndpointURL(endPoint)

this.fetch = this._resolveFetch(options?.fetch)

if (options?.subscriptionWarnings !== false) {
this._rateLimiter = new RateLimiter({
...DEFAULT_SUBSCRIPTION_WARNING_CONFIG,
...options?.subscriptionWarnings,
})
}
}

/**
Expand Down Expand Up @@ -412,13 +426,28 @@ export default class RealtimeClient {
if (!exists) {
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
this.channels.push(chan)

return chan
} else {
return exists
}
}

/**
* Records a channel join attempt for rate limit tracking.
* Called by RealtimeChannel.subscribe() before initiating the join.
* @internal
*/
_recordChannelJoin(): number {
if (!this._rateLimiter) return 0

this._rateLimiter.recordJoin()
const { warning, delayMs } = this._rateLimiter.evaluate()
if (warning) {
this.log('warn', warning.message)
}
return delayMs
}

/**
* Push out a message if the socket is connected.
*
Expand Down
88 changes: 88 additions & 0 deletions packages/core/realtime-js/src/lib/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
export type SubscriptionWarningConfig = {
readonly joinRatePerSecond: number
readonly joinDelayMs: number
}

export const DEFAULT_SUBSCRIPTION_WARNING_CONFIG = {
joinRatePerSecond: 20,
joinDelayMs: 100,
} as const satisfies SubscriptionWarningConfig

export type SubscriptionWarning = {
readonly current: number
readonly threshold: number
readonly message: string
}

export type RateLimitEvaluation = {
readonly warning: SubscriptionWarning | null
readonly delayMs: number
}

const WARN_COOLDOWN_MS = 10_000
const TROUBLESHOOTING_URL =
'https://supabase.com/docs/guides/troubleshooting/realtime-too-many-channels-error'

class RingBuffer {
private readonly slots: Float64Array
private head = 0
private count = 0

constructor(capacity: number) {
this.slots = new Float64Array(capacity)
}

push(ts: number): void {
this.slots[this.head] = ts
this.head = (this.head + 1) % this.slots.length
if (this.count < this.slots.length) this.count++
}

countWithin(ms: number, now: number): number {
const cutoff = now - ms
let n = 0
for (let i = 0; i < this.count; i++) {
const idx = (this.head - 1 - i + this.slots.length) % this.slots.length
if (this.slots[idx] > cutoff) n++
}
return n
}
}

export class RateLimiter {
private readonly joins: RingBuffer
private lastWarnedAt = 0

constructor(private readonly config: SubscriptionWarningConfig) {
this.joins = new RingBuffer(config.joinRatePerSecond * 3)
}

recordJoin(): void {
this.joins.push(Date.now())
}

evaluate(now: number = Date.now()): RateLimitEvaluation {
const { joinRatePerSecond, joinDelayMs } = this.config
const recentJoins = this.joins.countWithin(1000, now)

let warning: SubscriptionWarning | null = null
if (recentJoins >= joinRatePerSecond) {
if (now - this.lastWarnedAt > WARN_COOLDOWN_MS) {
this.lastWarnedAt = now
warning = {
current: recentJoins,
threshold: joinRatePerSecond,
message:
`Realtime: ${recentJoins} channel joins in the last second (threshold: ${joinRatePerSecond}). ` +
`You may be creating channels too rapidly — this often indicates a missing cleanup or channels being created in a render loop. ` +
`See: ${TROUBLESHOOTING_URL}`,
}
}
}

const excess = recentJoins - joinRatePerSecond
const delayMs = excess > 0 ? excess * joinDelayMs : 0

return { warning, delayMs }
}
}
Loading
Loading