import { isArray, Data, Mutation, replayWithLatest, pipeAtom, useGeneralDriver } from './libs/mobius-utils'

/**
 * Build a doubly linked graph out of a flat list of items that reference
 * their predecessor by id.
 *
 * Every distinct id gets one graph node of the shape
 * `{ prev, next, [idFieldName]: id, node }`. `node` is the item itself, or a
 * flat array of items when several items share the same id.
 *
 * Items may arrive in any order: an item whose predecessor has not been seen
 * yet is parked in a wait list and linked once the predecessor shows up.
 *
 * @param {Object} param
 * @param {Array<Object>} param.material - Items to link.
 * @param {string} param.idFieldName - Name of the field holding an item's id.
 * @param {string} param.prevIdFieldName - Name of the field holding the id of the preceding item.
 * @returns {Object} Graph nodes keyed by id.
 */
const generateGraph = ({ material, idFieldName, prevIdFieldName }) => {
  const graph = {}
  // predecessor id -> id of the node that is waiting for that predecessor
  const waitList = {}
  // Graph nodes whose `node` has already been converted into a list of items.
  // Without this bookkeeping a third item sharing an id would produce
  // `[[first, second], third]` instead of a flat list.
  const multiItemNodes = new Set()

  material.forEach(item => {
    const id = item[idFieldName]
    const prevId = item[prevIdFieldName]

    const existing = graph[id]
    if (!existing) {
      graph[id] = { prev: null, next: null, [idFieldName]: id, node: item }
    } else if (multiItemNodes.has(existing)) {
      existing.node.push(item)
    } else {
      existing.node = [existing.node, item]
      multiItemNodes.add(existing)
    }

    const currentNode = graph[id]

    if (graph[prevId]) {
      currentNode.prev = graph[prevId]
      graph[prevId].next = currentNode
    } else {
      // Predecessor not seen yet: remember who is waiting for it.
      waitList[prevId] = id
    }

    if (waitList[id]) {
      // A previously seen node was waiting for this one as its predecessor.
      const waitNode = graph[waitList[id]]
      waitNode.prev = currentNode
      currentNode.next = waitNode
      delete waitList[id]
    }
  })

  return graph
}

/**
 * Find the first graph node that has no successor.
 *
 * @param {Object} graph - Graph as returned by `generateGraph`.
 * @returns {Object|undefined} The first node without `next`, or `undefined` when there is none.
 */
const findTailNode = graph => Object.values(graph).find(graphNode => !graphNode.next)

/**
 * Container around a list of updates, with helpers to query the newest batch
 * and the newest update of a record.
 */
export class UpdateContainer {
  /**
   * @param {Array} [updates=[]] - Initial updates.
   * @param {Object} [options={}] - Container options; `isEmpty` defaults to `false`.
   * @throws {TypeError} When `updates` is not an array.
   */
  constructor (updates = [], options = {}) {
    if (!isArray(updates)) {
      throw (new TypeError('"updates" is expected to be type of "Array".'))
    }
    const { isEmpty = false } = options
    this._updates = updates
    this._options = { ...options, isEmpty }
  }

  /** Whether the container is still flagged as empty (i.e. may be filled). */
  get isEmpty () { return this._options.isEmpty }
  /** The container's options object. */
  get options () { return this._options }
  /** The underlying updates array (not a copy). */
  get value () { return this._updates }

  /** Create a container holding `updates`. */
  static of (updates, options = {}) {
    return new UpdateContainer(updates, options)
  }

  /** Create a container with no updates that is flagged as empty. */
  static empty (options = {}) {
    return new UpdateContainer([], { ...options, isEmpty: true })
  }

  /**
   * Put the initial updates into a container that is flagged as empty.
   *
   * @param {Array} updates - Updates to store.
   * @throws {TypeError} When `updates` is missing or is not an array.
   * @throws {Error} When the container is not flagged as empty.
   */
  fill (updates) {
    if (!updates) {
      throw (new TypeError('"updates" is required.'))
    }
    // Same contract as the constructor: every query method relies on an array.
    if (!isArray(updates)) {
      throw (new TypeError('"updates" is expected to be type of "Array".'))
    }
    if (!this._options.isEmpty) {
      throw (new Error('Can not fill updates to a non-empty updateContainer.'))
    }
    this._updates = updates
    this._options.isEmpty = false
  }

