// ******************************************************************************
// Copyright 2024 TypeFox GmbH
// This program and the accompanying materials are made available under the
// terms of the MIT License, which is available in the project root.
// ******************************************************************************

import { Deferred, DisposableCollection, ProtocolBroadcastConnection, encodeUserPermission, getUserPermissionKey, resolveReadonly } from '@hereugo/open-collaboration-protocol';
import * as Y from 'yjs';
import * as monaco from 'monaco-editor';
import * as awarenessProtocol from 'y-protocols/awareness';
import * as types from '@hereugo/open-collaboration-protocol';
import { LOCAL_ORIGIN, OpenCollaborationYjsProvider, YTextChange, YTextChangeDelta } from '@hereugo/open-collaboration-yjs';
import { createMutex } from 'lib0/mutex';
import { debounce } from 'lodash';
import { MonacoCollabCallbacks } from './monaco-api.js';
import { DisposablePeer } from './collaboration-peer.js';

/** Callback invoked whenever the set of connected peers changes. */
export type UsersChangeEvent = () => void;
/** Callback invoked with the new file name whenever the shared file name changes. */
export type FileNameChangeEvent = (fileName: string) => void;

/** Minimal disposable contract used for listeners and observers owned by this module. */
export interface Disposable {
    dispose(): void;
}

/** Construction options for a {@link CollaborationInstance}. */
export interface CollaborationInstanceOptions {
    connection: ProtocolBroadcastConnection;
    host: boolean;
    callbacks: MonacoCollabCallbacks;
    editor?: monaco.editor.IStandaloneCodeEditor;
    roomClaim: types.CreateRoomResponse | types.JoinRoomResponse;
}

/** Options accepted by {@link CollaborationInstance.followUser}. */
export interface FollowOptions {
    followViewport?: boolean;
}

/**
 * A scheduled callback. Callbacks produced by lodash `debounce` additionally expose
 * `cancel`, which drops a pending invocation.
 */
type CancelableCallback = (() => void) & { cancel?: () => void };

/** Shape of the payload emitted by the awareness `change` event. */
interface AwarenessChange {
    added: number[];
    updated: number[];
    removed: number[];
}

/**
 * Binds a single monaco editor to a collaboration room: mirrors the editor model into a
 * Yjs text, applies remote Yjs changes to the model, publishes the local selection through
 * Yjs awareness and renders remote selections as editor decorations.
 *
 * NOTE(review): the generic type arguments in this file were reconstructed from usage
 * (they were missing in the reviewed text) — confirm them against the original source.
 */
export class CollaborationInstance implements Disposable {
    protected readonly yjs: Y.Doc = new Y.Doc();
    protected readonly yjsAwareness: awarenessProtocol.Awareness;
    protected readonly yjsProvider: OpenCollaborationYjsProvider;
    /** lib0 mutex: a call made while the mutex is already held is skipped, not queued. */
    protected readonly yjsMutex = createMutex();
    protected readonly identity = new Deferred<types.Peer>();
    /** Per-path disposables (currently the Yjs text observers). */
    protected readonly documentDisposables = new Map<string, DisposableCollection>();
    protected readonly peers = new Map<string, DisposablePeer>();
    /** Per-path debounced "reconcile model with Yjs text" callbacks. */
    protected readonly throttles = new Map<string, CancelableCallback>();
    /** Per-path timestamp (ms) of the last resync, used to enforce `resyncCooldownMs`. */
    protected readonly resyncTimestamps = new Map<string, number>();
    protected resyncCooldownMs = 1500;
    protected readonly decorations = new Map<DisposablePeer, monaco.editor.IEditorDecorationsCollection>();
    protected readonly usersChangedCallbacks: UsersChangeEvent[] = [];
    protected readonly fileNameChangeCallbacks: FileNameChangeEvent[] = [];
    /** Listeners registered by `registerEditorEvents`; released before re-registering and on dispose. */
    protected readonly editorDisposables: Disposable[] = [];
    protected _permissions: types.Permissions = { readonly: false };
    protected effectiveReadonly = false;
    protected readonly peerTagVisibility = new Map<string, boolean>();
    protected readonly peerTagTimers = new Map<string, ReturnType<typeof setTimeout>>();
    protected tagHideDelayMs = 2000;

