import { JSXElement, isJSXPrimitive } from '../jsx'
import { PartialRenderer } from '../renderer'
import { JSXAsync, JSXNodeAsync, JSXElementAsync } from './jsx'
import { intoObservable } from './observable'

/**
 * Builds a function that renders asynchronous JSX into an observable of
 * rendered fragments produced by `renderer`.
 *
 * For each JSX value the returned function:
 * - waits for the value's observable to complete and renders only the last
 *   node it emitted;
 * - renders primitives and childless elements in a single `renderNode` call;
 * - for elements with children, emits the opening tag, then every child's
 *   output in child order, then the closing tag. All children are subscribed
 *   to up front; output from a child that is not yet at the head of the line
 *   is buffered until every earlier child has completed.
 *
 * NOTE(review): `Observable` and `Subscription` are not imported in the
 * visible source — presumably ambient/global declarations; confirm.
 *
 * @param renderer Supplies `renderNode`, `renderOpeningTag` and
 *   `renderClosingTag`.
 * @returns A recursive render function from async JSX to an observable.
 */
export const createAsyncRenderStream = (renderer: PartialRenderer) => {
  // Named binding so the function can call itself for child nodes.
  const renderStream = (jsx: JSXAsync): Observable => {
    const jsx$ = intoObservable(jsx)

    return new Observable(observer => {
      // Every subscription opened on behalf of this observer, so that all of
      // them can be torn down together.
      const subscriptions: Subscription[] = []

      // Most recent node emitted by `jsx$`; only the final one is rendered.
      let node: JSXNodeAsync = null

      // Runs once `jsx$` has completed: nothing is streamed before that.
      const renderFinalNode = () => {
        if (isJSXPrimitive(node)) {
          observer.next(renderer.renderNode(node))
          observer.complete()
          return
        }

        if (!node.children || node.children.length === 0) {
          observer.next(renderer.renderNode(node as JSXElement))
          observer.complete()
          return
        }

        const parentElement = node
        const { children } = parentElement

        // Index of the child whose output is currently forwarded live.
        let streamingChild = 0

        // The opening tag goes out first; children follow in order.
        observer.next(renderer.renderOpeningTag(
          parentElement.elementName,
          parentElement.attributes
        ))

        // NOTE(review): `A`, `B` and `C` are not declared in the visible
        // source — confirm the intended element type of the queue.
        type ChildState = {
          node: JSXAsync,
          queuedValues: (A | B | C)[],
          complete: boolean,
        }

        const childrenState: ChildState[] = children.map(child => {
          const initialState: ChildState = {
            node: child,
            queuedValues: [],
            complete: false,
          }
          return initialState
        })

        // Called when the live child completes. Moves on to the following
        // children, flushing what each one buffered, and stops at the first
        // child that is still running (it becomes the new live child). When
        // no children remain, closes the parent element and completes.
        const handOffToNextChild = () => {
          for (streamingChild++; streamingChild < children.length; streamingChild++) {
            const upcoming = childrenState[streamingChild]
            for (const queued of upcoming.queuedValues) {
              observer.next(queued)
            }
            if (!upcoming.complete) return
          }
          observer.next(renderer.renderClosingTag(parentElement.elementName))
          observer.complete()
        }

        childrenState.forEach((state, index) => {
          const child$ = renderStream(state.node)
          const childSubscription = child$.subscribe(
            value => {
              // Forward directly when this child is live; buffer otherwise.
              if (streamingChild !== index) {
                state.queuedValues.push(value)
              } else {
                observer.next(value)
              }
            },
            error => observer.error(error),
            () => {
              state.complete = true
              // Only the live child advances the line; a child finishing
              // early just records its completion and waits its turn.
              if (streamingChild === index) handOffToNextChild()
            }
          )
          subscriptions.push(childSubscription)
        })
      }

      const nodeSubscription = jsx$.subscribe(
        nextNode => (node = nextNode),
        error => observer.error(error),
        renderFinalNode
      )
      subscriptions.push(nodeSubscription)

      // Teardown: release the source subscription and every child subscription.
      return () => {
        for (const subscription of subscriptions) {
          subscription.unsubscribe()
        }
      }
    })
  }

  return renderStream
}