import {Observable, of, throwError} from 'rxjs' import {filter, map} from 'rxjs/operators' import type {ObservableSanityClient, SanityClient} from '../SanityClient' import { type Any, type ListenEvent, type ListenEventName, type ListenOptions, type ListenParams, type MutationEvent, type OpenEvent, type ReconnectEvent, type ResetEvent, type ResumableListenEventNames, type ResumableListenOptions, type WelcomeBackEvent, type WelcomeEvent, } from '../types' import defaults from '../util/defaults' import {pick} from '../util/pick' import {_getDataUrl} from './dataMethods' import {encodeQueryString} from './encodeQueryString' import {connectEventSource} from './eventsource' import {eventSourcePolyfill} from './eventsourcePolyfill' import {reconnectOnConnectionFailure} from './reconnectOnConnectionFailure' // Limit is 16K for a _request_, eg including headers. Have to account for an // unknown range of headers, but an average EventSource request from Chrome seems // to have around 700 bytes of cruft, so let us account for 1.2K to be "safe" const MAX_URL_LENGTH = 16000 - 1200 const possibleOptions = [ 'includePreviousRevision', 'includeResult', 'includeMutations', 'includeAllVersions', 'visibility', 'effectFormat', 'enableResume', 'tag', ] const defaultOptions = { includeResult: true, } /** * Maps an array of listen events names to their corresponding listen event type, e.g: * ``` * type Test = MapListenEventNamesToListenEvents * // ^? WelcomeEvent * ``` * * @public */ export type MapListenEventNamesToListenEvents< R extends Record = Record, Events extends (ResumableListenEventNames | ListenEventName)[] = ( | ResumableListenEventNames | ListenEventName )[], > = Events extends (infer E)[] ? E extends 'welcome' ? WelcomeEvent : E extends 'mutation' ? MutationEvent : E extends 'reconnect' ? ReconnectEvent : E extends 'welcomeback' ? WelcomeBackEvent : E extends 'reset' ? ResetEvent : E extends 'open' ? OpenEvent : never : never /** * Maps a ListenOptions object and returns the Listen events opted for, e.g: * ``` * type Test = ListenEventFromOptions * // ^? WelcomeEvent | MutationEvent * ``` * * @public */ export type ListenEventFromOptions< R extends Record = Record, Opts extends ListenOptions | ResumableListenOptions | undefined = undefined, > = Opts extends ListenOptions | ResumableListenOptions ? Opts['events'] extends (ResumableListenEventNames | ListenEventName)[] ? MapListenEventNamesToListenEvents : // fall back to ListenEvent if opts events is present, but we can't infer the literal event names ListenEvent : MutationEvent /** * Set up a listener that will be notified when mutations occur on documents matching the provided query/filter. * * @param query - GROQ-filter to listen to changes for * @param params - Optional query parameters * @param options - Optional listener options * @public */ export function _listen = Record>( this: SanityClient | ObservableSanityClient, query: string, params?: ListenParams, ): Observable> /** * Set up a listener that will be notified when mutations occur on documents matching the provided query/filter. * * @param query - GROQ-filter to listen to changes for * @param params - Optional query parameters * @param options - Optional listener options * @public */ export function _listen< R extends Record = Record, Opts extends ListenOptions | ResumableListenOptions = ListenOptions | ResumableListenOptions, >( this: SanityClient | ObservableSanityClient, query: string, params?: ListenParams, options?: Opts, ): Observable> /** @public */ export function _listen< R extends Record = Record, Opts extends ListenOptions | ResumableListenOptions = ListenOptions | ResumableListenOptions, >( this: SanityClient | ObservableSanityClient, query: string, params?: ListenParams, opts: Opts = {} as Opts, ): Observable> { const {url, token, withCredentials, requestTagPrefix, headers: configHeaders} = this.config() const tag = opts.tag && requestTagPrefix ? [requestTagPrefix, opts.tag].join('.') : opts.tag const options = {...defaults(opts, defaultOptions), tag} const listenOpts = pick(options, possibleOptions) const qs = encodeQueryString({query, params, options: {tag, ...listenOpts}}) const uri = `${url}${_getDataUrl(this, 'listen', qs)}` if (uri.length > MAX_URL_LENGTH) { return throwError(() => new Error('Query too large for listener')) } const listenFor = (options.events ? options.events : ['mutation']) satisfies Opts['events'] const esOptions: EventSourceInit & {headers?: Record} = {} if (withCredentials) { esOptions.withCredentials = true } if (token || configHeaders) { esOptions.headers = {} if (token) { esOptions.headers.Authorization = `Bearer ${token}` } if (configHeaders) { Object.assign(esOptions.headers, configHeaders) } } const initEventSource = () => // use polyfill if there is no global EventSource or if we need to set headers (typeof EventSource === 'undefined' || esOptions.headers ? eventSourcePolyfill : of(EventSource) ).pipe(map((EventSource) => new EventSource(uri, esOptions))) return connectEventSource(initEventSource, listenFor).pipe( reconnectOnConnectionFailure(), filter((event) => listenFor.includes(event.type)), map((event) => ({ type: event.type, ...('data' in event ? (event.data as object) : {}), })), ) as Observable> }