    protected currentPath?: string;
    /** While `true`, editor change/selection events are not forwarded to Yjs. */
    protected stopPropagation = false;
    protected _following?: string;
    protected followViewport = false;
    protected _fileName: string;
    protected previousFileName?: string;
    protected _workspaceName: string;

    protected connection: ProtocolBroadcastConnection;

    /** Id of the peer currently being followed, if any. */
    get following(): string | undefined {
        return this._following;
    }

    get connectedUsers(): DisposablePeer[] {
        return Array.from(this.peers.values());
    }

    /** Resolves once the server has sent this client's own peer info. */
    get ownUserData(): Promise<types.Peer> {
        return this.identity.promise;
    }

    get isHost(): boolean {
        return this.options.host;
    }

    /** The host peer as reported by the room claim; `undefined` when the claim carries no `host`. */
    get host(): types.Peer | undefined {
        return 'host' in this.options.roomClaim ? this.options.roomClaim.host : undefined;
    }

    get roomId(): string {
        return this.options.roomClaim.roomId;
    }

    get fileName(): string {
        return this._fileName;
    }

    get workspaceName(): string {
        return this._workspaceName;
    }

    set workspaceName(_workspaceName: string) {
        this._workspaceName = _workspaceName;
    }

    /**
     * Access token for the room. Allows joining the room or reconnecting as host.
     */
    get roomToken(): string {
        return this.options.roomClaim.roomToken;
    }

    /** Registers a callback that fires whenever the set of connected peers changes. */
    onUsersChanged(callback: UsersChangeEvent) {
        this.usersChangedCallbacks.push(callback);
    }

    /** Registers a callback that fires whenever the shared file name changes. */
    onFileNameChange(callback: FileNameChangeEvent) {
        this.fileNameChangeCallbacks.push(callback);
    }

    constructor(protected options: CollaborationInstanceOptions) {
        this.connection = options.connection;
        this.yjsAwareness = new awarenessProtocol.Awareness(this.yjs);
        this.yjsProvider = new OpenCollaborationYjsProvider(this.options.connection, this.yjs, this.yjsAwareness, {
            resyncTimer: 10_000
        });
        this.yjsProvider.connect();

        this.connection.onReconnect(() => {
            this.yjsProvider.connect();
            // Guests re-read the active document after a reconnect.
            if (!this.isHost && this.options.editor) {
                const model = this.options.editor.getModel();
                if (model) {
                    void this.resyncActiveDocument(model);
                }
            }
        });

        this._fileName = 'myFile.txt';
        this._workspaceName = this.roomId;

        this.setupConnectionHandlers();
        this.setupFileSystemHandlers();
        if (this.options.editor) {
            this.registerEditorEvents();
        }
    }

    /** Wires the peer/room protocol handlers (join requests, join/leave, info, init, permissions). */
    private setupConnectionHandlers(): void {
        this.connection.peer.onJoinRequest(async (_, user) => {
            const result = await this.options.callbacks.onUserRequestsAccess(user);
            return result ? {
                workspace: {
                    name: this.workspaceName,
                    folders: [this.workspaceName]
                }
            } : undefined;
        });

        this.connection.room.onJoin(async (_, peer) => {
            this.peers.set(peer.id, new DisposablePeer(this.yjsAwareness, peer));
            const initData: types.InitData = {
                protocol: '0.0.1',
                host: await this.identity.promise,
                guests: Array.from(this.peers.values()).map(e => e.peer),
                capabilities: {},
                permissions: {
                    ...this._permissions,
                    [getUserPermissionKey(peer.id)]: encodeUserPermission(true)
                },
                workspace: {
                    name: this.workspaceName,
                    folders: [this.workspaceName]
                }
            };
            this.connection.peer.init(peer.id, initData);
            this.notifyUsersChanged();
        });

        this.connection.room.onLeave(async (_, peer) => {
            const leaving = this.peers.get(peer.id);
            if (leaving) {
                this.peers.delete(peer.id);
                this.clearPeerTagState(peer.id);
                // rerenderPresence skips peers that are no longer known, so the cursor of the
                // leaving peer has to be removed explicitly or it would stay in the editor.
                this.decorations.get(leaving)?.clear();
                this.decorations.delete(leaving);
                this.notifyUsersChanged();
            }
            this.rerenderPresence();
        });

        this.connection.peer.onInfo((_, peer) => {
            this.yjsAwareness.setLocalStateField('peer', peer.id);
            this.identity.resolve(peer);
        });

        this.connection.peer.onInit(async (_, initData) => {
            await this.initialize(initData);
        });

        this.connection.room.onPermissions((_, permissions) => {
            void this.applyPermissionsUpdate(permissions);
        });
    }

