/*
* AwaitingQueue.cs
*
* This class is copied directly into the react-native-sqlite-storage repo
* for now. When this gets checked into react-native-windows, we can remove
* this file and start depending on RNW's copy.
*
* Serializes the work of all Tasks that are added to its queue. Awaits the
* Task returned by the current work item before moving on to the next work
* item.
*
* This class is not thread-safe. All methods should be called from the same thread
* or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
* the same thread or LimitedConcurrencyActionQueue.
*
* Motivation:
* When you `await` a Task, you have to consider all of the things that could have
* changed by the time your continuation runs. For example:
*
* class Recorder
* {
* private MediaCapture _captureMedia;
*
* public async Task StartRecording()
* {
* _captureMedia = new MediaCapture();
* await _captureMedia.InitializeAsync();
* // Lots of things could have changed by the time we get here.
* // For example, maybe `_captureMedia` is null!
* await _captureMedia.StartRecordToStreamAsync(...);
* }
*
* public async Task StopRecording()
* {
* // This code can run while `StartRecording` is in the middle
* // of running.
*
* if (_captureMedia != null)
* {
* // Code to clean up _captureMedia...
* _captureMedia = null;
* }
* }
* }
*
* Alternatively, you can use `AwaitingQueue` to serialize async work that
* interacts with each other to prevent any interleavings. Example:
*
* class Recorder
* {
* private AwaitingQueue _awaitingQueue = new AwaitingQueue();
* private MediaCapture _captureMedia;
*
* public async Task StartRecording()
* {
* _awaitingQueue.RunOrQueue(async () =>
* {
* // This code won't run until all of the other Tasks
* // that were added to the `_awaitingQueue` before us
* // have already completed.
*
* _captureMedia = new MediaCapture();
* await _captureMedia.InitializeAsync(captureInitSettings);
* // We can think of `StartRecording` as being atomic which
* // means we don't have to worry about anything we care about
* // changing by the time we get here. For example, `_captureMedia`
* // is guaranteed to be non-null by design.
* await _captureMedia.StartRecordToStreamAsync(...);
* });
* }
*
* public async Task StopRecording()
* {
* _awaitingQueue.RunOrQueue(() =>
* {
* // This code won't run until all of the other Tasks
* // that were added to the `_awaitingQueue` before us
* // have already completed. This means this code can't
* // run while `StartRecording` is in the middle of running.
*
* if (_captureMedia != null)
* {
* // Code to clean up _captureMedia...
* _captureMedia = null;
* }
* });
* }
* }
*/
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Threading;
using System.Threading.Tasks;
namespace Org.PGSQLite.SQLitePlugin
{
    /// <summary>
    /// Serializes the work of all Tasks that are added to its queue. Awaits the
    /// Task returned by the current work item before moving on to the next work
    /// item.
    /// </summary>
    /// <remarks>
    /// This class is not thread-safe. All methods should be called from the same thread
    /// or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
    /// the same thread or LimitedConcurrencyActionQueue.
    /// </remarks>
    /// <typeparam name="T">The type of value yielded by each Task in the queue.</typeparam>
    public class AwaitingQueue<T>
    {
        private const string _tag = nameof(AwaitingQueue<T>);

        /// <summary>
        /// Bundles a queued work item with the completion source that reports its
        /// outcome to the caller and the token that can cancel it before it starts.
        /// </summary>
        private class WorkItemInfo
        {
            public readonly Func<Task<T>> WorkItem;
            public readonly TaskCompletionSource<T> CompletionSource;
            public readonly CancellationToken CancellationToken;

            public WorkItemInfo(Func<Task<T>> workItem, TaskCompletionSource<T> completionSource, CancellationToken cancellationToken)
            {
                WorkItem = workItem;
                CompletionSource = completionSource;
                CancellationToken = cancellationToken;
            }
        }

        // True while a work loop is draining the queue; guards against starting a second loop.
        private bool _running = false;
        private readonly Queue<WorkItemInfo> _workQueue = new Queue<WorkItemInfo>();

