SystemFiberScheduler.cs
Go to the documentation of this file.
1 /*
2 
3 Author: Aaron Oneal, http://aarononeal.info
4 
5 Copyright (c) 2012 Spicy Pixel, http://spicypixel.com
6 
7 Permission is hereby granted, free of charge, to any person obtaining
8 a copy of this software and associated documentation files (the
9 "Software"), to deal in the Software without restriction, including
10 without limitation the rights to use, copy, modify, merge, publish,
11 distribute, sublicense, and/or sell copies of the Software, and to
12 permit persons to whom the Software is furnished to do so, subject to
13 the following conditions:
14 
15 The above copyright notice and this permission notice shall be
16 included in all copies or substantial portions of the Software.
17 
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
22 LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 
26 */
27 using System;
28 using System.Collections;
31 using System.Threading;
32 
33 namespace SpicyPixel.Threading
34 {
47  {
/// <summary>
/// Starts a scheduler on a new thread with no initial fiber, no cancellation
/// token, and an update rate of 0.
/// </summary>
/// <returns>The scheduler returned by the fiber/token/rate overload.</returns>
public static SystemFiberScheduler StartNew ()
{
    Fiber noInitialFiber = null;
    return StartNew (noInitialFiber, CancellationToken.None, 0f);
}
58 
/// <summary>
/// Starts a scheduler on a new thread with no initial fiber.
/// </summary>
/// <param name="token">Cancellation token forwarded to the fiber/token/rate overload.</param>
/// <param name="updatesPerSecond">Update rate forwarded to the fiber/token/rate overload.</param>
/// <returns>The scheduler returned by the fiber/token/rate overload.</returns>
public static SystemFiberScheduler StartNew (CancellationToken token, float updatesPerSecond = 0f)
{
    Fiber noInitialFiber = null;
    return StartNew (noInitialFiber, token, updatesPerSecond);
}
75 
/// <summary>
/// Starts a scheduler on a new thread for the given fiber, with no
/// cancellation token and an update rate of 0.
/// </summary>
/// <param name="fiber">Fiber forwarded to the fiber/token/rate overload.</param>
/// <returns>The scheduler returned by the fiber/token/rate overload.</returns>
public static SystemFiberScheduler StartNew (Fiber fiber)
{
    CancellationToken noCancellation = CancellationToken.None;
    return StartNew (fiber, noCancellation, 0f);
}
89 
/// <summary>
/// Starts a new thread, captures that thread's current scheduler, starts it
/// running, and returns the scheduler to the calling thread.
/// </summary>
/// <param name="fiber">Fiber passed through to <c>Run</c>; may be <c>null</c>.</param>
/// <param name="token">Cancellation token passed through to <c>Run</c>.</param>
/// <param name="updatesPerSecond">Update rate passed through to <c>Run</c>.</param>
/// <returns>
/// The scheduler obtained from <c>FiberScheduler.Current</c> on the new thread.
/// </returns>
public static SystemFiberScheduler StartNew (Fiber fiber, CancellationToken token, float updatesPerSecond = 0f)
{
    SystemFiberScheduler backgroundScheduler = null;

    // The event is only needed for the hand-off below, so its OS handle is
    // released deterministically instead of being left to the finalizer.
    using (var schedulerReady = new ManualResetEvent (false)) {
        // Setup a thread to run the scheduler
        var thread = new Thread (() => {
            backgroundScheduler = (SystemFiberScheduler)FiberScheduler.Current;

            // Unblock the caller before entering the blocking run loop
            schedulerReady.Set ();
            FiberScheduler.Current.Run (fiber, token, updatesPerSecond);
        });
        thread.Start ();

        // Block until the new thread has published its scheduler
        schedulerReady.WaitOne ();
    }

    return backgroundScheduler;
}
121 
/// <summary>
/// QueueFiber only executes a fiber inline while its nesting depth is below this value.
/// </summary>
private const int MaxStackDepth = 10;

/// <summary>
/// Per-thread count of QueueFiber calls currently in progress.
/// </summary>
/// <remarks>
/// Deliberately has no inline initializer: an initializer on a [ThreadStatic]
/// field only runs on the thread that executes the static constructor, while
/// every thread already starts with the default value of 0.
/// </remarks>
[ThreadStatic]
private static int stackDepthQueueFiber;

/// <summary>
/// Fibers queued to execute during the next update.
/// </summary>
private readonly ConcurrentQueue<Fiber> executingFibers = new ConcurrentQueue<Fiber> ();


// A future queue may include waitingFibers (e.g. waiting on a signal or timeout)

/// <summary>
/// Time passed to the most recent Update call; reset to 0 when Run exits.
/// </summary>
private float currentTime;

/// <summary>
/// Signaled while Run is not executing; Run resets it on entry to its loop and
/// sets it again on exit. Dispose waits on it.
/// </summary>
private readonly ManualResetEvent runWaitHandle = new ManualResetEvent (true);

/// <summary>
/// Signaled by Dispose so that a running Run loop exits.
/// </summary>
private readonly ManualResetEvent disposeWaitHandle = new ManualResetEvent (false);

/// <summary>
/// Signaled when a fiber is queued for execution so a waiting Run loop
/// re-evaluates its next update time.
/// </summary>
private readonly AutoResetEvent schedulerEventWaitHandle = new AutoResetEvent (false);
166 
/// <summary>
/// Gets the run wait handle.
/// </summary>
protected ManualResetEvent RunWaitHandle {
    get {
        return this.runWaitHandle;
    }
}
179 
/// <summary>
/// Gets the dispose wait handle, exposed as a plain <c>WaitHandle</c>.
/// </summary>
protected WaitHandle DisposeWaitHandle {
    get {
        return this.disposeWaitHandle;
    }
}
192 
/// <summary>
/// Gets a wait handle which can be used to wait for a scheduler event to occur.
/// </summary>
protected WaitHandle SchedulerEventWaitHandle {
    get {
        return this.schedulerEventWaitHandle;
    }
}
213 
/// <summary>
/// Gets the number of entries currently in the executing fiber queue.
/// </summary>
protected int ExecutingFiberCount {
    get {
        return this.executingFibers.Count;
    }
}
226 
/// <summary>
/// Gets the number of entries currently in the sleeping fiber queue.
/// </summary>
protected int SleepingFiberCount {
    get {
        return this.sleepingFibers.Count;
    }
}
239 
/// <summary>
/// Gets a value indicating whether the scheduler is running. Run sets this to
/// <c>true</c> on entry and back to <c>false</c> when it exits.
/// </summary>
public bool IsRunning { get; private set; }
247 
252  {
253  IsRunning = false;
254  }
255 
/// <summary>
/// Queues the fiber for execution on the scheduler, executing it inline when
/// inlining is allowed, the caller is on the scheduler thread, and the
/// nesting depth is below <c>MaxStackDepth</c>.
/// </summary>
/// <param name="fiber">The fiber to queue.</param>
protected sealed override void QueueFiber (Fiber fiber)
{
    // Note: a check that rejected queueing on a non-running scheduler
    // (throwing InvalidOperationException) is disabled here.

    // Completion callbacks can request a queue after the fiber has already
    // executed and changed state. Non-runnable fibers would be skipped by
    // the queue anyway, so bail out early.
    bool isRunnable = fiber.Status == FiberStatus.WaitingToRun || fiber.Status == FiberStatus.Running;
    if (!isRunnable)
        return;

    // Track nesting, since inline execution can re-enter this method
    Interlocked.Increment (ref stackDepthQueueFiber);

    try {
        // Inline as much as possible.
        //
        // Note: Some applications may want to always queue to control
        // performance more strictly by the run loop.
        bool executeInline = AllowInlining
            && SchedulerThread == Thread.CurrentThread
            && stackDepthQueueFiber < MaxStackDepth;

        if (executeInline)
            ExecuteFiberInternal (fiber);
        else
            QueueFiberForExecution (fiber);
    } finally {
        // Leaving this nesting level
        Interlocked.Decrement (ref stackDepthQueueFiber);
    }
}
301 
/// <summary>
/// Adds the fiber to the execution queue and signals the scheduler event handle.
/// </summary>
/// <param name="fiber">The fiber to enqueue.</param>
private void QueueFiberForExecution (Fiber fiber)
{
    this.executingFibers.Enqueue (fiber);

    // A newly queued fiber means the next update time must be
    // re-evaluated, so wake anything waiting on the scheduler event.
    this.schedulerEventWaitHandle.Set ();
}
316 
/// <summary>
/// Adds the fiber to the sleeping queue together with the time at which it should wake.
/// </summary>
/// <param name="fiber">The fiber to put to sleep.</param>
/// <param name="timeToWake">Wake time, compared against the scheduler's current time.</param>
private void QueueFiberForSleep (Fiber fiber, float timeToWake)
{
    // No scheduler event is signaled here: fibers are only queued for sleep
    // when they return a yield instruction, which happens while executing
    // on the main thread, so the scheduler is never in a wait loop that
    // would need waking.
    sleepingFibers.Enqueue (new Tuple<Fiber, float> (fiber, timeToWake));
}
336 
/// <summary>
/// Update the scheduler: records the time, then processes the executing
/// queue followed by the sleeping queue.
/// </summary>
/// <param name="time">The time marker stored as the scheduler's current time.</param>
protected void Update (float time)
{
    this.currentTime = time;

    // Order matters: executing fibers run before sleepers are woken
    this.UpdateExecutingFibers ();
    this.UpdateSleepingFibers ();
}
357 
/// <summary>
/// Executes each fiber that was in the execution queue when this update began.
/// </summary>
private void UpdateExecutingFibers ()
{
    // A null entry marks the end of this update's work; anything queued
    // behind it is left for the next update.
    executingFibers.Enqueue (null);

    for (;;) {
        Fiber next;

        // Stop when the queue is drained or the end marker is reached
        if (!executingFibers.TryDequeue (out next) || next == null)
            return;

        // Completed fibers are simply dropped
        if (!next.IsCompleted)
            ExecuteFiberInternal (next);
    }
}
379 
/// <summary>
/// Wakes sleeping fibers whose wake time has arrived or whose cancellation
/// was requested; all others are put back in the sleeping queue.
/// </summary>
private void UpdateSleepingFibers ()
{
    // A null entry marks the end of this update's work
    sleepingFibers.Enqueue (null);

    for (;;) {
        Tuple<Fiber, float> entry;

        // Stop when the queue is drained or the end marker is reached
        if (!sleepingFibers.TryDequeue (out entry) || entry == null)
            return;

        Fiber sleeper = entry.Item1;

        // Completed fibers are simply dropped
        if (sleeper.IsCompleted)
            continue;

        bool isDue = entry.Item2 <= currentTime || sleeper.cancelToken.IsCancellationRequested;
        if (!isDue) {
            // Not time yet; keep it asleep
            sleepingFibers.Enqueue (entry);
            continue;
        }

        ExecuteFiberInternal (entry.Item1);
    }
}
407 
/// <summary>
/// Gets the time of the first fiber wake up.
/// </summary>
/// <param name="fiberWakeTime">
/// Receives the earliest wake time among the sleeping fibers, 0 if any
/// sleeping fiber has cancellation requested (wake immediately), or -1 when
/// no sleeping fiber was found.
/// </param>
/// <returns>
/// <c>true</c> if a wake time was produced; <c>false</c> if there were no
/// sleeping fibers to inspect.
/// </returns>
protected bool GetNextFiberWakeTime (out float fiberWakeTime)
{
    fiberWakeTime = -1f;

    // Nothing to do if there are no sleeping fibers
    if (sleepingFibers.Count == 0)
        return false;

    // Tracked with a flag rather than by comparing against the -1 sentinel,
    // so a genuine wake time of -1 is not mistaken for "not set yet".
    bool found = false;

    // Find the earliest wake time
    foreach (var entry in sleepingFibers) {
        // The queue can transiently hold marker entries (a null tuple or a
        // tuple with a null fiber); they carry no wake time.
        if (entry == null || entry.Item1 == null)
            continue;

        if (entry.Item1.cancelToken.IsCancellationRequested) {
            fiberWakeTime = 0f; // wake immediately
            return true;
        }

        if (!found || entry.Item2 < fiberWakeTime) {
            fiberWakeTime = entry.Item2;
            found = true;
        }
    }

    return found;
}
445 
/// <summary>
/// Coroutine that yields the given fiber and, once resumed, cancels the
/// supplied cancellation source.
/// </summary>
/// <param name="waitOnFiber">The fiber yielded to the scheduler.</param>
/// <param name="cancelSource">The source cancelled after the coroutine resumes.</param>
/// <returns>The coroutine enumerator.</returns>
private IEnumerator CancelWhenComplete (Fiber waitOnFiber, CancellationTokenSource cancelSource)
{
    // Hand the fiber to the scheduler as the yield instruction
    yield return waitOnFiber;

    // Resumed: raise the cancellation
    cancelSource.Cancel ();
}
451 
/// <summary>
/// Run the blocking scheduler loop and perform the specified number of updates per second.
/// </summary>
/// <remarks>
/// The loop exits when <paramref name="token"/> is cancelled, when the main
/// fiber's completion watcher raises its cancellation, or when the scheduler
/// is disposed. On exit both fiber queues are cleared, the current time is
/// reset, and the run wait handle is set.
/// </remarks>
/// <param name="fiber">
/// Optional main fiber. When not <c>null</c> it is started on this scheduler
/// if its status is <c>Created</c>, and the loop ends once it completes.
/// </param>
/// <param name="token">Cancellation token that stops the loop.</param>
/// <param name="updatesPerSecond">
/// Maximum number of updates per second; 0 means updates are not throttled.
/// </param>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
/// <exception cref="InvalidOperationException">Run is already executing.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="updatesPerSecond"/> is negative.</exception>
public override void Run (Fiber fiber, CancellationToken token, float updatesPerSecond)
{
    long startTicks; // start of update time (marker)
    long endTicks; // end of update time (marker)
    long sleepTicks; // time to sleep (duration)
    long wakeTicks; // ticks before wake (duration)
    int wakeMilliseconds; // ms before wake (duration)
    float wakeMarkerInSeconds; // time of wake in seconds (marker)
    var mainFiberCompleteCancelSource = new CancellationTokenSource ();

    if (isDisposed)
        throw new ObjectDisposedException (GetType ().FullName);

    // Run is not re-entrant, make sure we're not running
    if (!runWaitHandle.WaitOne (0))
        throw new InvalidOperationException ("Run is already executing and is not re-entrant");

    // Verify arguments
    if (updatesPerSecond < 0f)
        throw new ArgumentOutOfRangeException ("updatesPerSecond", "The updatesPerSecond must be >= 0");

    // Min time between updates (duration). This is the update period,
    // i.e. ticks-per-second divided by the rate, not multiplied by it.
    long frequencyTicks = GetUpdateIntervalTicks (updatesPerSecond);

    // Get a base time for better precision. UTC is used so daylight-saving
    // or time-zone changes cannot shift the scheduler clock.
    long baseTicks = DateTime.UtcNow.Ticks;

    // Build wait list to terminate execution
    var waitHandleList = new List<WaitHandle> (4);
    waitHandleList.Add (schedulerEventWaitHandle);
    waitHandleList.Add (disposeWaitHandle);

    if (token.CanBeCanceled)
        waitHandleList.Add (token.WaitHandle);

    try {
        IsRunning = true;

        if (fiber != null) {
            // Add the main fiber to the wait list so when it completes
            // the wait handle falls through.
            waitHandleList.Add (mainFiberCompleteCancelSource.Token.WaitHandle);

            // Start the main fiber if it isn't running yet
            if (fiber.Status == FiberStatus.Created)
                fiber.Start (this);

            // Start a fiber that waits on the main fiber to complete.
            // When it does, it raises a cancellation.
            Fiber.Factory.StartNew (CancelWhenComplete (fiber, mainFiberCompleteCancelSource), this);
        }

        // FIXME: The scheduler event handle was once removed from the list
        // used for the frequency sleep. It is kept in because it seems to be
        // needed to wake sleeping fibers when abort is called, so a single
        // array serves both waits.
        WaitHandle [] waitHandles = waitHandleList.ToArray ();

        runWaitHandle.Reset ();

        while (true) {
            // Throw if faulted
            if (fiber != null && fiber.IsFaulted) {
                throw fiber.Exception;
            }

            // Stop executing if cancelled
            if (IsRunStopRequested (token, mainFiberCompleteCancelSource))
                return;

            // Snap current time
            startTicks = DateTime.UtcNow.Ticks;

            // Update using this time marker (and convert ticks to s)
            Update ((float)((double)(startTicks - baseTicks) / (double)TimeSpan.TicksPerSecond));

            // Only sleep to next frequency cycle if one was specified
            if (updatesPerSecond > 0f) {
                // Snap end time
                endTicks = DateTime.UtcNow.Ticks;

                // Sleep at least until next update
                sleepTicks = frequencyTicks - (endTicks - startTicks);
                if (sleepTicks > 0) {
                    WaitHandle.WaitAny (waitHandles, TicksToWaitMilliseconds (sleepTicks));

                    // Stop executing if cancelled
                    if (IsRunStopRequested (token, mainFiberCompleteCancelSource))
                        return;
                }
            }

            // Now keep sleeping until it's time to update
            while (ExecutingFiberCount == 0 && (fiber == null || !fiber.IsFaulted)) {
                // Assume we wait forever (e.g. until a signal)
                wakeMilliseconds = -1;

                // If there are sleeping fibers, then set a wake time
                if (GetNextFiberWakeTime (out wakeMarkerInSeconds)) {
                    wakeTicks = baseTicks;
                    wakeTicks += (long)((double)wakeMarkerInSeconds * (double)TimeSpan.TicksPerSecond);
                    wakeTicks -= DateTime.UtcNow.Ticks;

                    // If there was a waiting fiber and it's already past time to awake then stop waiting
                    if (wakeTicks <= 0)
                        break;

                    wakeMilliseconds = TicksToWaitMilliseconds (wakeTicks);
                }

                // There was no waiting fiber and we will wait for another signal,
                // or there was a waiting fiber and we wait until that time.
                WaitHandle.WaitAny (waitHandles, wakeMilliseconds);

                // Stop executing if cancelled
                if (IsRunStopRequested (token, mainFiberCompleteCancelSource))
                    return;
            }
        }
    } finally {
        // Clear queues
        Fiber dequeuedFiber;
        while (executingFibers.TryDequeue (out dequeuedFiber)) { }

        Tuple<Fiber, float> dequeuedSleepingFiber;
        while (sleepingFibers.TryDequeue (out dequeuedSleepingFiber)) { }

        // Reset time
        currentTime = 0f;

        // Not running
        IsRunning = false;

        // Set for dispose
        runWaitHandle.Set ();
    }
}

/// <summary>
/// Returns whether the run loop should exit: the caller's token is cancelled,
/// the main-fiber completion source is cancelled, or dispose was signaled.
/// </summary>
private bool IsRunStopRequested (CancellationToken token, CancellationTokenSource mainFiberCompleteCancelSource)
{
    return (token.CanBeCanceled && token.IsCancellationRequested)
        || mainFiberCompleteCancelSource.IsCancellationRequested
        || disposeWaitHandle.WaitOne (0);
}

/// <summary>
/// Converts an update rate into the minimum number of ticks between updates,
/// capped so the interval always fits a millisecond wait timeout.
/// </summary>
private static long GetUpdateIntervalTicks (float updatesPerSecond)
{
    // Zero (or a non-positive / NaN rate) means no throttling
    if (!(updatesPerSecond > 0f))
        return 0L;

    double maxTicks = (double)int.MaxValue * (double)TimeSpan.TicksPerMillisecond;
    double intervalTicks = (double)TimeSpan.TicksPerSecond / (double)updatesPerSecond;
    return (long)Math.Min (intervalTicks, maxTicks);
}

/// <summary>
/// Converts a positive tick duration to whole milliseconds, clamped to
/// <c>int.MaxValue</c> so the value is always a valid wait timeout.
/// </summary>
private static int TicksToWaitMilliseconds (long ticks)
{
    long milliseconds = ticks / TimeSpan.TicksPerMillisecond;
    return milliseconds > int.MaxValue ? int.MaxValue : (int)milliseconds;
}
616 
/// <summary>
/// Executes the fiber, handles the instruction it yields, and keeps going
/// with any fiber that an instruction says to switch to.
/// </summary>
/// <param name="fiber">The first fiber to execute.</param>
private void ExecuteFiberInternal (Fiber fiber)
{
    for (Fiber active = fiber; active != null;) {
        // Run the fiber until it ends or yields
        FiberInstruction instruction = ExecuteFiber (active);

        // A completed fiber needs no further handling
        if (active.IsCompleted)
            return;

        // Let special instructions place the fiber in their own queue
        // and/or nominate a fiber to switch to.
        bool handledByInstruction;
        Fiber successor;
        OnFiberInstruction (active, instruction, out handledByInstruction, out successor);

        // A fiber that is still alive and was not queued by an instruction
        // goes to the execution queue to run in the next Update(). It is not
        // executed inline because it is done for this update. Completion is
        // re-checked because an instruction may have caused an inline
        // execution that altered state.
        if (!(handledByInstruction || active.IsCompleted))
            QueueFiberForExecution (active);

        // Switch to the next fiber if an instruction said to do so
        active = successor;
    }
}
662 
/// <summary>
/// Handles the scheduler-specific yield instructions: waiting on another
/// fiber, sleeping for a number of seconds, and switching to another fiber.
/// </summary>
/// <param name="fiber">The fiber that yielded the instruction.</param>
/// <param name="instruction">The yielded instruction.</param>
/// <param name="fiberQueued">
/// Set to <c>true</c> when the instruction took responsibility for
/// re-queueing <paramref name="fiber"/>; otherwise <c>false</c>.
/// </param>
/// <param name="nextFiber">
/// Set to the fiber to execute next for a yield-to-fiber instruction;
/// otherwise <c>null</c>.
/// </param>
private void OnFiberInstruction (Fiber fiber, FiberInstruction instruction, out bool fiberQueued, out Fiber nextFiber)
{
    fiberQueued = false;
    nextFiber = null;

    var waitForFiber = instruction as YieldUntilComplete;
    if (waitForFiber != null) {
        // The once-only guard exists because this was going to handle
        // completions of fibers from other threads. Currently fibers must
        // belong to the same thread, which the instructions themselves enforce.
        //
        // FIXME: If we support multiple schedulers in the future this
        // callback could occur from another thread and therefore after
        // Dispose(). Would probably need a lock.
        //
        // NOTE: Faults and cancellations of the awaited fiber are not
        // propagated to the waiting fiber; it is simply re-queued.
        int resumed = 0;

        waitForFiber.Fiber.ContinueWith ((f) => {
            // Only the first completion callback resumes the waiter;
            // QueueFiber may execute it inline.
            if (Interlocked.CompareExchange (ref resumed, 1, 0) == 0)
                QueueFiber (fiber);
        });

        fiberQueued = true;
        return;
    }

    var sleepRequest = instruction as YieldForSeconds;
    if (sleepRequest != null) {
        float wakeTime = currentTime + sleepRequest.Seconds;
        QueueFiberForSleep (fiber, wakeTime);
        fiberQueued = true;
        return;
    }

    var switchRequest = instruction as YieldToFiber;
    if (switchRequest != null) {
        // The target runs next, so it must not also remain queued.
        // fiberQueued stays false so the yielding fiber is re-queued.
        Fiber target = switchRequest.Fiber;
        RemoveFiberFromQueues (target);
        nextFiber = target;
    }
}
725 
/// <summary>
/// Removes the fiber from the execution queue or, if it was not found there,
/// from the sleeping queue. Each queue is rotated once past a temporary
/// marker so the relative order of the remaining entries is kept.
/// </summary>
/// <param name="fiber">The fiber to remove.</param>
private void RemoveFiberFromQueues (Fiber fiber)
{
    if (executingFibers.Count > 0) {
        // Temporary marker that ends the rotation
        Fiber sentinel = new Fiber (() => { });
        executingFibers.Enqueue (sentinel);

        bool removed = false;
        Fiber candidate;
        while (executingFibers.TryDequeue (out candidate)) {
            if (candidate == sentinel)
                break;

            // Drop the target; everything else goes back on the queue
            if (candidate == fiber)
                removed = true;
            else
                executingFibers.Enqueue (candidate);
        }

        // No need to look in the sleeping queue
        if (removed)
            return;
    }

    if (sleepingFibers.Count > 0) {
        // Temporary marker that ends the rotation
        var sentinelEntry = new Tuple<Fiber, float> (null, 0f);
        sleepingFibers.Enqueue (sentinelEntry);

        Tuple<Fiber, float> entry;
        while (sleepingFibers.TryDequeue (out entry)) {
            if (entry == sentinelEntry)
                break;

            // Null entries (update markers) are preserved, not matched
            bool isTarget = entry != null && entry.Item1 == fiber;
            if (!isTarget)
                sleepingFibers.Enqueue (entry);
        }
    }
}
775 
776  #region IDisposable implementation
777 
/// <summary>
/// Tracks whether Dispose has already run.
/// </summary>
private bool isDisposed = false;

/// <summary>
/// Dispose the scheduler. When called with <paramref name="disposing"/> set,
/// signals the run loop to stop and blocks until Run is no longer executing.
/// </summary>
/// <param name="disposing">
/// <c>true</c> to also release managed state; forwarded to the base class.
/// </param>
protected override void Dispose (bool disposing)
{
    // Only the first call does any work
    if (!isDisposed) {
        if (disposing) {
            // Ask the run loop to exit, then wait until it has
            disposeWaitHandle.Set ();
            runWaitHandle.WaitOne ();
        }

        // No unmanaged state of our own to free

        isDisposed = true;

        base.Dispose (disposing);
    }
}
813  #endregion
814  }
815 }
816 
Schedules fibers for execution.
Fiber StartNew(IEnumerator coroutine)
Start executing a new fiber using the default scheduler on the thread.
sealed override void QueueFiber(Fiber fiber)
Queues the fiber for execution on the scheduler.
This class is the system default implementation of a FiberScheduler and is capable of scheduling and ...
void Run(Fiber fiber)
Run the blocking scheduler loop and perform the specified number of updates per second.
static FiberFactory Factory
Gets the default factory for creating fibers.
Definition: Fiber.cs:119
A Fiber is a lightweight means of scheduling work that enables multiple units of processing to execut...
static SystemFiberScheduler StartNew(Fiber fiber, CancellationToken token, float updatesPerSecond=0f)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread.
override void Dispose(bool disposing)
Dispose the scheduler.
void Start()
Start executing the fiber using the default scheduler on the thread.
Definition: Fiber.cs:454
Thread SchedulerThread
Gets the thread the scheduler is running on.
static FiberScheduler Current
Gets the default fiber scheduler for the thread.
int ExecutingFiberCount
Gets the executing fiber count.
static SystemFiberScheduler StartNew(Fiber fiber)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread.
override void Run(Fiber fiber, CancellationToken token, float updatesPerSecond)
Run the blocking scheduler loop and perform the specified number of updates per second.
WaitHandle SchedulerEventWaitHandle
Gets a wait handle which can be used to wait for a scheduler event to occur.
FiberStatus Status
Gets or sets the state of the fiber.
Definition: Fiber.cs:237
Fiber(IEnumerator coroutine)
Initializes a new instance of the SpicyPixel.Threading.Fiber class.
Definition: Fiber.cs:323
FiberInstruction ExecuteFiber(Fiber fiber)
Executes the fiber until it ends or yields.
void Update(float time)
Update the scheduler which causes all queued tasks to run for a cycle.
bool IsFaulted
Gets a value indicating whether this instance is faulted.
Definition: Fiber.cs:216
int SleepingFiberCount
Gets the sleeping fiber count.
SystemFiberScheduler()
Initializes a new instance of the SpicyPixel.Threading.FiberScheduler class.
bool AllowInlining
Gets or sets a value indicating whether this SpicyPixel.Threading.FiberScheduler allows inlining.
FiberStatus
Represents the current state of a fiber.
Definition: FiberStatus.cs:34
ManualResetEvent RunWaitHandle
Gets the run wait handle.
bool IsCompleted
Gets a value indicating whether this instance is completed.
Definition: Fiber.cs:203
bool GetNextFiberWakeTime(out float fiberWakeTime)
Gets the time of the first fiber wake up.
WaitHandle DisposeWaitHandle
Gets the dispose wait handle.
static SystemFiberScheduler StartNew(CancellationToken token, float updatesPerSecond=0f)
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread.
Exception Exception
Gets the exception that led to the Faulted state.
Definition: Fiber.cs:226
static SystemFiberScheduler StartNew()
Starts a new thread, creates a scheduler, starts it running, and returns it to the calling thread.