    /** Stores the new permissions and applies the resulting read-only state to the editor. */
    private async applyPermissionsUpdate(permissions: types.Permissions): Promise<void> {
        this._permissions = permissions;
        const peer = await this.identity.promise;
        const readonly = resolveReadonly(permissions, peer.id);
        this.effectiveReadonly = readonly;
        this.updateEditorReadonly(readonly);
    }

    private updateEditorReadonly(readonly: boolean): void {
        if (this.options.editor) {
            this.options.editor.updateOptions({ readOnly: readonly });
        }
    }

    /** Wires the file-system protocol handlers to this instance. */
    private setupFileSystemHandlers(): void {
        this.connection.fs.onReadFile(this.handleReadFile.bind(this));
        this.connection.fs.onStat(this.handleStat.bind(this));
        this.connection.fs.onReaddir(this.handleReaddir.bind(this));
        this.connection.fs.onChange(this.handleFileChange.bind(this));
    }

    /**
     * Serves the current editor content for the shared file.
     * @throws Error when `path` is not the shared file or no editor is attached.
     */
    private async handleReadFile(_: unknown, path: string): Promise<{ content: Uint8Array }> {
        if (path === this._fileName && this.options.editor) {
            const text = this.options.editor.getModel()?.getValue();
            const encoder = new TextEncoder();
            const content = encoder.encode(text);
            return { content };
        }
        throw new Error('Could not read file');
    }

    /** Reports the workspace name as a directory and every other path as a file; timestamps and size are always 0. */
    private async handleStat(_: unknown, path: string): Promise<{ type: types.FileType; mtime: number; ctime: number; size: number }> {
        return {
            type: path === this.workspaceName ? types.FileType.Directory : types.FileType.File,
            mtime: 0,
            ctime: 0,
            size: 0
        };
    }

    /** Lists the single shared file as the content of the requested directory. */
    private async handleReaddir(_: unknown, path: string): Promise<Record<string, types.FileType>> {
        const uri = this.getResourceUri(path);
        if (uri) {
            return { [this._fileName]: types.FileType.File };
        }
        throw new Error('Could not read directory');
    }

    /** Treats a combined delete + create change as a rename of the shared file. */
    private handleFileChange(_: unknown, change: types.FileChangeEvent): void {
        const deleteChange = change.changes.find(c => c.type === types.FileChangeEventType.Delete);
        const createChange = change.changes.find(c => c.type === types.FileChangeEventType.Create);
        if (deleteChange && createChange) {
            this._fileName = createChange.path;
            const model = this.options.editor?.getModel();
            if (model) {
                void this.registerTextDocument(model);
            }
        }
    }

    private notifyUsersChanged(): void {
        this.usersChangedCallbacks.forEach(callback => callback());
    }

    private notifyFileNameChanged(fileName: string): void {
        this.fileNameChangeCallbacks.forEach(callback => callback(fileName));
    }

    /**
     * Attaches (or replaces) the editor used by this instance and (re-)registers its listeners.
     */
    setEditor(editor: monaco.editor.IStandaloneCodeEditor): void {
        if (this.options.editor !== editor) {
            // Decoration collections belong to the editor that created them.
            this.clearDecorations();
        }
        this.options.editor = editor;
        this.updateEditorReadonly(this.effectiveReadonly);
        this.registerEditorEvents();
    }

    /**
     * Renames the shared file, re-registers the current model under the new name and
     * broadcasts the rename as a create + delete change.
     */
    async setFileName(fileName: string): Promise<void> {
        const oldFileName = this._fileName;
        this._fileName = fileName;
        const model = this.options.editor?.getModel();
        if (model) {
            await this.registerTextDocument(model);
            this.connection.fs.change({
                changes: [
                    { type: types.FileChangeEventType.Create, path: fileName },
                    { type: types.FileChangeEventType.Delete, path: oldFileName }
                ]
            });
        }
    }

