import { CHANNEL_EVENTS, CHANNEL_STATES } from './lib/constants'
import type { ChannelState } from './lib/constants'
import type RealtimeClient from './RealtimeClient'
import RealtimePresence, { REALTIME_PRESENCE_LISTEN_EVENTS } from './RealtimePresence'
import type {
  RealtimePresenceJoinPayload,
  RealtimePresenceLeavePayload,
  RealtimePresenceState,
} from './RealtimePresence'
import * as Transformers from './lib/transformers'
import { httpEndpointURL } from './lib/transformers'
import { normalizeChannelError } from './lib/normalizeChannelError'
import ChannelAdapter from './phoenix/channelAdapter'
import { ChannelBindingCallback, ChannelOnErrorCallback } from './phoenix/types'
import type { Timer } from './phoenix/types'

/** Options for the broadcast `replay` feature (see `RealtimeChannelOptions`). */
type ReplayOption = {
  since: number
  limit?: number
}

export type RealtimeChannelOptions = {
  config: {
    /**
     * self option enables client to receive message it broadcast
     * ack option instructs server to acknowledge that broadcast message was received
     * replay option instructs server to replay broadcast messages
     */
    broadcast?: { self?: boolean; ack?: boolean; replay?: ReplayOption }
    /**
     * key option is used to track presence payload across clients
     */
    presence?: { key?: string; enabled?: boolean }
    /**
     * defines if the channel is private or not and if RLS policies will be used to check data
     */
    private?: boolean
  }
}

type RealtimeChangesPayloadBase = {
  schema: string
  table: string
}

type RealtimeBroadcastChangesPayloadBase = RealtimeChangesPayloadBase & {
  id: string
}

export type RealtimeBroadcastInsertPayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`
    record: T
    old_record: null
  }

export type RealtimeBroadcastUpdatePayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`
    record: T
    old_record: T
  }

export type RealtimeBroadcastDeletePayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`
    record: null
    old_record: T
  }

export type RealtimeBroadcastPayload<T extends { [key: string]: any }> =
  | RealtimeBroadcastInsertPayload<T>
  | RealtimeBroadcastUpdatePayload<T>
  | RealtimeBroadcastDeletePayload<T>

type RealtimePostgresChangesPayloadBase = {
  schema: string
  table: string
  commit_timestamp: string
  errors: string[]
}

export type RealtimePostgresInsertPayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`
    new: T
    old: {}
  }

export type RealtimePostgresUpdatePayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`
    new: T
    old: Partial<T>
  }

export type RealtimePostgresDeletePayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`
    new: {}
    old: Partial<T>
  }

export type RealtimePostgresChangesPayload<T extends { [key: string]: any }> =
  | RealtimePostgresInsertPayload<T>
  | RealtimePostgresUpdatePayload<T>
  | RealtimePostgresDeletePayload<T>

export type RealtimePostgresChangesFilter<T extends `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT}`> = {
  /**
   * The type of database change to listen to.
   */
  event: T
  /**
   * The database schema to listen to.
   */
  schema: string
  /**
   * The database table to listen to.
   */
  table?: string
  /**
   * Receive database changes when filter is matched.
   */
  filter?: string
}

/** Result of `send()` / `unsubscribe()`; the `string & {}` arm keeps the union open-ended. */
export type RealtimeChannelSendResponse = 'ok' | 'timed out' | 'error' | (string & {})

export enum REALTIME_POSTGRES_CHANGES_LISTEN_EVENT {
  ALL = '*',
  INSERT = 'INSERT',
  UPDATE = 'UPDATE',
  DELETE = 'DELETE',
}

export enum REALTIME_LISTEN_TYPES {
  BROADCAST = 'broadcast',
  PRESENCE = 'presence',
  POSTGRES_CHANGES = 'postgres_changes',
  SYSTEM = 'system',
}

export enum REALTIME_SUBSCRIBE_STATES {
  SUBSCRIBED = 'SUBSCRIBED',
  TIMED_OUT = 'TIMED_OUT',
  CLOSED = 'CLOSED',
  CHANNEL_ERROR = 'CHANNEL_ERROR',
}

export const REALTIME_CHANNEL_STATES = CHANNEL_STATES

