import type {
  ChatMessage,
  DataPublishOptions,
  LocalParticipant,
  Participant,
  Room,
} from '@cc-livekit/livekit-client';
import type { Subscriber } from 'rxjs';
import { Observable, filter, map } from 'rxjs';
import { createChatObserver, createDataObserver } from './room';

/** Well-known data topic identifiers. */
export const DataTopic = {
  CHAT: 'lk-chat-topic',
  CHAT_UPDATE: 'lk-chat-update-topic',
} as const;

/**
 * Publish data from the LocalParticipant.
 *
 * Only `reliable`, `destinationIdentities` and `topic` are forwarded from
 * `options`; any other field on the options object is ignored.
 *
 * @param localParticipant - Participant whose `publishData` is invoked.
 * @param payload - Raw bytes to publish.
 * @param options - Publish options; defaults to an empty object.
 */
export async function sendMessage(
  localParticipant: LocalParticipant,
  payload: Uint8Array,
  options: DataPublishOptions = {},
): Promise<void> {
  const { reliable, destinationIdentities, topic } = options;
  await localParticipant.publishData(payload, {
    destinationIdentities,
    topic,
    reliable,
  });
}

/** A data payload together with the (optional) topic it belongs to. */
export interface BaseDataMessage<T extends string | undefined = string | undefined> {
  topic?: T;
  payload: Uint8Array;
}

/** A data message as delivered to listeners, optionally carrying its sender. */
export interface ReceivedDataMessage<T extends string | undefined = string>
  extends BaseDataMessage<T> {
  from?: Participant;
}

/**
 * Wire up receiving and sending of data messages for a room.
 *
 * @param room - Room whose data events are observed and whose local
 *   participant is used for sending.
 * @param topic - A single topic or a non-empty list of topics to listen to.
 *   When omitted, messages of every topic are passed through. The first
 *   topic is used as the default topic for `send`.
 * @param onMessage - Optional callback invoked for every message that passes
 *   the topic filter, before the message is emitted on `messageObservable`.
 * @returns `messageObservable` (filtered incoming messages),
 *   `isSendingObservable` (emits `true` when a send starts and `false` when it
 *   settles) and `send`.
 */
export function setupDataMessageHandler<T extends string>(
  room: Room,
  topic?: T | [T, ...T[]],
  onMessage?: (msg: ReceivedDataMessage<T>) => void,
) {
  const topics = Array.isArray(topic) ? topic : [topic];

  /** Observable of all data messages belonging to one of the requested topics. */
  const messageObservable = createDataObserver(room).pipe(
    filter(
      ([, , , messageTopic]) =>
        topic === undefined ||
        (messageTopic !== undefined && topics.includes(messageTopic as T)),
    ),
    map(([payload, participant, , messageTopic]) => {
      const msg = {
        payload,
        topic: messageTopic as T,
        from: participant,
      } satisfies ReceivedDataMessage<T>;
      onMessage?.(msg);
      return msg;
    }),
  );

  // Track every active subscriber instead of a single mutable reference:
  // `send` must not throw when nobody has subscribed yet, and earlier
  // subscribers must keep receiving updates after a later subscription.
  const isSendingSubscribers = new Set<Subscriber<boolean>>();
  const isSendingObservable = new Observable<boolean>((subscriber) => {
    isSendingSubscribers.add(subscriber);
    // Teardown: stop tracking the subscriber once it unsubscribes.
    return () => {
      isSendingSubscribers.delete(subscriber);
    };
  });

  const notifySending = (isSending: boolean): void => {
    for (const subscriber of isSendingSubscribers) {
      subscriber.next(isSending);
    }
  };

  /**
   * Publish `payload` via the room's local participant. The first configured
   * topic is used unless `options.topic` overrides it. Errors from publishing
   * propagate to the caller; the sending flag is reset either way.
   */
  const send = async (payload: Uint8Array, options: DataPublishOptions = {}): Promise<void> => {
    notifySending(true);
    try {
      await sendMessage(room.localParticipant, payload, { topic: topics[0], ...options });
    } finally {
      notifySending(false);
    }
  };

  return { messageObservable, isSendingObservable, send };
}

/**
 * Wire up chat messaging for a room.
 *
 * @param room - Room whose chat events are observed and whose local
 *   participant sends and edits messages.
 * @returns `chatObservable` (from `createChatObserver`), `send` (delegates to
 *   `sendChatMessage`) and `edit` (delegates to `editChatMessage`).
 */
export function setupChatMessageHandler(room: Room) {
  const chatObservable = createChatObserver(room);

  const send = async (text: string) => room.localParticipant.sendChatMessage(text);

  const edit = async (text: string, originalMsg: ChatMessage) =>
    room.localParticipant.editChatMessage(text, originalMsg);

  return { chatObservable, send, edit };
}