    /**
     * Releases everything this instance scheduled or registered: editor/awareness listeners,
     * peer-tag timers, pending debounced reconciliations, decorations and Yjs observers.
     */
    dispose() {
        this.disposeEditorEvents();
        for (const timer of this.peerTagTimers.values()) {
            clearTimeout(timer);
        }
        this.peerTagTimers.clear();
        this.peerTagVisibility.clear();
        for (const throttle of this.throttles.values()) {
            throttle.cancel?.();
        }
        this.throttles.clear();
        this.clearDecorations();
        this.peers.clear();
        this.documentDisposables.forEach(e => e.dispose());
        this.documentDisposables.clear();
    }

    leaveRoom() {
        this.options.connection.room.leave();
    }

    getCurrentConnection(): ProtocolBroadcastConnection {
        return this.options.connection;
    }

    /** Adds `disposable` to the collection associated with `path`, creating the collection on demand. */
    protected pushDocumentDisposable(path: string, disposable: Disposable) {
        let disposables = this.documentDisposables.get(path);
        if (!disposables) {
            disposables = new DisposableCollection();
            this.documentDisposables.set(path, disposables);
        }
        disposables.push(disposable);
    }

    /** Runs `action` with `stopPropagation` set and always resets the flag, even if `action` throws. */
    private runWithoutPropagation(action: () => void): void {
        this.stopPropagation = true;
        try {
            action();
        } finally {
            this.stopPropagation = false;
        }
    }

    /** Disposes all listeners previously registered by `registerEditorEvents`. */
    private disposeEditorEvents(): void {
        for (const disposable of this.editorDisposables.splice(0)) {
            disposable.dispose();
        }
    }

    /** Removes every rendered peer decoration and forgets the decoration collections. */
    private clearDecorations(): void {
        for (const collection of this.decorations.values()) {
            collection.clear();
        }
        this.decorations.clear();
    }

    /**
     * Registers the current model and subscribes to editor content/selection changes and to
     * awareness changes. Safe to call repeatedly: listeners of a previous call are released first.
     */
    protected registerEditorEvents(): void {
        const editor = this.options.editor;
        if (!editor) {
            return;
        }
        this.disposeEditorEvents();

        const initialModel = editor.getModel();
        if (initialModel) {
            void this.registerTextDocument(initialModel);
        }

        this.editorDisposables.push(editor.onDidChangeModelContent(event => {
            // Read the model at event time so a model attached or swapped later is still synchronized.
            const model = editor.getModel();
            if (model && !this.stopPropagation) {
                this.updateTextDocument(event, model);
            }
        }));

        this.editorDisposables.push(editor.onDidChangeCursorSelection(() => {
            if (!this.stopPropagation) {
                this.updateTextSelection(editor);
            }
        }));

        const awarenessDebounce = debounce(() => {
            this.rerenderPresence();
        }, 2000);

        const onAwarenessChange = (change: AwarenessChange, origin: string): void => {
            if (origin !== LOCAL_ORIGIN) {
                this.markActivePeers(change);
                this.updateFollow();
                this.rerenderPresence();
                // Re-render once more after the debounce delay has passed without further changes.
                awarenessDebounce();
            }
        };
        this.yjsAwareness.on('change', onAwarenessChange);
        this.editorDisposables.push({
            dispose: () => {
                this.yjsAwareness.off('change', onAwarenessChange);
                awarenessDebounce.cancel();
            }
        });
    }

    /**
     * Starts following the peer with the given id, or stops following when `id` is omitted.
     * @param options `followViewport` is only changed when explicitly provided.
     */
    followUser(id?: string, options?: FollowOptions) {
        this._following = id;
        if (options?.followViewport !== undefined) {
            this.followViewport = options.followViewport;
        }
        if (id) {
            this.updateFollow();
        }
    }

    /** Applies the followed peer's text selection (if it has one) to the local editor. */
    protected updateFollow(): void {
        if (this._following) {
            let userState: types.ClientAwareness | undefined = undefined;
            const states = this.yjsAwareness.getStates() as Map<number, types.ClientAwareness>;
            for (const state of states.values()) {
                const peer = this.peers.get(state.peer);
                if (peer?.peer.id === this._following) {
                    userState = state;
                }
            }
            if (userState) {
                if (types.ClientTextSelection.is(userState.selection)) {
                    void this.followSelection(userState.selection);
                }
            }
        }
    }