/** Shape of the `postgres_changes` list carried by the server's join reply. */
type PostgresChangesFilters = {
  postgres_changes: {
    id: string
    event: string
    schema?: string
    table?: string
    filter?: string
  }[]
}

/** A registered listener: `ref` is the handle returned by the adapter, `id` the server-assigned filter id. */
type Binding = {
  type: string
  filter: { [key: string]: any }
  callback: ChannelBindingCallback
  ref: number
  id?: string
}

/** A channel is the basic building block of Realtime
 * and narrows the scope of data flow to subscribed clients.
 * You can think of a channel as a chatroom where participants are able to see who's online
 * and send and receive messages.
 */
export default class RealtimeChannel {
  bindings: Record<string, Binding[]> = {}
  subTopic: string
  broadcastEndpointURL: string
  private: boolean
  presence: RealtimePresence
  /** @internal */
  channelAdapter: ChannelAdapter

  get state() {
    return this.channelAdapter.state
  }

  set state(state: ChannelState) {
    this.channelAdapter.state = state
  }

  get joinedOnce() {
    return this.channelAdapter.joinedOnce
  }

  get timeout() {
    return this.socket.timeout
  }

  get joinPush() {
    return this.channelAdapter.joinPush
  }

  get rejoinTimer(): Timer {
    return this.channelAdapter.rejoinTimer
  }

  /**
   * Creates a channel that can broadcast messages, sync presence, and listen to Postgres changes.
   *
   * The topic determines which realtime stream you are subscribing to. Config options let you
   * enable acknowledgement for broadcasts, presence tracking, or private channels.
   *
   * @throws Error when `config.broadcast.replay` is set on a channel that is not private.
   *
   * @category Realtime
   *
   * @example Using supabase-js (recommended)
   * ```ts
   * import { createClient } from '@supabase/supabase-js'
   *
   * const supabase = createClient('https://xyzcompany.supabase.co', 'your-publishable-key')
   * const channel = supabase.channel('room1')
   * channel
   *   .on('broadcast', { event: 'cursor-pos' }, (payload) => console.log(payload))
   *   .subscribe()
   * ```
   *
   * @example Standalone import for bundle-sensitive environments
   * ```ts
   * import RealtimeClient from '@supabase/realtime-js'
   *
   * const client = new RealtimeClient('https://xyzcompany.supabase.co/realtime/v1', {
   *   params: { apikey: 'your-publishable-key' },
   * })
   * const channel = new RealtimeChannel('realtime:public:messages', { config: {} }, client)
   * ```
   */
  constructor(
    /** Topic name can be any string. */
    public topic: string,
    public params: RealtimeChannelOptions = { config: {} },
    public socket: RealtimeClient
  ) {
    this.subTopic = topic.replace(/^realtime:/i, '')
    // Shallow merge: a caller-supplied `broadcast`/`presence` object replaces the default wholesale.
    this.params.config = {
      ...{
        broadcast: { ack: false, self: false },
        presence: { key: '', enabled: false },
        private: false,
      },
      ...params.config,
    }

    this.channelAdapter = new ChannelAdapter(this.socket.socketAdapter, topic, this.params)
    this.presence = new RealtimePresence(this)

    this._onClose(() => {
      this.socket._remove(this)
    })

    this._updateFilterTransform()

    this.broadcastEndpointURL = httpEndpointURL(this.socket.socketAdapter.endPointURL())
    this.private = this.params.config.private || false

    if (!this.private && this.params.config?.broadcast?.replay) {
      throw new Error(
        `tried to use replay on public channel '${this.topic}'. It must be a private channel.`
      )
    }
  }