        /// <summary>
        /// Drains the queue one work item at a time, unless a loop is already running.
        /// Raises <see cref="QueueEmptied"/> once the queue has been drained.
        /// </summary>
        /// <remarks>
        /// This is a fire-and-forget `async void` method: the outcome of every work item
        /// (result, exception, or cancellation) is reported through that item's
        /// completion source rather than through a returned Task.
        /// </remarks>
        private async void StartWorkLoopIfNeeded()
        {
            if (_running)
            {
                return;
            }

            _running = true;
            try
            {
                while (_workQueue.Count > 0)
                {
                    var workItemInfo = _workQueue.Dequeue();

                    // Skip items whose token was canceled before they got a chance to start.
                    if (workItemInfo.CancellationToken.IsCancellationRequested)
                    {
                        workItemInfo.CompletionSource.SetCanceled();
                        continue;
                    }

                    try
                    {
                        var result = await workItemInfo.WorkItem();
                        workItemInfo.CompletionSource.SetResult(result);
                    }
                    catch (Exception ex)
                    {
                        // Forward the failure to the caller's Task instead of breaking the loop.
                        workItemInfo.CompletionSource.SetException(ex);
                    }
                }
            }
            finally
            {
                // Reset the flag BEFORE the event is raised and never touch it afterwards.
                // A QueueEmptied handler may enqueue new work, which starts a nested loop
                // that sets _running back to true; resetting the flag after the handler
                // returns would clear it while that nested loop is still awaiting an item
                // and would allow a second, concurrent loop to start.
                _running = false;
            }

            QueueEmptied?.Invoke(this, EventArgs.Empty);
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task resolves to the result or exception from `workItem`.
        /// </returns>
        public Task<T> RunOrQueue(Func<Task<T>> workItem)
        {
            return RunOrQueue(workItem, CancellationToken.None);
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <param name="cancellationToken">
        /// The cancellation token associated with the work item. The work item will
        /// be skipped if the cancellation token is canceled before the work item begins.
        /// </param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task resolves to the result or exception from `workItem`. If the
        /// cancellation token is canceled before `workItem` begins, Task is canceled.
        /// </returns>
        public Task<T> RunOrQueue(Func<Task<T>> workItem, CancellationToken cancellationToken)
        {
            var completionSource = new TaskCompletionSource<T>();
            _workQueue.Enqueue(new WorkItemInfo(workItem, completionSource, cancellationToken));
            StartWorkLoopIfNeeded();
            return completionSource.Task;
        }

        /// <summary>
        /// Indicates that the AwaitingQueue has finished executing all of its
        /// currently scheduled work items.
        /// </summary>
        /// <remarks>
        /// Fires on the thread or LimitedConcurrencyActionQueue of the code that
        /// has been consuming the AwaitingQueue.
        /// </remarks>
        public event EventHandler QueueEmptied;
    }

    /// <summary>
    /// Serializes the work of all Tasks that are added to its queue. Awaits the
    /// Task returned by the current work item before moving on to the next work
    /// item.
    /// </summary>
    /// <remarks>
    /// This class is not thread-safe. All methods should be called from the same thread
    /// or LimitedConcurrencyActionQueue. `await` must cause the continuation to run on
    /// the same thread or LimitedConcurrencyActionQueue.
    /// </remarks>
    public class AwaitingQueue
    {
        // Work items that yield no value are run through a queue of Unit results.
        private readonly AwaitingQueue<Unit> _awaitingQueue = new AwaitingQueue<Unit>();

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task throws any exceptions that `workItem` may have thrown.
        /// </returns>
        public Task RunOrQueue(Func<Task> workItem)
        {
            return RunOrQueue(workItem, CancellationToken.None);
        }

        /// <summary>
        /// Adds `workItem` to the queue. If the queue is currently empty and not
        /// executing any work items, executes `workItem` immediately and synchronously.
        /// </summary>
        /// <param name="workItem">The work item to add to the queue.</param>
        /// <param name="cancellationToken">
        /// The cancellation token associated with the work item. The work item will
        /// be skipped if the cancellation token is canceled before the work item begins.
        /// </param>
        /// <returns>
        /// A Task which completes when `workItem` finishes executing. The returned
        /// Task throws any exceptions that `workItem` may have thrown. If the
        /// cancellation token is canceled before `workItem` begins, Task is canceled.
        /// </returns>
        public Task RunOrQueue(Func<Task> workItem, CancellationToken cancellationToken)
        {
            return _awaitingQueue.RunOrQueue(async () =>
            {
                await workItem();
                return Unit.Default;
            }, cancellationToken);
        }

        /// <summary>
        /// Indicates that the AwaitingQueue has finished executing all of its
        /// currently scheduled work items.
        /// </summary>
        /// <remarks>
        /// Fires on the thread or LimitedConcurrencyActionQueue of the code that
        /// has been consuming the AwaitingQueue. Handlers are attached directly to the
        /// wrapped queue, so the `sender` they receive is that inner queue instance
        /// rather than this wrapper.
        /// </remarks>
        public event EventHandler QueueEmptied
        {
            add
            {
                _awaitingQueue.QueueEmptied += value;
            }
            remove
            {
                _awaitingQueue.QueueEmptied -= value;
            }
        }
    }
}