    /** Shows the name tag of every remote peer that was added or updated and restarts its hide timer. */
    protected markActivePeers(change: { added: number[]; updated: number[]; removed: number[] }): void {
        const states = this.yjsAwareness.getStates() as Map<number, types.ClientAwareness>;
        for (const clientId of [...change.added, ...change.updated]) {
            if (clientId === this.yjs.clientID) {
                continue;
            }
            const state = states.get(clientId);
            if (!state?.peer) {
                continue;
            }
            this.setPeerTagVisible(state.peer, true);
            this.resetPeerTagTimer(state.peer);
        }
    }

    protected setPeerTagVisible(peerId: string, visible: boolean): void {
        const current = this.peerTagVisibility.get(peerId) === true;
        if (current !== visible) {
            this.peerTagVisibility.set(peerId, visible);
        }
    }

    /** (Re)starts the timer that hides the peer's tag after `tagHideDelayMs`. */
    protected resetPeerTagTimer(peerId: string): void {
        const existing = this.peerTagTimers.get(peerId);
        if (existing) {
            clearTimeout(existing);
        }
        const timeout = setTimeout(() => {
            this.peerTagVisibility.set(peerId, false);
            this.peerTagTimers.delete(peerId);
            this.rerenderPresence();
        }, this.tagHideDelayMs);
        this.peerTagTimers.set(peerId, timeout);
    }

    /** Cancels the hide timer and drops the tag visibility entry of the given peer. */
    protected clearPeerTagState(peerId: string): void {
        const existing = this.peerTagTimers.get(peerId);
        if (existing) {
            clearTimeout(existing);
            this.peerTagTimers.delete(peerId);
        }
        this.peerTagVisibility.delete(peerId);
    }

    /**
     * Switches the editor to the document of the given selection (loading its Yjs text when the
     * path changed), updates the file name and optionally reveals the peer's first visible range.
     */
    protected async followSelection(selection: types.ClientTextSelection): Promise<void> {
        const editor = this.options.editor;
        if (!editor) {
            return;
        }
        const uri = this.getResourceUri(selection.path);
        const text = this.yjs.getText(selection.path);

        const prevPath = this.currentPath;
        this.currentPath = selection.path;
        if (prevPath !== selection.path) {
            this.runWithoutPropagation(() => editor.setValue(text.toString()));
        }

        const filename = this.getHostPath(selection.path);
        if (this._fileName !== filename) {
            this._fileName = filename;
            this.previousFileName = filename;
            this.notifyFileNameChanged(this._fileName);
        }

        const model = editor.getModel();
        if (model) {
            this.registerTextObserver(selection.path, model, text);
        }

        if (this.followViewport && uri && selection.visibleRanges && selection.visibleRanges.length > 0) {
            const visibleRange = selection.visibleRanges[0];
            const range = new monaco.Range(visibleRange.start.line, visibleRange.start.character, visibleRange.end.line, visibleRange.end.character);
            editor.revealRange(range);
        }
    }

    /** Publishes the editor's selections (as Yjs relative positions) and visible ranges via awareness. */
    protected updateTextSelection(editor: monaco.editor.IStandaloneCodeEditor): void {
        const document = editor.getModel();
        const selections = editor.getSelections();
        if (!document || !selections) {
            return;
        }
        const path = this.currentPath;
        if (path) {
            const ytext = this.yjs.getText(path);
            const textSelections: types.RelativeTextSelection[] = [];
            for (const selection of selections) {
                const start = document.getOffsetAt(selection.getStartPosition());
                const end = document.getOffsetAt(selection.getEndPosition());
                const direction = selection.getDirection() === monaco.SelectionDirection.RTL
                    ? types.SelectionDirection.RightToLeft
                    : types.SelectionDirection.LeftToRight;
                const editorSelection: types.RelativeTextSelection = {
                    start: Y.createRelativePositionFromTypeIndex(ytext, start),
                    end: Y.createRelativePositionFromTypeIndex(ytext, end),
                    direction
                };
                textSelections.push(editorSelection);
            }
            const textSelection: types.ClientTextSelection = {
                path,
                textSelections,
                visibleRanges: editor.getVisibleRanges().map(range => ({
                    start: { line: range.startLineNumber, character: range.startColumn },
                    end: { line: range.endLineNumber, character: range.endColumn }
                }))
            };
            this.setSharedSelection(textSelection);
        }
    }