  /** Serialize the updates to a JSON string. */
  print () {
    return JSON.stringify(this._updates)
  }

  /** Take a snapshot of the updates; currently the same JSON string as `print()`. */
  capture () {
    return this.print()
  }

  /**
   * Replace the updates of THIS container with a snapshot taken by `capture()`.
   *
   * @param {string} snapshot - JSON string as returned by `capture()`.
   * @throws {SyntaxError} When `snapshot` is not valid JSON (raised by `JSON.parse`).
   * @throws {TypeError} When the parsed snapshot is not an array.
   */
  restore (snapshot) {
    const updates = JSON.parse(snapshot)
    if (!isArray(updates)) {
      throw (new TypeError('"updates" is expected to be type of "Array".'))
    }
    this._updates = updates
  }

  /**
   * Create a NEW container from `fn(updates)`; this container is left untouched.
   *
   * @param {Function} fn - Receives the current updates, returns the new updates array.
   * @returns {UpdateContainer}
   */
  map (fn) {
    return new UpdateContainer(fn(this._updates), this._options)
  }

  /**
   * Link the updates into a graph by `batchId` / `lastBatchId`.
   * Works on a JSON deep copy, so graph nodes do not alias the stored updates.
   *
   * @returns {Object} Graph nodes keyed by batch id.
   */
  generateBatchGraph () {
    const updates = JSON.parse(JSON.stringify(this._updates))
    return generateGraph({ material: updates, idFieldName: 'batchId', prevIdFieldName: 'lastBatchId' })
  }

  /**
   * @returns {Object|Array|null} The update (or list of updates) of the first batch
   *   without a successor, or `null` when there is none.
   */
  getLatestBatch () {
    const graphNode = findTailNode(this.generateBatchGraph())
    return graphNode ? graphNode.node : null
  }

  /**
   * @returns {*} The id of the first batch without a successor, or `null` when there is none.
   */
  getLatestBatchId () {
    const graphNode = findTailNode(this.generateBatchGraph())
    return graphNode ? graphNode.batchId : null
  }

  // TODO: not implemented yet, returns undefined.
  getLatestUpdate () { }

  // TODO: not implemented yet, returns undefined.
  getBatchBeforeBatch () { }

  /**
   * @param {*} recordId
   * @returns {Array} Updates whose `detail.recordId` strictly equals `recordId`.
   */
  getUpdatesByRecordId (recordId) {
    // Updates without a `detail` object can not belong to any record; skip
    // them instead of throwing on the property access.
    return this._updates.filter(update => Boolean(update.detail) && update.detail.recordId === recordId)
  }

  /**
   * @param {*} recordId
   * @returns {Object|Array|null} The update of the record that has no successor in the
   *   `updateId` / `lastUpdateId` chain, or `null` when the record has no updates.
   */
  getLatestUpdateByRecordId (recordId) {
    const updates = this.getUpdatesByRecordId(recordId)
    const updateGraph = generateGraph({ material: updates, idFieldName: 'updateId', prevIdFieldName: 'lastUpdateId' })
    const graphNode = findTailNode(updateGraph)
    return graphNode ? graphNode.node : null
  }

  // TODO: not implemented yet, returns undefined.
  getLatestUpdateIdByRecordId (recordId) { }

  // TODO: not implemented yet, returns undefined.
  sortUpdate () { }

  /**
   * Append a single update.
   * @returns {boolean} Always `true`.
   */
  addUpdate (update) {
    this._updates.push(update)
    return true
  }

  /**
   * Append several updates.
   * @returns {boolean} Always `true`.
   */
  addUpdates (updates) {
    this._updates.push(...updates)
    return true
  }
}

/**
 * Pre-process a collect request. Currently returns the request unchanged.
 */
const parseCollect = (updateContainer, collectRequest) => {
  return collectRequest
}

/**
 * Resolve a collect request against the container.
 *
 * Reads `batchId`, `payload.batch` and `payload.updates` from the request;
 * each entry of `payload.updates` provides `updateId` and `recordId`.
 *
 * Result shape:
 * {
 *   batchId,
 *   payload: {
 *     batch: { batchId, lastBatchId },
 *     updates: { [updateId]: { updateId, lastUpdateId, recordId }, ... }
 *   }
 * }
 */