  /**
   * Subscribe registers your client with the server
   *
   * Connects the socket if it is not connected yet. The join is only attempted while the
   * channel adapter reports the channel as closed; otherwise the call is a no-op.
   *
   * @param callback Invoked with `SUBSCRIBED`, `CHANNEL_ERROR` (with an `Error`), `TIMED_OUT` or `CLOSED`.
   * @param timeout Join timeout passed to the channel adapter; defaults to the socket timeout.
   * @returns This channel, for chaining.
   *
   * @category Realtime
   */
  subscribe(
    callback?: (status: REALTIME_SUBSCRIBE_STATES, err?: Error) => void,
    timeout = this.timeout
  ): RealtimeChannel {
    if (!this.socket.isConnected()) {
      this.socket.connect()
    }

    if (this.channelAdapter.isClosed()) {
      const {
        config: { broadcast, presence, private: isPrivate },
      } = this.params

      const postgres_changes = this.bindings.postgres_changes?.map((r) => r.filter) ?? []

      // Presence is enabled when at least one presence listener exists or it was requested in config.
      const presence_enabled =
        (!!this.bindings[REALTIME_LISTEN_TYPES.PRESENCE] &&
          this.bindings[REALTIME_LISTEN_TYPES.PRESENCE].length > 0) ||
        this.params.config.presence?.enabled === true

      const accessTokenPayload: { access_token?: string } = {}
      const config = {
        broadcast,
        presence: { ...presence, enabled: presence_enabled },
        postgres_changes,
        private: isPrivate,
      }

      if (this.socket.accessTokenValue) {
        accessTokenPayload.access_token = this.socket.accessTokenValue
      }

      this._onError((reason: unknown) => {
        callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, normalizeChannelError(reason))
      })
      this._onClose(() => callback?.(REALTIME_SUBSCRIBE_STATES.CLOSED))

      this.updateJoinPayload({ ...{ config }, ...accessTokenPayload })
      this._updateFilterMessage()

      this.channelAdapter
        .subscribe(timeout)
        .receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
          // Only refresh auth if using callback-based tokens
          if (!this.socket._isManualToken()) {
            this.socket.setAuth()
          }

          if (postgres_changes === undefined) {
            callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
            return
          }

          this._updatePostgresBindings(postgres_changes, callback)
        })
        .receive('error', (error: { [key: string]: any }) => {
          this.state = CHANNEL_STATES.errored
          const message = Object.values(error).join(', ') || 'error'
          callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, new Error(message, { cause: error }))
        })
        .receive('timeout', () => {
          callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
        })
    }

    return this
  }

  /**
   * Reconciles the client's `postgres_changes` bindings with the filters echoed by the server.
   *
   * Bindings are matched positionally. On the first mismatch the channel is unsubscribed,
   * marked as errored and `callback` receives `CHANNEL_ERROR`; otherwise every binding is
   * tagged with the server-assigned `id` and `callback` receives `SUBSCRIBED`.
   */
  private _updatePostgresBindings(
    postgres_changes: PostgresChangesFilters['postgres_changes'],
    callback?: (status: REALTIME_SUBSCRIBE_STATES, err?: Error) => void
  ) {
    const clientPostgresBindings = this.bindings.postgres_changes
    const bindingsLen = clientPostgresBindings?.length ?? 0
    const newPostgresBindings: Binding[] = []

    for (let i = 0; i < bindingsLen; i++) {
      const clientPostgresBinding = clientPostgresBindings[i]
      const {
        filter: { event, schema, table, filter },
      } = clientPostgresBinding
      const serverPostgresFilter = postgres_changes && postgres_changes[i]

      if (
        serverPostgresFilter &&
        serverPostgresFilter.event === event &&
        RealtimeChannel.isFilterValueEqual(serverPostgresFilter.schema, schema) &&
        RealtimeChannel.isFilterValueEqual(serverPostgresFilter.table, table) &&
        RealtimeChannel.isFilterValueEqual(serverPostgresFilter.filter, filter)
      ) {
        newPostgresBindings.push({
          ...clientPostgresBinding,
          id: serverPostgresFilter.id,
        })
      } else {
        // Fire-and-forget: the outcome of leaving is not needed to report the mismatch.
        void this.unsubscribe()
        this.state = CHANNEL_STATES.errored

        callback?.(
          REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
          new Error('mismatch between server and client bindings for postgres changes')
        )
        return
      }
    }

    this.bindings.postgres_changes = newPostgresBindings

    if (this.state !== CHANNEL_STATES.errored && callback) {
      callback(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
    }
  }

  /**
   * Returns the current presence state for this channel.
   *
   * The shape is a map keyed by presence key (for example a user id) where each entry contains the
   * tracked metadata for that user.
   *
   * @category Realtime
   */
  presenceState<T extends { [key: string]: any } = {}>(): RealtimePresenceState<T> {
    return this.presence.state as RealtimePresenceState<T>
  }

  /**
   * Sends the supplied payload to the presence tracker so other subscribers can see that this
   * client is online. Use `untrack` to stop broadcasting presence for the same key.
   *
   * @param payload Presence metadata to publish.
   * @param opts Send options; `opts.timeout` overrides the socket timeout.
   *
   * @category Realtime
   */
  async track(
    payload: { [key: string]: any },
    opts: { [key: string]: any } = {}
  ): Promise<RealtimeChannelSendResponse> {
    // `send` expects an options object; passing the bare number made it ignore the timeout.
    return await this.send(
      {
        type: 'presence',
        event: 'track',
        payload,
      },
      { ...opts, timeout: opts.timeout || this.timeout }
    )
  }

  /**
   * Removes the current presence state for this client.
   *
   * @param opts Send options; `opts.timeout` overrides the socket timeout.
   *
   * @category Realtime
   */
  async untrack(opts: { [key: string]: any } = {}): Promise<RealtimeChannelSendResponse> {
    return await this.send(
      {
        type: 'presence',
        event: 'untrack',
      },
      opts
    )
  }

  /**
   * Creates an event handler that listens to changes.
   */
  on(
    type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
    filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.SYNC}` },
    callback: () => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
    filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.JOIN}` },
    callback: (payload: RealtimePresenceJoinPayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
    filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.LEAVE}` },
    callback: (payload: RealtimePresenceLeavePayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
    filter: { event: '*' },
    callback: (payload?: RealtimePresenceJoinPayload<T> | RealtimePresenceLeavePayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
    filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL}`>,
    callback: (payload: RealtimePostgresChangesPayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
    filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`>,
    callback: (payload: RealtimePostgresInsertPayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
    filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`>,
    callback: (payload: RealtimePostgresUpdatePayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
    filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`>,
    callback: (payload: RealtimePostgresDeletePayload<T>) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
    filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT}`>,
    callback: (payload: RealtimePostgresChangesPayload<T>) => void
  ): RealtimeChannel
  /**
   * The following is placed here to display on supabase.com/docs/reference/javascript/subscribe.
   * @param type One of "broadcast", "presence", or "postgres_changes".
   * @param filter Custom object specific to the Realtime feature detailing which payloads to receive.
   * @param callback Function to be invoked when event handler is triggered.
   */
  on(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: string },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: string
      meta?: {
        replayed?: boolean
        id: string
      }
      [key: string]: any
    }) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: string },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: string
      meta?: {
        replayed?: boolean
        id: string
      }
      payload: T
    }) => void
  ): RealtimeChannel
  on<T extends Record<string, unknown>>(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL
      payload: RealtimeBroadcastPayload<T>
    }) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT
      payload: RealtimeBroadcastInsertPayload<T>
    }) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE
      payload: RealtimeBroadcastUpdatePayload<T>
    }) => void
  ): RealtimeChannel
  on<T extends { [key: string]: any }>(
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
    filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE },
    callback: (payload: {
      type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
      event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE
      payload: RealtimeBroadcastDeletePayload<T>
    }) => void
  ): RealtimeChannel
  on(
    type: `${REALTIME_LISTEN_TYPES.SYSTEM}`,
    filter: {},
    callback: (payload: any) => void
  ): RealtimeChannel
  /**
   * Listen to realtime events on this channel.
   * @category Realtime
   *
   * @throws Error when a `presence` or `postgres_changes` listener is added while the channel
   * is joined or joining (i.e. after `subscribe()`).
   *
   * @remarks
   * - By default, Broadcast and Presence are enabled for all projects.
   * - By default, listening to database changes is disabled for new projects due to database performance and security concerns. You can turn it on by managing Realtime's [replication](/docs/guides/api#realtime-api-overview).
   * - You can receive the "previous" data for updates and deletes by setting the table's `REPLICA IDENTITY` to `FULL` (e.g., `ALTER TABLE your_table REPLICA IDENTITY FULL;`).
   * - Row level security is not applied to delete statements. When RLS is enabled and replica identity is set to full, only the primary key is sent to clients.
   *
   * @example Listen to broadcast messages
   * ```js
   * const channel = supabase.channel("room1")
   *
   * channel.on("broadcast", { event: "cursor-pos" }, (payload) => {
   *   console.log("Cursor position received!", payload);
   * }).subscribe((status) => {
   *   if (status === "SUBSCRIBED") {
   *     channel.send({
   *       type: "broadcast",
   *       event: "cursor-pos",
   *       payload: { x: Math.random(), y: Math.random() },
   *     });
   *   }
   * });
   * ```
   *
   * @example Listen to presence sync
   * ```js
   * const channel = supabase.channel('room1')
   * channel
   *   .on('presence', { event: 'sync' }, () => {
   *     console.log('Synced presence state: ', channel.presenceState())
   *   })
   *   .subscribe(async (status) => {
   *     if (status === 'SUBSCRIBED') {
   *       await channel.track({ online_at: new Date().toISOString() })
   *     }
   *   })
   * ```
   *
   * @example Listen to presence join
   * ```js
   * const channel = supabase.channel('room1')
   * channel
   *   .on('presence', { event: 'join' }, ({ newPresences }) => {
   *     console.log('Newly joined presences: ', newPresences)
   *   })
   *   .subscribe(async (status) => {
   *     if (status === 'SUBSCRIBED') {
   *       await channel.track({ online_at: new Date().toISOString() })
   *     }
   *   })
   * ```
   *
   * @example Listen to presence leave
   * ```js
   * const channel = supabase.channel('room1')
   * channel
   *   .on('presence', { event: 'leave' }, ({ leftPresences }) => {
   *     console.log('Newly left presences: ', leftPresences)
   *   })
   *   .subscribe(async (status) => {
   *     if (status === 'SUBSCRIBED') {
   *       await channel.track({ online_at: new Date().toISOString() })
   *       await channel.untrack()
   *     }
   *   })
   * ```
   *
   * @example Listen to all database changes
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: '*', schema: '*' }, payload => {
   *     console.log('Change received!', payload)
   *   })
   *   .subscribe()
   * ```
   *
   * @example Listen to a specific table
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: '*', schema: 'public', table: 'countries' }, payload => {
   *     console.log('Change received!', payload)
   *   })
   *   .subscribe()
   * ```
   *
   * @example Listen to inserts
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'countries' }, payload => {
   *     console.log('Change received!', payload)
   *   })
   *   .subscribe()
   * ```
   *
   * @exampleDescription Listen to updates
   * By default, Supabase will send only the updated record. If you want to receive the previous values as well you can
   * enable full replication for the table you are listening to:
   *
   * ```sql
   * alter table "your_table" replica identity full;
   * ```
   *
   * @example Listen to updates
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: 'UPDATE', schema: 'public', table: 'countries' }, payload => {
   *     console.log('Change received!', payload)
   *   })
   *   .subscribe()
   * ```
   *
   * @exampleDescription Listen to deletes
   * By default, Supabase does not send deleted records. If you want to receive the deleted record you can
   * enable full replication for the table you are listening to:
   *
   * ```sql
   * alter table "your_table" replica identity full;
   * ```
   *
   * @example Listen to deletes
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: 'DELETE', schema: 'public', table: 'countries' }, payload => {
   *     console.log('Change received!', payload)
   *   })
   *   .subscribe()
   * ```
   *
   * @exampleDescription Listen to multiple events
   * You can chain listeners if you want to listen to multiple events for each table.
   *
   * @example Listen to multiple events
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'countries' }, handleRecordInserted)
   *   .on('postgres_changes', { event: 'DELETE', schema: 'public', table: 'countries' }, handleRecordDeleted)
   *   .subscribe()
   * ```
   *
   * @exampleDescription Listen to row level changes
   * You can listen to individual rows using the format `{table}:{col}=eq.{val}` - where `{col}` is the column name, and `{val}` is the value which you want to match.
   *
   * @example Listen to row level changes
   * ```js
   * supabase
   *   .channel('room1')
   *   .on('postgres_changes', { event: 'UPDATE', schema: 'public', table: 'countries', filter: 'id=eq.200' }, handleRecordUpdated)
   *   .subscribe()
   * ```
   */
  on(
    type: `${REALTIME_LISTEN_TYPES}`,
    filter: { event: string; [key: string]: string },
    callback: (payload: any) => void
  ): RealtimeChannel {
    const stateCheck = this.channelAdapter.isJoined() || this.channelAdapter.isJoining()
    const typeCheck =
      type === REALTIME_LISTEN_TYPES.PRESENCE || type === REALTIME_LISTEN_TYPES.POSTGRES_CHANGES

    if (stateCheck && typeCheck) {
      this.socket.log(
        'channel',
        `cannot add \`${type}\` callbacks for ${this.topic} after \`subscribe()\`.`
      )
      throw new Error(`cannot add \`${type}\` callbacks for ${this.topic} after \`subscribe()\`.`)
    }

    return this._on(type, filter, callback)
  }

  /**
   * Sends a broadcast message explicitly via REST API.
   *
   * This method always uses the REST API endpoint regardless of WebSocket connection state.
   * Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
   *
   * @param event The name of the broadcast event
   * @param payload Payload to be sent (required)
   * @param opts Options including timeout
   * @returns Promise resolving to `{ success: true }` when the endpoint answers with HTTP 202.
   * @throws Error (as a rejected promise) when `payload` is `undefined`/`null`, or when the
   * endpoint answers with any other status; the message is taken from the response body's
   * `error` or `message` field, falling back to the HTTP status text.
   *
   * @category Realtime
   */
  async httpSend(
    event: string,
    payload: any,
    opts: { timeout?: number } = {}
  ): Promise<{ success: true } | { success: false; status: number; error: string }> {
    if (payload === undefined || payload === null) {
      throw new Error('Payload is required for httpSend()')
    }

    const response = await this._fetchWithTimeout(
      this.broadcastEndpointURL,
      this._broadcastRequestOptions(event, payload),
      opts.timeout ?? this.timeout
    )

    if (response.status === 202) {
      return { success: true }
    }

    let errorMessage = response.statusText
    try {
      const errorBody = await response.json()
      errorMessage = errorBody.error || errorBody.message || errorMessage
    } catch {
      // Body is not JSON: keep the HTTP status text as the message.
    }

    throw new Error(errorMessage)
  }

  /**
   * Sends a message into the channel.
   *
   * @param args Arguments to send to channel
   * @param args.type The type of event to send
   * @param args.event The name of the event being sent
   * @param args.payload Payload to be sent
   * @param opts Options to be used during the send process
   *
   * @category Realtime
   *
   * @remarks
   * - When using REST you don't need to subscribe to the channel
   * - REST calls are only available from 2.37.0 onwards
   *
   * @example Send a message via websocket
   * ```js
   * const channel = supabase.channel('room1')
   *
   * channel.subscribe((status) => {
   *   if (status === 'SUBSCRIBED') {
   *     channel.send({
   *       type: 'broadcast',
   *       event: 'cursor-pos',
   *       payload: { x: Math.random(), y: Math.random() },
   *     })
   *   }
   * })
   * ```
   *
   * @exampleResponse Send a message via websocket
   * ```js
   * ok | timed out | error
   * ```
   *
   * @example Send a message via REST
   * ```js
   * supabase
   *   .channel('room1')
   *   .httpSend('cursor-pos', { x: Math.random(), y: Math.random() })
   * ```
   */
  async send(
    args: {
      type: 'broadcast' | 'presence' | 'postgres_changes'
      event: string
      payload?: any
      [key: string]: any
    },
    opts: { [key: string]: any } = {}
  ): Promise<RealtimeChannelSendResponse> {
    if (!this.channelAdapter.canPush() && args.type === 'broadcast') {
      console.warn(
        'Realtime send() is automatically falling back to REST API. ' +
          'This behavior will be deprecated in the future. ' +
          'Please use httpSend() explicitly for REST delivery.'
      )

      const { event, payload: endpoint_payload } = args

      try {
        const response = await this._fetchWithTimeout(
          this.broadcastEndpointURL,
          this._broadcastRequestOptions(event, endpoint_payload),
          opts.timeout ?? this.timeout
        )

        // The body is not needed; cancel it so the underlying connection can be released.
        await response.body?.cancel()
        return response.ok ? 'ok' : 'error'
      } catch (error) {
        if (error instanceof Error && error.name === 'AbortError') {
          return 'timed out'
        } else {
          return 'error'
        }
      }
    } else {
      return new Promise((resolve) => {
        const push = this.channelAdapter.push(args.type, args, opts.timeout || this.timeout)

        // Without `ack`, a broadcast resolves immediately instead of waiting for the server.
        if (args.type === 'broadcast' && !this.params?.config?.broadcast?.ack) {
          resolve('ok')
        }

        push.receive('ok', () => resolve('ok'))
        push.receive('error', () => resolve('error'))
        push.receive('timeout', () => resolve('timed out'))
      })
    }
  }

  /**
   * Updates the payload that will be sent the next time the channel joins (reconnects).
   * Useful for rotating access tokens or updating config without re-creating the channel.
   *
   * @category Realtime
   */
  updateJoinPayload(payload: Record<string, any>) {
    this.channelAdapter.updateJoinPayload(payload)
  }

  /**
   * Leaves the channel.
   *
   * Unsubscribes from server events, and instructs channel to terminate on server.
   * Triggers onClose() hooks.
   *
   * The returned promise resolves with the server acknowledgement, ie:
   * channel.unsubscribe().then((status) => { if (status === "ok") alert("left!") })
   *
   * @param timeout Leave timeout passed to the channel adapter; defaults to the socket timeout.
   * @returns `'ok'`, `'timed out'` or `'error'`; the promise never rejects.
   *
   * @category Realtime
   */
  async unsubscribe(timeout = this.timeout) {
    return new Promise<RealtimeChannelSendResponse>((resolve) => {
      this.channelAdapter
        .unsubscribe(timeout)
        .receive('ok', () => resolve('ok'))
        .receive('timeout', () => resolve('timed out'))
        .receive('error', () => resolve('error'))
    })
  }

  /**
   * Destroys and stops related timers.
   *
   * @category Realtime
   */
  teardown() {
    this.channelAdapter.teardown()
  }

  /**
   * Builds the `fetch` options for posting a single broadcast message to the REST endpoint.
   *
   * @internal
   */
  private _broadcastRequestOptions(event: string, payload: any) {
    const headers: Record<string, string> = {
      apikey: this.socket.apiKey ? this.socket.apiKey : '',
      'Content-Type': 'application/json',
    }

    if (this.socket.accessTokenValue) {
      headers['Authorization'] = `Bearer ${this.socket.accessTokenValue}`
    }

    return {
      method: 'POST',
      headers,
      body: JSON.stringify({
        messages: [
          {
            topic: this.subTopic,
            event,
            payload: payload,
            private: this.private,
          },
        ],
      }),
    }
  }

  /**
   * Calls the socket's `fetch` and aborts the request if no response arrives within
   * `timeout` milliseconds.
   *
   * @internal
   */
  async _fetchWithTimeout(url: string, options: { [key: string]: any }, timeout: number) {
    const controller = new AbortController()
    const id = setTimeout(() => controller.abort(), timeout)

    try {
      return await this.socket.fetch(url, {
        ...options,
        signal: controller.signal,
      })
    } finally {
      // Always clear the timer, including when fetch rejects, so it cannot linger.
      clearTimeout(id)
    }
  }

  /**
   * Registers `callback` on the channel adapter and records the binding under its
   * lower-cased type, then refreshes the adapter's binding filter.
   *
   * @internal
   */
  _on(type: string, filter: { [key: string]: any }, callback: ChannelBindingCallback) {
    const typeLower = type.toLocaleLowerCase()
    const ref = this.channelAdapter.on(type, callback)

    const binding: Binding = {
      type: typeLower,
      filter: filter,
      callback: callback,
      ref: ref,
    }

    if (this.bindings[typeLower]) {
      this.bindings[typeLower].push(binding)
    } else {
      this.bindings[typeLower] = [binding]
    }

    this._updateFilterMessage()

    return this
  }

  /**
   * Registers a callback that will be executed when the channel closes.
   *
   * @internal
   */
  private _onClose(callback: ChannelBindingCallback) {
    this.channelAdapter.onClose(callback)
  }

  /**
   * Registers a callback that will be executed when the channel encounters an error.
   *
   * @internal
   */
  private _onError(callback: ChannelOnErrorCallback) {
    this.channelAdapter.onError(callback)
  }

  /**
   * Installs the predicate the channel adapter uses to decide whether a binding should
   * receive an incoming message.
   *
   * @internal
   */
  private _updateFilterMessage() {
    this.channelAdapter.updateFilterBindings((binding, payload: any, ref) => {
      const typeLower = binding.event.toLocaleLowerCase()

      if (this._notThisChannelEvent(typeLower, ref)) {
        return false
      }

      const bind = this.bindings[typeLower]?.find((bind) => bind.ref === binding.ref)

      // Bindings not tracked in `this.bindings` are always delivered.
      if (!bind) {
        return true
      }

      if (['broadcast', 'presence', 'postgres_changes'].includes(typeLower)) {
        if ('id' in bind) {
          // Binding carries a server-assigned id: match on that id and the change type.
          const bindId = bind.id
          const bindEvent = bind.filter?.event
          return (
            bindId &&
            payload?.ids?.includes(bindId) &&
            (bindEvent === '*' ||
              bindEvent?.toLocaleLowerCase() === payload?.data?.type?.toLocaleLowerCase())
          )
        } else {
          const bindEvent = bind?.filter?.event?.toLocaleLowerCase()
          return bindEvent === '*' || bindEvent === payload?.event?.toLocaleLowerCase()
        }
      } else {
        return bind.type.toLocaleLowerCase() === typeLower
      }
    })
  }

  /**
   * True when `event` is a close/error/leave/join event whose `ref` does not match this
   * channel's current join push.
   *
   * @internal
   */
  private _notThisChannelEvent(event: string, ref?: string | null) {
    const { close, error, leave, join } = CHANNEL_EVENTS
    const events: string[] = [close, error, leave, join]
    return ref && events.includes(event) && ref !== this.joinPush.ref
  }

  /**
   * Installs the payload transform on the channel adapter: payloads carrying an `ids` field
   * are reshaped into the `RealtimePostgresChangesPayload` layout; everything else is passed
   * through unchanged.
   *
   * @internal
   */
  private _updateFilterTransform() {
    this.channelAdapter.updatePayloadTransform((event, payload: any, ref) => {
      // `typeof null === 'object'`, so guard explicitly before using the `in` operator.
      if (typeof payload === 'object' && payload !== null && 'ids' in payload) {
        const postgresChanges = payload.data
        const { schema, table, commit_timestamp, type, errors } = postgresChanges
        const enrichedPayload = {
          schema: schema,
          table: table,
          commit_timestamp: commit_timestamp,
          eventType: type,
          new: {},
          old: {},
          errors: errors,
        }

        return {
          ...enrichedPayload,
          ...this._getPayloadRecords(postgresChanges),
        }
      }

      return payload
    })
  }

  /**
   * Re-registers every binding of `other` on this channel.
   *
   * @throws Error when this channel has already joined once.
   */
  copyBindings(other: RealtimeChannel) {
    if (this.joinedOnce) {
      throw new Error('cannot copy bindings into joined channel')
    }

    for (const kind in other.bindings) {
      for (const binding of other.bindings[kind]) {
        this._on(binding.type, binding.filter, binding.callback)
      }
    }
  }

  /**
   * Compares two optional filter values for equality.
   * Treats `undefined` and `null` as the same empty value; any string, including the empty
   * string, must match exactly.
   * @internal
   */
  private static isFilterValueEqual(
    serverValue: string | undefined | null,
    clientValue: string | undefined
  ): boolean {
    const normalizedServer = serverValue ?? undefined
    const normalizedClient = clientValue ?? undefined
    return normalizedServer === normalizedClient
  }

  /**
   * Builds the `new` / `old` records of a postgres change: `new` for INSERT and UPDATE,
   * `old` for UPDATE and DELETE, each converted with `Transformers.convertChangeData`.
   *
   * @internal
   */
  private _getPayloadRecords(payload: any) {
    const records = {
      new: {},
      old: {},
    }

    if (payload.type === 'INSERT' || payload.type === 'UPDATE') {
      records.new = Transformers.convertChangeData(payload.columns, payload.record)
    }

    if (payload.type === 'UPDATE' || payload.type === 'DELETE') {
      records.old = Transformers.convertChangeData(payload.columns, payload.old_record)
    }

    return records
  }
}