    /**
     * Binds `document` to the Yjs text of `<workspaceName>/<fileName>`.
     * The host overwrites the Yjs text with the model content; a guest loads the content via
     * `readFile()` and overwrites both the Yjs text and the model when they differ.
     */
    protected async registerTextDocument(document: monaco.editor.ITextModel): Promise<void> {
        const uri = this.getResourceUri(`${this._workspaceName}/${this._fileName}`);
        const path = this.getProtocolPath(uri);
        this.currentPath = path;
        if (path) {
            const yjsText = this.yjs.getText(path);
            let ytextContent = '';
            if (this.isHost) {
                const text = document.getValue();
                this.yjs.transact(() => {
                    yjsText.delete(0, yjsText.length);
                    yjsText.insert(0, text);
                });
                ytextContent = yjsText.toString();
            } else {
                ytextContent = await this.readFile();
                if (yjsText.toString() !== ytextContent) {
                    this.yjs.transact(() => {
                        yjsText.delete(0, yjsText.length);
                        yjsText.insert(0, ytextContent);
                    });
                }
                if (this._fileName !== this.previousFileName) {
                    this.previousFileName = this._fileName;
                    this.notifyFileNameChanged(this._fileName);
                }
            }
            // Compare against the model's current value: it may have changed while awaiting readFile().
            if (document.getValue() !== ytextContent) {
                this.yjsMutex(() => {
                    this.runWithoutPropagation(() => document.setValue(ytextContent));
                });
            }
            this.registerTextObserver(path, document, yjsText);
        }
    }

    /**
     * Guest only: reloads the active document via `readFile()` and overwrites the Yjs text and
     * the model when they differ. Runs at most once per `resyncCooldownMs` for a given path.
     */
    private async resyncActiveDocument(document: monaco.editor.ITextModel): Promise<void> {
        if (this.isHost) {
            return;
        }
        const path = this.currentPath ?? this.getProtocolPath(this.getResourceUri(`${this._workspaceName}/${this._fileName}`));
        if (!path) {
            return;
        }
        const now = Date.now();
        const lastResync = this.resyncTimestamps.get(path) ?? 0;
        if (now - lastResync < this.resyncCooldownMs) {
            return;
        }
        this.resyncTimestamps.set(path, now);
        this.currentPath = path;
        const yjsText = this.yjs.getText(path);
        const hostContent = await this.readFile();
        if (yjsText.toString() !== hostContent) {
            this.yjs.transact(() => {
                yjsText.delete(0, yjsText.length);
                yjsText.insert(0, hostContent);
            });
        }
        if (document.getValue() !== hostContent) {
            this.runWithoutPropagation(() => document.setValue(hostContent));
        }
    }

    /**
     * Replaces the disposables registered for `path` with an observer that applies non-local
     * Yjs changes of `yjsText` to `document`.
     */
    protected registerTextObserver(path: string, document: monaco.editor.ITextModel, yjsText: Y.Text): void {
        const textObserver = this.documentDisposables.get(path);
        if (textObserver) {
            textObserver.dispose();
        }

        const resyncThrottle = this.getOrCreateThrottle(path, document);
        const observer = (textEvent: Y.YTextEvent) => {
            if (textEvent.transaction.local) {
                // Local changes already updated the model; avoid re-applying and shifting offsets.
                return;
            }
            this.yjsMutex(() => {
                if (this.options.editor) {
                    const changes = YTextChangeDelta.toChanges(textEvent.delta);
                    const edits = this.createEditsFromTextEvent(changes, document);
                    this.updateDocument(document, edits);
                    resyncThrottle();
                }
            });
        };
        yjsText.observe(observer);
        this.pushDocumentDisposable(path, { dispose: () => yjsText.unobserve(observer) });
    }

    /** Applies `edits` to `document` as one undo stack element without echoing them back to Yjs. */
    protected updateDocument(document: monaco.editor.ITextModel, edits: monaco.editor.IIdentifiedSingleEditOperation[]): void {
        this.runWithoutPropagation(() => {
            document.pushStackElement();
            document.pushEditOperations(null, edits, () => null);
            document.pushStackElement();
        });
    }

    /** Converts offset-based changes into monaco edits, ordered from the highest start offset to the lowest. */
    private createEditsFromTextEvent(changes: YTextChange[], document: monaco.editor.ITextModel): monaco.editor.IIdentifiedSingleEditOperation[] {
        const sortedChanges = [...changes].sort((a, b) => {
            const startDiff = b.start - a.start;
            return startDiff !== 0 ? startDiff : b.end - a.end;
        });
        return sortedChanges.map(change => {
            const start = document.getPositionAt(change.start);
            const end = document.getPositionAt(change.end);
            return {
                range: new monaco.Range(start.lineNumber, start.column, end.lineNumber, end.column),
                text: change.text
            };
        });
    }