const executeCollect = (updateContainer, collectRequest) => {
  const { batchId, payload: { batch, updates } } = collectRequest

  const lastBatchId = updateContainer.getLatestBatchId()

  // batchId and batch carry the same value, but they mean different things
  const collectResult = {
    batchId: batchId,
    payload: {
      batch: { batchId: batch, lastBatchId: lastBatchId },
      updates: Object.values(updates).reduce((acc, cur) => {
        const { updateId, recordId } = cur
        const latestUpdate = updateContainer.getLatestUpdateByRecordId(recordId)
        const lastUpdateId = latestUpdate ? latestUpdate.updateId : null
        acc[updateId] = { updateId, lastUpdateId, recordId }
        return acc
      }, {})
    }
  }

  console.log('[UpdateContainer] collectResult', collectResult)

  return collectResult
}

/**
 * Add every item of `updateItems.payload` to the container.
 *
 * @returns {Object} `{ batchId, payload }` where `payload` maps each update id
 *   to the return value of `addUpdate` for that item.
 */
const acceptUpdateItems = (updateContainer, updateItems) => {
  const { batchId } = updateItems

  const acceptResult = {
    batchId: batchId,
    payload: Object.entries(updateItems.payload).reduce((acc, cur) => {
      const [updateId, item] = cur
      acc[updateId] = updateContainer.addUpdate(item)
      return acc
    }, {})
  }

  console.log('[UpdateContainer] acceptResult', acceptResult)

  return acceptResult
}

/**
 * Driver that owns a single UpdateContainer and exposes it through atoms.
 *
 * inputs:  updates, process, collectRequest, updateItems
 * outputs: executeStatus, updateRollback, collectResult, acceptResult, updates
 *
 * NOTE(review): `options` is currently unused and `updateRollback` is never
 * mutated inside this driver.
 */
export const updateContainerDriver = (options = {}) => {
  const updateContainer = UpdateContainer.empty()

  const updatesInD = Data.empty()
  const executeStatusRD = replayWithLatest(1, Data.empty())

  const processInD = Data.empty()
  const collectRequestInD = Data.empty()
  const updateItemsInD = Data.empty()

  const updateRollbackD = Data.empty()
  const collectResultD = Data.empty()
  const acceptResultD = Data.empty()

  const updatesRD = replayWithLatest(1, Data.empty())

  // Incoming updates fill the (initially empty) container and are forwarded
  // to the `updates` output.
  pipeAtom(updatesInD, Mutation.ofLiftLeft((updates) => {
    updateContainer.fill(updates)
    return updateContainer.value
  }), updatesRD)

  // Holds the snapshot taken on 'start' until it is consumed by 'rollback'.
  const store = {}

  // After receiving the scheduler's run status, handle it accordingly
  processInD.subscribeValue(processStatus => {
    const { status } = processStatus
    console.log('[UpdateContainerDriver] receive status', status)
    if (status === 'start') {
      store.captured = updateContainer.capture()
      executeStatusRD.mutate(() => ({ status }))
    }
    if (status === 'rollback') {
      // Restore in place: `map` would only build a new container and leave
      // the one used by this driver untouched.
      updateContainer.restore(store.captured)
      delete store.captured
      console.log('[UpdateContainerDriver] rollback updateContainer instance')
      executeStatusRD.mutate(() => ({ status }))
    }
    if (status === 'end') {
      updatesRD.mutate(() => updateContainer.value)
      executeStatusRD.mutate(() => ({ status }))
    }
  })

  // After receiving a CollectRequest, broadcast a CollectResult
  collectRequestInD.subscribeValue(collectRequest => {
    const parsedCollect = parseCollect(updateContainer, collectRequest)
    const collectResult = executeCollect(updateContainer, parsedCollect)
    collectResultD.mutate(() => collectResult)
  })

  // After receiving UpdateItems, accept them and broadcast an AcceptResult
  updateItemsInD.subscribeValue(updateItems => {
    const acceptResult = acceptUpdateItems(updateContainer, updateItems)
    acceptResultD.mutate(() => acceptResult)
  })

  return {
    inputs: {
      updates: updatesInD,
      process: processInD,
      collectRequest: collectRequestInD,
      updateItems: updateItemsInD
    },
    outputs: {
      executeStatus: executeStatusRD,
      updateRollback: updateRollbackD,
      collectResult: collectResultD,
      acceptResult: acceptResultD,
      updates: updatesRD
    }
  }
}

export const useUpdateContainerDriver = useGeneralDriver(updateContainerDriver)