import {
  and,
  collection,
  doc,
  Firestore,
  onSnapshot,
  or,
  orderBy,
  query,
  serverTimestamp,
  startAt,
  where,
  writeBatch,
} from "firebase/firestore";
import { CloudConfig, DocUpdatesForCloud } from "../Persisters";
import { Functions, httpsCallable } from "firebase/functions";
import { AnyFunction } from "../CloudFunction";

/** Document field that every push stamps with `serverTimestamp()`; the watcher filters and orders on it. */
const CHANGE_DATE_KEY = `_mfs_lastChangedDate`;

/**
 * Builds a `CloudConfig` backed by Firestore (document sync) and Firebase
 * Functions (callable cloud functions).
 *
 * The checkpoint handed to `syncConfig.setCheckpoint` is the change time (in
 * milliseconds since epoch) and id of the most recently changed document seen
 * by the snapshot listener.
 *
 * @param firebaseConfig Firestore instance used for the collection watcher and
 *   batched writes, and the Functions instance used for callable functions.
 * @returns A `CloudConfig` exposing `startCloudSynchronizer` and
 *   `createCloudFunctionCaller`.
 */
export function firebasePersister(firebaseConfig: {
  firestore: Firestore;
  // firebaseStorage?: FirebaseStorage;
  firebaseFunctions: Functions;
}): CloudConfig<{
  lastChangeMs: number;
  lastChangedDocId: string;
}> {
  /** Delay between a push request and the batched write. */
  const PUSH_DEBOUNCE_MS = 2 * 1000;
  /** How far before the stored checkpoint the watcher query reaches back. */
  const CHECKPOINT_LOOKBACK_MS = 30000;

  return {
    /**
     * Starts a snapshot listener on `syncConfig.path` that forwards added and
     * modified documents to `syncConfig.updateLocalDocs`, and returns a
     * debounced `requestPush` plus a `dispose` that stops the listener.
     */
    startCloudSynchronizer(syncConfig) {
      const myCollection = collection(
        firebaseConfig.firestore,
        syncConfig.path,
      );
      const initialCheckpoint = syncConfig.initialCheckpoint ?? {
        lastChangeMs: 0,
        lastChangedDocId: null,
      };
      // Most recent checkpoint reported so far; advanced by the snapshot handler.
      let lastCheckpoint = initialCheckpoint;

      const disposeWatcher = onSnapshot(
        query(
          myCollection,
          and(
            or(
              // TODO: If a docs CHANGE_DATE_KEY is changed then it is removed and re-added to this query.
              where(
                CHANGE_DATE_KEY,
                ">",
                new Date(
                  Math.max(
                    initialCheckpoint.lastChangeMs - CHECKPOINT_LOOKBACK_MS,
                    0,
                  ),
                ),
              ),
              where(CHANGE_DATE_KEY, "==", null),
              // where(CHANGE_DATE_KEY, "==", useServerTimestamp),
            ),
            // TODO: Maybe there is some way to avoid already deleted docs.
            // ...firestoreConfig.queryConstraints,
          ),
          orderBy(CHANGE_DATE_KEY, `asc`),
          orderBy(`__name__`, `asc`),
          // NOTE(review): the cursor receives a millisecond number while the
          // `where` clause above compares the same field against a Date.
          // Confirm whether a Date/Timestamp cursor value was intended.
          ...(initialCheckpoint.lastChangedDocId !== null
            ? [
                startAt(
                  initialCheckpoint.lastChangeMs,
                  initialCheckpoint.lastChangedDocId,
                ),
              ]
            : []),
        ),
        (snapshot) => {
          const updates: DocUpdatesForCloud = {};
          let latestChange = lastCheckpoint;

          for (const change of snapshot.docChanges()) {
            const docData = change.doc.data();
            /** Change date is formatted like {seconds: ..., nanoseconds: ...}
             * We need to combine seconds and nano seconds to get the milliseconds since epoch.
             * A missing field falls back to -1 for each part, yielding a negative value. */
            const docChangeDatePosix =
              (docData[CHANGE_DATE_KEY]?.seconds ?? -1) * 1_000 +
              (docData[CHANGE_DATE_KEY]?.nanoseconds ?? -1) / 1_000_000;

            // Skip removed documents. Documents should never be deleted only flagged.
            if (change.type === "removed") continue;

            // The first one is usually a duplicate.
            if (
              docChangeDatePosix === latestChange.lastChangeMs &&
              change.doc.id === latestChange.lastChangedDocId
            ) {
              continue;
            }

            // Update doc store.
            updates[change.doc.id] = docData;

            // We shouldn't need to do this comparison since the last document should be the most recent.
            // If this is not how it works, then our "startAfter" above will be in trouble.
            if (docChangeDatePosix > latestChange.lastChangeMs) {
              latestChange = {
                lastChangeMs: docChangeDatePosix,
                lastChangedDocId: change.doc.id,
              };
            }
          }

          syncConfig.updateLocalDocs(updates);

          // Persist the checkpoint only when this snapshot moved past the
          // previous one. (The original compared `latestChange` with itself,
          // so the checkpoint was never saved.)
          if (latestChange.lastChangeMs > lastCheckpoint.lastChangeMs) {
            lastCheckpoint = latestChange;
            syncConfig.setCheckpoint({
              lastChangeMs: latestChange.lastChangeMs,
              lastChangedDocId: latestChange.lastChangedDocId!,
            });
          }
        },
      );

      const requestPush = (() => {
        console.log(`Setting up push requester for ${syncConfig.path}`);
        let haveQueuedPush = false;
        let isPushing = false;

        /** Writes the pending changes in one batch, then reschedules itself if more were requested meanwhile. */
        const doPush = async (): Promise<void> => {
          isPushing = true;
          haveQueuedPush = false;

          // The Promise executor runs synchronously, so the resolver is
          // captured before it can be called below.
          let markPushFinished: () => void = () => {};
          const finishPushPromise = new Promise<void>((resolve) => {
            markPushFinished = resolve;
          });

          try {
            const changesToPush = syncConfig.getChanges(finishPushPromise);
            const docIds = Object.keys(changesToPush);
            if (docIds.length > 0) {
              const fsBatch = writeBatch(firebaseConfig.firestore);
              for (const docId of docIds) {
                fsBatch.set(
                  doc(myCollection, docId),
                  {
                    ...changesToPush[docId],
                    [CHANGE_DATE_KEY]: serverTimestamp(),
                  },
                  { merge: true },
                );
              }
              await fsBatch.commit();
            }
            markPushFinished();
          } catch (error) {
            // As before, the finish promise stays unresolved when the push fails.
            // NOTE(review): confirm whether `getChanges` consumers need an
            // explicit failure signal so the changes are retried.
            console.error(`Push failed for ${syncConfig.path}`, error);
          } finally {
            // Always release or reschedule; otherwise one failed commit would
            // leave `isPushing` stuck and block every later push request.
            // This probably shouldn't be recursive, but it will work for now.
            if (haveQueuedPush) {
              setTimeout(() => void doPush(), PUSH_DEBOUNCE_MS);
            } else {
              isPushing = false;
            }
          }
        };

        return () => {
          console.log(`Push requested`);
          if (haveQueuedPush) return;
          haveQueuedPush = true;
          if (isPushing) return;
          console.log(`Push queued`);
          // Debounce the push request.
          /* TODO: At some point we might make a more advanced system that pushes early if no extra
           changes come in, and waits longer if changes keep coming in. */
          setTimeout(() => void doPush(), PUSH_DEBOUNCE_MS);
        };
      })();

      return {
        requestPush,
        dispose: () => {
          disposeWatcher();
        },
      };
    },

    /**
     * Wraps the callable function at `functionConfig.path` and returns a
     * function that invokes it and resolves with the `data` of the result.
     */
    createCloudFunctionCaller<T extends AnyFunction>(functionConfig: {
      path: string;
    }) {
      const callCloudFunction = httpsCallable<
        Parameters<T>,
        ReturnType<T> | Error
      >(firebaseConfig.firebaseFunctions, functionConfig.path);
      return (...args) =>
        callCloudFunction(...args).then((result) => result.data);
    },
  };
}