    /** Mirrors a local model change into the Yjs text of the current path; ignored while read-only. */
    protected updateTextDocument(event: monaco.editor.IModelContentChangedEvent, document: monaco.editor.ITextModel): void {
        if (this.effectiveReadonly) {
            return;
        }
        const path = this.currentPath;
        if (path) {
            this.yjsMutex(() => {
                const ytext = this.yjs.getText(path);
                this.yjs.transact(() => {
                    // Apply from the highest offset down so earlier offsets stay valid.
                    const sortedChanges = [...event.changes].sort((a, b) => b.rangeOffset - a.rangeOffset);
                    for (const change of sortedChanges) {
                        ytext.delete(change.rangeOffset, change.rangeLength);
                        ytext.insert(change.rangeOffset, change.text);
                    }
                });
                this.getOrCreateThrottle(path, document)();
            });
        }
    }

    /**
     * Returns the debounced reconcile callback for `path`, creating it on first use. When the
     * model and the Yjs text have diverged, a guest triggers a resync and the host
     * replaces the model content with the Yjs text.
     *
     * NOTE(review): the callback keeps using the `document` passed on the first call for a path.
     */
    protected getOrCreateThrottle(path: string, document: monaco.editor.ITextModel): () => void {
        let value = this.throttles.get(path);
        if (!value) {
            value = debounce(() => {
                if (document.isDisposed()) {
                    return;
                }
                this.yjsMutex(() => {
                    const yjsText = this.yjs.getText(path);
                    const newContent = yjsText.toString();
                    if (newContent !== document.getValue()) {
                        if (!this.isHost) {
                            void this.resyncActiveDocument(document);
                            return;
                        }
                        this.updateDocumentContent(document, newContent);
                    }
                });
            }, 100, {
                leading: false,
                trailing: true
            });
            this.throttles.set(path, value);
        }
        return value;
    }

    /**
     * Replaces the whole content of `document` with `newContent`.
     * Must be called while `yjsMutex` is held: the lib0 mutex skips nested calls, so
     * acquiring it again here would turn this method into a no-op.
     */
    private updateDocumentContent(document: monaco.editor.ITextModel, newContent: string): void {
        if (this.options.editor) {
            const edits: monaco.editor.IIdentifiedSingleEditOperation[] = [{
                range: document.getFullModelRange(),
                text: newContent
            }];
            this.updateDocument(document, edits);
        }
    }

    /** Renders the first text selection of every known remote peer as a decoration in the editor. */
    protected rerenderPresence() {
        const states = this.yjsAwareness.getStates() as Map<number, types.ClientAwareness>;
        for (const [clientID, state] of states.entries()) {
            if (clientID === this.yjs.clientID) {
                // Ignore own awareness state
                continue;
            }
            const peerId = state.peer;
            const peer = this.peers.get(peerId);
            if (!state.selection || !peer) {
                continue;
            }
            if (!types.ClientTextSelection.is(state.selection)) {
                continue;
            }
            const { path, textSelections } = state.selection;
            const selection = textSelections[0];
            if (!selection) {
                continue;
            }
            const uri = this.getResourceUri(path);
            if (uri && this.options.editor) {
                const model = this.options.editor.getModel();
                // NOTE(review): 1 is presumably types.SelectionDirection.LeftToRight — confirm.
                const forward = selection.direction === 1;
                let startIndex = Y.createAbsolutePositionFromRelativePosition(selection.start, this.yjs);
                let endIndex = Y.createAbsolutePositionFromRelativePosition(selection.end, this.yjs);
                if (model && startIndex && endIndex) {
                    if (startIndex.index > endIndex.index) {
                        [startIndex, endIndex] = [endIndex, startIndex];
                    }
                    const start = model.getPositionAt(startIndex.index);
                    const end = model.getPositionAt(endIndex.index);
                    // The cursor sits on the first line: use the inverted cursor class.
                    const inverted = (forward && end.lineNumber === 1) || (!forward && start.lineNumber === 1);
                    const range: monaco.IRange = {
                        startLineNumber: start.lineNumber,
                        startColumn: start.column,
                        endLineNumber: end.lineNumber,
                        endColumn: end.column
                    };
                    const contentClassNames: string[] = [peer.decoration.cursorClassName];
                    if (inverted) {
                        contentClassNames.push(peer.decoration.cursorInvertedClassName);
                    }
                    if (this.peerTagVisibility.get(peer.peer.id)) {
                        contentClassNames.push(peer.decoration.tagVisibleClassName);
                    }
                    this.setDecorations(peer, [{
                        range,
                        options: {
                            className: peer.decoration.selectionClassName,
                            beforeContentClassName: !forward ? contentClassNames.join(' ') : undefined,
                            afterContentClassName: forward ? contentClassNames.join(' ') : undefined,
                            stickiness: monaco.editor.TrackedRangeStickiness.NeverGrowsWhenTypingAtEdges
                        }
                    }]);
                }
            }
        }
    }

