//---------------------------------------------------------------------------- // Copyright (C) 2013-2015 Fabrice HARROUET (ENIB) // // Permission to use, copy, modify, distribute and sell this software // and its documentation for any purpose is hereby granted without fee, // provided that the above copyright notice appear in all copies and // that both that copyright notice and this permission notice appear // in supporting documentation. // The author makes no representations about the suitability of this // software for any purpose. // It is provided "as is" without express or implied warranty. //---------------------------------------------------------------------------- using System; using System.Threading; namespace ParallelRunner { public abstract class Runner : IDisposable { public static Runner create(int threadCount, bool spinning) { return spinning ? (Runner)new SpinningRunner(threadCount) : (Runner)new LockedRunner(threadCount); } public int ThreadCount { get { return _threadCount; } } public delegate void Task(int threadIndex); public void run(Task task) { _task=task; // make this task available to threads _startNewStep(); _task(0); // current thread participates as thread 0 _waitForStepEnd(); } protected Runner(int threadCount) { _threadCount=threadCount; _threads=new Thread[_threadCount-1]; // current thread will participate _mustQuit=false; _currentStep=0; _inCurrentStep=0; for(int i=0;i<_threads.Length;++i) { _threads[i]=new Thread(_runThread); _threads[i].IsBackground=true; _threads[i].Start(); } } protected void _runThread() { int threadIndex=Array.IndexOf(_threads,Thread.CurrentThread)+1; int localStep=0; while(_waitForNewStepOrTermination(ref localStep)) { _task(threadIndex); _signalStepEnd(); } } // MSDN Basic Dispose Pattern public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if(disposing&&!_mustQuit) { _signalTermination(); for(int i=0;i<_threads.Length;++i) { _threads[i].Join(); _threads[i]=null; } _task=null; } } protected abstract void _startNewStep(); protected abstract void _waitForStepEnd(); protected abstract bool // go on _waitForNewStepOrTermination(ref int localStep); protected abstract void _signalStepEnd(); protected abstract void _signalTermination(); protected int _threadCount; protected Thread[] _threads; protected volatile bool _mustQuit; protected volatile int _currentStep; protected volatile int _inCurrentStep; protected volatile Task _task; } public class LockedRunner : Runner { public LockedRunner(int threadCount) : base(threadCount) { } protected override void _startNewStep() { lock(this) { _inCurrentStep=_threadCount-1; // current thread will participate ++_currentStep; Monitor.PulseAll(this); } } protected override void _waitForStepEnd() { lock(this) { while(_inCurrentStep!=0) { Monitor.Wait(this); } } } protected override bool // go on _waitForNewStepOrTermination(ref int localStep) { lock(this) { while((_currentStep==localStep)&&!_mustQuit) { Monitor.Wait(this); } if(_mustQuit) return false; localStep=_currentStep; } return true; } protected override void _signalStepEnd() { lock(this) { --_inCurrentStep; Monitor.PulseAll(this); } } protected override void _signalTermination() { lock(this) { _mustQuit=true; Monitor.PulseAll(this); } } } public class SpinningRunner : Runner { public SpinningRunner(int threadCount) : base(threadCount) { } protected override void _startNewStep() { _inCurrentStep=_threadCount-1; // current thread will participate ++_currentStep; } protected override void _waitForStepEnd() { while(_inCurrentStep!=0); } protected override bool // go on _waitForNewStepOrTermination(ref int localStep) { while((_currentStep==localStep)&&!_mustQuit); if(_mustQuit) return false; localStep=_currentStep; return true; } protected override void _signalStepEnd() { // "A volatile field references will not be treated as volatile." #pragma warning disable 0420 Interlocked.Decrement(ref _inCurrentStep); #pragma warning restore 0420 } protected override void _signalTermination() { _mustQuit=true; } } } // namespace ParallelRunner //----------------------------------------------------------------------------