/*
 * AwaitingQueue.cs
 *
 * This class is copied directly into the react-native-sqlite-storage repo
 * for now. When this gets checked into react-native-windows, we can remove
 * this file and start depending on RNW's copy.
 *
 * Serializes the work of all Tasks that are added to its queue. Awaits the
 * Task returned by the current work item before moving on to the next work
 * item.
 *
 * This class is not thread-safe. All methods should be called from the same thread
 * or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
 * the same thread or LimitedConcurrencyActionQueue.
 *
 * Motivation:
 * When you `await` a Task, you have to consider all of the things that could have
 * changed by the time your continuation runs. For example:
 *
 *   class Recorder
 *   {
 *       private MediaCapture _captureMedia;
 *
 *       public async Task StartRecording()
 *       {
 *           _captureMedia = new MediaCapture();
 *           await _captureMedia.InitializeAsync();
 *           // Lots of things could have changed by the time we get here.
 *           // For example, maybe `_captureMedia` is null!
 *           await _captureMedia.StartRecordToStreamAsync(...);
 *       }
 *
 *       public async Task StopRecording()
 *       {
 *           // This code can run while `StartRecording` is in the middle
 *           // of running.
 *
 *           if (_captureMedia != null)
 *           {
 *               // Code to clean up _captureMedia...
 *               _captureMedia = null;
 *           }
 *       }
 *   }
 *
 * Alternatively, you can use `AwaitingQueue` to serialize async work that
 * interacts with each other to prevent any interleavings. Example:
 *
 *   class Recorder
 *   {
 *       private AwaitingQueue _awaitingQueue = new AwaitingQueue();
 *       private MediaCapture _captureMedia;
 *
 *       public Task StartRecording()
 *       {
 *           return _awaitingQueue.RunOrQueue(async () =>
 *           {
 *               // This code won't run until all of the other Tasks
 *               // that were added to the `_awaitingQueue` before us
 *               // have already completed.
 *
 *               _captureMedia = new MediaCapture();
 *               await _captureMedia.InitializeAsync(captureInitSettings);
 *               // We can think of `StartRecording` as being atomic which
 *               // means we don't have to worry about anything we care about
 *               // changing by the time we get here. For example, `_captureMedia`
 *               // is guaranteed to be non-null by design.
 *               await _captureMedia.StartRecordToStreamAsync(...);
 *           });
 *       }
 *
 *       public Task StopRecording()
 *       {
 *           return _awaitingQueue.RunOrQueue(() =>
 *           {
 *               // This code won't run until all of the other Tasks
 *               // that were added to the `_awaitingQueue` before us
 *               // have already completed. This means this code can't
 *               // run while `StartRecording` is in the middle of running.
 *
 *               if (_captureMedia != null)
 *               {
 *                   // Code to clean up _captureMedia...
 *                   _captureMedia = null;
 *               }
 *
 *               return Task.CompletedTask;
 *           });
 *       }
 *   }
 */

using System;
using System.Collections.Generic;
using System.Reactive;
using System.Threading;
using System.Threading.Tasks;

namespace Org.PGSQLite.SQLitePlugin
{
    /// <summary>
    /// Serializes the work of all Tasks that are added to its queue. Awaits the
    /// Task returned by the current work item before moving on to the next work
    /// item.
    /// </summary>
    /// <remarks>
    /// This class is not thread-safe. All methods should be called from the same thread
    /// or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
    /// the same thread or LimitedConcurrencyActionQueue.
    /// </remarks>
    /// <typeparam name="T">The type of value yielded by each Task in the queue.</typeparam>
    public class AwaitingQueue<T>
    {
        // Not referenced by the code below; kept as the tag for diagnostic logging.
        private const string _tag = nameof(AwaitingQueue<T>);

        /// <summary>
        /// Bundles a queued work item with the completion source that reports its
        /// outcome to the caller and the token that can cancel it before it starts.
        /// </summary>
        private class WorkItemInfo
        {
            public readonly Func<Task<T>> WorkItem;
            public readonly TaskCompletionSource<T> CompletionSource;
            public readonly CancellationToken CancellationToken;

            public WorkItemInfo(Func<Task<T>> workItem, TaskCompletionSource<T> completionSource, CancellationToken cancellationToken)
            {
                WorkItem = workItem;
                CompletionSource = completionSource;
                CancellationToken = cancellationToken;
            }
        }

        // True while a work loop is draining `_workQueue`; prevents a second loop
        // from starting when RunOrQueue is called while an item is in flight.
        private bool _running = false;

        private readonly Queue<WorkItemInfo> _workQueue = new Queue<WorkItemInfo>();