    /** Updates the peer's decoration collection, creating it on the current editor when missing. */
    protected setDecorations(peer: DisposablePeer, decorations: monaco.editor.IModelDeltaDecoration[]): void {
        if (this.decorations.has(peer)) {
            this.decorations.get(peer)?.set(decorations);
        } else if (this.options.editor) {
            this.decorations.set(peer, this.options.editor.createDecorationsCollection(decorations));
        }
    }

    protected setSharedSelection(selection?: types.ClientSelection): void {
        this.yjsAwareness.setLocalStateField('selection', selection);
    }

    /** Re-publishes the current local text selection under a new path. */
    protected updateSelectionPath(newPath: string): void {
        const currentState = this.yjsAwareness.getLocalState() as types.ClientAwareness;
        if (currentState?.selection && types.ClientTextSelection.is(currentState.selection)) {
            const newSelection: types.ClientTextSelection = {
                ...currentState.selection,
                path: newPath
            };
            this.setSharedSelection(newSelection);
        }
    }

    /**
     * Converts a relative (Yjs) selection into a monaco selection on `model`.
     * @returns `undefined` when either end cannot be resolved to an absolute position.
     */
    protected createSelectionFromRelative(selection: types.RelativeTextSelection, model: monaco.editor.ITextModel): monaco.Selection | undefined {
        const start = Y.createAbsolutePositionFromRelativePosition(selection.start, this.yjs);
        const end = Y.createAbsolutePositionFromRelativePosition(selection.end, this.yjs);
        if (start && end) {
            let anchor = model.getPositionAt(start.index);
            let head = model.getPositionAt(end.index);
            if (selection.direction === types.SelectionDirection.RightToLeft) {
                [anchor, head] = [head, anchor];
            }
            return new monaco.Selection(anchor.lineNumber, anchor.column, head.lineNumber, head.column);
        }
        return undefined;
    }

    protected getHostPath(path: string): string {
        // When creating a URI as a guest, we always prepend it with the name of the workspace
        // This just removes the workspace name from the path to get the path expected by the protocol
        const subpath = path.substring(1).split('/');
        return subpath.slice(1).join('/');
    }

    /** Applies the init data received from the host: known peers, permissions and workspace name. */
    async initialize(data: types.InitData): Promise<void> {
        for (const peer of [data.host, ...data.guests]) {
            this.peers.set(peer.id, new DisposablePeer(this.yjsAwareness, peer));
        }
        await this.applyPermissionsUpdate(data.permissions);
        this.workspaceName = data.workspace.name;
        this.notifyUsersChanged();
    }

    /** Returns the URI path without its leading slash, or `undefined` when no URI is given. */
    getProtocolPath(uri?: monaco.Uri): string | undefined {
        if (!uri) {
            return undefined;
        }
        return uri.path.startsWith('/') ? uri.path.substring(1) : uri.path;
    }

    getResourceUri(path?: string): monaco.Uri | undefined {
        return new monaco.Uri().with({ path });
    }

    /**
     * Reads the content of the current document: from the local Yjs document when it already
     * has a shared type for the host path, otherwise by requesting the file from the host.
     * @returns an empty string when no document is active.
     */
    async readFile(): Promise<string> {
        if (!this.currentPath) {
            return '';
        }
        const path = this.getHostPath(this.currentPath);

        if (this.yjs.share.has(path)) {
            const stringValue = this.yjs.getText(path);
            return stringValue.toString();
        } else {
            const file = await this.connection.fs.readFile(this.host?.id, path);
            const decoder = new TextDecoder();
            return decoder.decode(file.content);
        }
    }
}