        /// <summary>
        /// Drains the queue one item at a time unless a loop is already running.
        /// Each item's result, exception, or cancellation is reported through its
        /// own completion source, then <see cref="QueueEmptied"/> is raised.
        /// </summary>
        /// <remarks>
        /// Deliberately `async void`: this is a fire-and-forget loop whose per-item
        /// outcomes are observed through the Tasks returned by RunOrQueue.
        /// </remarks>
        private async void StartWorkLoopIfNeeded()
        {
            if (_running)
            {
                return;
            }

            try
            {
                _running = true;

                while (_workQueue.Count > 0)
                {
                    var workItemInfo = _workQueue.Dequeue();

                    if (workItemInfo.CancellationToken.IsCancellationRequested)
                    {
                        // Canceled before it started: skip the work entirely.
                        workItemInfo.CompletionSource.SetCanceled();
                    }
                    else
                    {
                        try
                        {
                            var result = await workItemInfo.WorkItem();
                            workItemInfo.CompletionSource.SetResult(result);
                        }
                        catch (Exception ex)
                        {
                            // Route the failure to the caller's Task and keep
                            // processing the remaining items.
                            workItemInfo.CompletionSource.SetException(ex);
                        }
                    }
                }

                // Ensure _running is updated before firing the event so that a
                // handler which calls RunOrQueue starts a fresh work loop.
                _running = false;
                QueueEmptied?.Invoke(this, EventArgs.Empty);
            }
            finally
            {
                // Before exiting this method, ensure _running is updated.
                _running = false;
            }
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task resolves to the result or exception from `workItem`.
        /// </returns>
        public Task<T> RunOrQueue(Func<Task<T>> workItem)
        {
            return RunOrQueue(workItem, CancellationToken.None);
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <param name="cancellationToken">
        /// The cancellation token associated with the work item. The work item will
        /// be skipped if the cancellation token is canceled before the work item begins.
        /// </param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task resolves to the result or exception from `workItem`. If the
        /// cancellation token is canceled before `workItem` begins, Task is canceled.
        /// </returns>
        public Task<T> RunOrQueue(Func<Task<T>> workItem, CancellationToken cancellationToken)
        {
            TaskCompletionSource<T> completionSource = new TaskCompletionSource<T>();
            _workQueue.Enqueue(new WorkItemInfo(workItem, completionSource, cancellationToken));
            StartWorkLoopIfNeeded();
            return completionSource.Task;
        }

        /// <summary>
        /// Indicates that the AwaitingQueue has finished executing all of its
        /// currently scheduled work items.
        /// </summary>
        /// <remarks>
        /// Fires on the thread or LimitedConcurrencyActionQueue of the code that
        /// has been consuming the AwaitingQueue.
        /// </remarks>
        public event EventHandler QueueEmptied;
    }

    /// <summary>
    /// Serializes the work of all Tasks that are added to its queue. Awaits the
    /// Task returned by the current work item before moving on to the next work
    /// item.
    /// </summary>
    /// <remarks>
    /// This class is not thread-safe. All methods should be called from the same thread
    /// or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
    /// the same thread or LimitedConcurrencyActionQueue.
    /// </remarks>
    public class AwaitingQueue
    {
        // Result-less work is run through the generic queue with Unit as a
        // placeholder result type.
        private readonly AwaitingQueue<Unit> _awaitingQueue = new AwaitingQueue<Unit>();

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task throws any exceptions that `workItem` may have thrown.
        /// </returns>
        public Task RunOrQueue(Func<Task> workItem)
        {
            return RunOrQueue(workItem, CancellationToken.None);
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <param name="cancellationToken">
        /// The cancellation token associated with the work item. The work item will
        /// be skipped if the cancellation token is canceled before the work item begins.
        /// </param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task throws any exceptions that `workItem` may have thrown. If the
        /// cancellation token is canceled before `workItem` begins, Task is canceled.
        /// </returns>
        public Task RunOrQueue(Func<Task> workItem, CancellationToken cancellationToken)
        {
            return _awaitingQueue.RunOrQueue(async () =>
            {
                await workItem();
                return Unit.Default;
            }, cancellationToken);
        }

        /// <summary>
        /// Indicates that the AwaitingQueue has finished executing all of its
        /// currently scheduled work items.
        /// </summary>
        /// <remarks>
        /// Fires on the thread or LimitedConcurrencyActionQueue of the code that
        /// has been consuming the AwaitingQueue.
        /// </remarks>
        public event EventHandler QueueEmptied
        {
            add { _awaitingQueue.QueueEmptied += value; }
            remove { _awaitingQueue.QueueEmptied -= value; }
        }
    }
}