Parallel.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. // Parallel.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || MOBILE
  25. using System;
  26. using System.Collections.Generic;
  27. using System.Collections.Concurrent;
  28. using System.Threading;
  29. using System.Runtime.InteropServices;
  30. namespace System.Threading.Tasks
  31. {
  32. public static class Parallel
  33. {
  34. #if MOONLIGHT || MOBILE
  35. static readonly bool sixtyfour = IntPtr.Size == 8;
  36. #else
  37. static readonly bool sixtyfour = Environment.Is64BitProcess;
  38. #endif
  39. internal static int GetBestWorkerNumber ()
  40. {
  41. return GetBestWorkerNumber (TaskScheduler.Current);
  42. }
  43. internal static int GetBestWorkerNumber (TaskScheduler scheduler)
  44. {
  45. return scheduler.MaximumConcurrencyLevel;
  46. }
  47. static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
  48. {
  49. int num = GetBestWorkerNumber(options.TaskScheduler);
  50. if (options != null && options.MaxDegreeOfParallelism != -1)
  51. num = options.MaxDegreeOfParallelism;
  52. // Integer range that each task process
  53. if ((step = (to - from) / num) < 5) {
  54. step = 5;
  55. num = (to - from) / 5;
  56. if (num < 1)
  57. num = 1;
  58. }
  59. return num;
  60. }
  61. static void HandleExceptions (IEnumerable<Task> tasks)
  62. {
  63. HandleExceptions (tasks, null);
  64. }
  65. static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
  66. {
  67. List<Exception> exs = new List<Exception> ();
  68. foreach (Task t in tasks) {
  69. if (t.Exception != null)
  70. exs.Add (t.Exception);
  71. }
  72. if (exs.Count > 0) {
  73. if (infos != null)
  74. infos.IsExceptional = true;
  75. throw new AggregateException (exs);
  76. }
  77. }
  78. static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
  79. {
  80. TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
  81. for (int i = 0; i < count; i++) {
  82. if (options == null)
  83. tasks [i] = Task.Factory.StartNew (action, creation);
  84. else
  85. tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
  86. }
  87. }
  88. #region For
  89. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
  90. {
  91. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  92. }
  93. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
  94. {
  95. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  96. }
  97. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
  98. {
  99. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  100. }
  101. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
  102. {
  103. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  104. }
  105. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  106. int toExclusive,
  107. Func<TLocal> localInit,
  108. Func<int, ParallelLoopState, TLocal, TLocal> body,
  109. Action<TLocal> localFinally)
  110. {
  111. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  112. }
  113. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  114. int toExclusive,
  115. ParallelOptions parallelOptions,
  116. Func<TLocal> localInit,
  117. Func<int, ParallelLoopState, TLocal, TLocal> body,
  118. Action<TLocal> localFinally)
  119. {
  120. if (body == null)
  121. throw new ArgumentNullException ("body");
  122. if (localInit == null)
  123. throw new ArgumentNullException ("localInit");
  124. if (localFinally == null)
  125. throw new ArgumentNullException ("localFinally");
  126. if (parallelOptions == null)
  127. throw new ArgumentNullException ("options");
  128. if (fromInclusive >= toExclusive)
  129. return new ParallelLoopResult (null, true);
  130. // Number of task toExclusive be launched (normally == Env.ProcessorCount)
  131. int step;
  132. int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
  133. Task[] tasks = new Task [num];
  134. StealRange[] ranges = new StealRange[num];
  135. for (int i = 0; i < num; i++)
  136. ranges[i] = new StealRange (fromInclusive, i, step);
  137. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  138. int currentIndex = -1;
  139. Action workerMethod = delegate {
  140. int localWorker = Interlocked.Increment (ref currentIndex);
  141. StealRange range = ranges[localWorker];
  142. int index = range.V64.Actual;
  143. int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
  144. TLocal local = localInit ();
  145. ParallelLoopState state = new ParallelLoopState (infos);
  146. CancellationToken token = parallelOptions.CancellationToken;
  147. try {
  148. for (int i = index; i < stopIndex;) {
  149. if (infos.IsStopped)
  150. return;
  151. token.ThrowIfCancellationRequested ();
  152. if (i >= stopIndex - range.V64.Stolen)
  153. break;
  154. if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
  155. return;
  156. state.CurrentIteration = i;
  157. local = body (i, state, local);
  158. if (i + 1 >= stopIndex - range.V64.Stolen)
  159. break;
  160. range.V64.Actual = ++i;
  161. }
  162. // Try toExclusive steal fromInclusive our right neighbor (cyclic)
  163. int len = num + localWorker;
  164. for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
  165. int extWorker = sIndex % num;
  166. range = ranges[extWorker];
  167. stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
  168. int stolen = -1;
  169. do {
  170. do {
  171. long old;
  172. StealValue64 val = new StealValue64 ();
  173. old = sixtyfour ? range.V64.Value : Interlocked.CompareExchange (ref range.V64.Value, 0, 0);
  174. val.Value = old;
  175. if (val.Actual >= stopIndex - val.Stolen - 2)
  176. goto next;
  177. stolen = (val.Stolen += 1);
  178. if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
  179. break;
  180. } while (true);
  181. stolen = stopIndex - stolen;
  182. if (stolen > range.V64.Actual)
  183. local = body (stolen, state, local);
  184. else
  185. break;
  186. } while (true);
  187. next:
  188. continue;
  189. }
  190. } finally {
  191. localFinally (local);
  192. }
  193. };
  194. InitTasks (tasks, num, workerMethod, parallelOptions);
  195. try {
  196. Task.WaitAll (tasks);
  197. } catch {
  198. HandleExceptions (tasks, infos);
  199. }
  200. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  201. }
  202. [StructLayout(LayoutKind.Explicit)]
  203. struct StealValue64 {
  204. [FieldOffset(0)]
  205. public long Value;
  206. [FieldOffset(0)]
  207. public int Actual;
  208. [FieldOffset(4)]
  209. public int Stolen;
  210. }
  211. class StealRange
  212. {
  213. public StealValue64 V64 = new StealValue64 ();
  214. public StealRange (int fromInclusive, int i, int step)
  215. {
  216. V64.Actual = fromInclusive + i * step;
  217. }
  218. }
  219. #endregion
  220. #region For (long)
  221. [MonoTODO]
  222. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
  223. {
  224. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  225. }
  226. [MonoTODO]
  227. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
  228. {
  229. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  230. }
  231. [MonoTODO]
  232. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
  233. {
  234. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  235. }
  236. [MonoTODO]
  237. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
  238. {
  239. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  240. }
  241. [MonoTODO]
  242. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  243. long toExclusive,
  244. Func<TLocal> localInit,
  245. Func<long, ParallelLoopState, TLocal, TLocal> body,
  246. Action<TLocal> localFinally)
  247. {
  248. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  249. }
  250. [MonoTODO ("See how this can be refactored with the above For implementation")]
  251. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  252. long toExclusive,
  253. ParallelOptions parallelOptions,
  254. Func<TLocal> localInit,
  255. Func<long, ParallelLoopState, TLocal, TLocal> body,
  256. Action<TLocal> localFinally)
  257. {
  258. if (body == null)
  259. throw new ArgumentNullException ("body");
  260. if (localInit == null)
  261. throw new ArgumentNullException ("localInit");
  262. if (localFinally == null)
  263. throw new ArgumentNullException ("localFinally");
  264. if (parallelOptions == null)
  265. throw new ArgumentNullException ("options");
  266. if (fromInclusive >= toExclusive)
  267. return new ParallelLoopResult (null, true);
  268. throw new NotImplementedException ();
  269. }
  270. #endregion
  271. #region Foreach
  272. static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
  273. Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
  274. Action<TLocal> destruct)
  275. {
  276. if (enumerable == null)
  277. throw new ArgumentNullException ("source");
  278. if (options == null)
  279. throw new ArgumentNullException ("options");
  280. if (action == null)
  281. throw new ArgumentNullException ("action");
  282. if (init == null)
  283. throw new ArgumentNullException ("init");
  284. if (destruct == null)
  285. throw new ArgumentNullException ("destruct");
  286. int num = Math.Min (GetBestWorkerNumber (),
  287. options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
  288. Task[] tasks = new Task[num];
  289. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  290. SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
  291. const int bagCount = 5;
  292. IList<IEnumerator<TSource>> slices = enumerable (num);
  293. int sliceIndex = -1;
  294. Action workerMethod = delegate {
  295. IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
  296. TLocal local = init ();
  297. ParallelLoopState state = new ParallelLoopState (infos);
  298. int workIndex = bag.GetNextIndex ();
  299. CancellationToken token = options.CancellationToken;
  300. try {
  301. bool cont = true;
  302. TSource element;
  303. while (cont) {
  304. if (infos.IsStopped || infos.IsBroken.Value)
  305. return;
  306. token.ThrowIfCancellationRequested ();
  307. for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
  308. bag.Add (workIndex, slice.Current);
  309. }
  310. for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
  311. if (infos.IsStopped)
  312. return;
  313. token.ThrowIfCancellationRequested ();
  314. local = action (element, state, local);
  315. }
  316. }
  317. while (bag.TrySteal (workIndex, out element)) {
  318. token.ThrowIfCancellationRequested ();
  319. local = action (element, state, local);
  320. if (infos.IsStopped || infos.IsBroken.Value)
  321. return;
  322. }
  323. } finally {
  324. destruct (local);
  325. }
  326. };
  327. InitTasks (tasks, num, workerMethod, options);
  328. try {
  329. Task.WaitAll (tasks);
  330. } catch {
  331. HandleExceptions (tasks, infos);
  332. }
  333. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  334. }
  335. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
  336. {
  337. if (source == null)
  338. throw new ArgumentNullException ("source");
  339. if (body == null)
  340. throw new ArgumentNullException ("body");
  341. return ForEach<TSource, object> (Partitioner.Create (source),
  342. ParallelOptions.Default,
  343. () => null,
  344. (e, s, l) => { body (e); return null; },
  345. _ => {});
  346. }
  347. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
  348. {
  349. if (source == null)
  350. throw new ArgumentNullException ("source");
  351. if (body == null)
  352. throw new ArgumentNullException ("body");
  353. return ForEach<TSource, object> (Partitioner.Create (source),
  354. ParallelOptions.Default,
  355. () => null,
  356. (e, s, l) => { body (e, s); return null; },
  357. _ => {});
  358. }
  359. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  360. Action<TSource, ParallelLoopState, long> body)
  361. {
  362. if (source == null)
  363. throw new ArgumentNullException ("source");
  364. if (body == null)
  365. throw new ArgumentNullException ("body");
  366. return ForEach<TSource, object> (Partitioner.Create (source),
  367. ParallelOptions.Default,
  368. () => null,
  369. (e, s, l) => { body (e, s, -1); return null; },
  370. _ => {});
  371. }
  372. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  373. Action<TSource, ParallelLoopState> body)
  374. {
  375. if (body == null)
  376. throw new ArgumentNullException ("body");
  377. return ForEach<TSource, object> (source,
  378. ParallelOptions.Default,
  379. () => null,
  380. (e, s, l) => { body (e, s); return null; },
  381. _ => {});
  382. }
  383. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
  384. Action<TSource, ParallelLoopState, long> body)
  385. {
  386. if (body == null)
  387. throw new ArgumentNullException ("body");
  388. return ForEach<TSource, object> (source,
  389. ParallelOptions.Default,
  390. () => null,
  391. (e, s, i, l) => { body (e, s, i); return null; },
  392. _ => {});
  393. }
  394. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  395. Action<TSource> body)
  396. {
  397. if (body == null)
  398. throw new ArgumentNullException ("body");
  399. return ForEach<TSource, object> (source,
  400. ParallelOptions.Default,
  401. () => null,
  402. (e, s, l) => { body (e); return null; },
  403. _ => {});
  404. }
  405. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  406. ParallelOptions parallelOptions,
  407. Action<TSource> body)
  408. {
  409. if (source == null)
  410. throw new ArgumentNullException ("source");
  411. if (body == null)
  412. throw new ArgumentNullException ("body");
  413. return ForEach<TSource, object> (Partitioner.Create (source),
  414. parallelOptions,
  415. () => null,
  416. (e, s, l) => { body (e); return null; },
  417. _ => {});
  418. }
  419. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  420. Action<TSource, ParallelLoopState> body)
  421. {
  422. if (source == null)
  423. throw new ArgumentNullException ("source");
  424. if (body == null)
  425. throw new ArgumentNullException ("body");
  426. return ForEach<TSource, object> (Partitioner.Create (source),
  427. parallelOptions,
  428. () => null,
  429. (e, s, l) => { body (e, s); return null; },
  430. _ => {});
  431. }
  432. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  433. Action<TSource, ParallelLoopState, long> body)
  434. {
  435. if (source == null)
  436. throw new ArgumentNullException ("source");
  437. if (body == null)
  438. throw new ArgumentNullException ("body");
  439. return ForEach<TSource, object> (Partitioner.Create (source),
  440. parallelOptions,
  441. () => null,
  442. (e, s, i, l) => { body (e, s, i); return null; },
  443. _ => {});
  444. }
  445. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  446. Action<TSource, ParallelLoopState, long> body)
  447. {
  448. if (body == null)
  449. throw new ArgumentNullException ("body");
  450. return ForEach<TSource, object> (source,
  451. parallelOptions,
  452. () => null,
  453. (e, s, i, l) => { body (e, s, i); return null; },
  454. _ => {});
  455. }
  456. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  457. Action<TSource> body)
  458. {
  459. if (body == null)
  460. throw new ArgumentNullException ("body");
  461. return ForEach<TSource, object> (source,
  462. parallelOptions,
  463. () => null,
  464. (e, s, l) => { body (e); return null; },
  465. _ => {});
  466. }
  467. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  468. Action<TSource, ParallelLoopState> body)
  469. {
  470. return ForEach<TSource, object> (source,
  471. parallelOptions,
  472. () => null,
  473. (e, s, l) => { body (e, s); return null; },
  474. _ => {});
  475. }
  476. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  477. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  478. Action<TLocal> localFinally)
  479. {
  480. if (source == null)
  481. throw new ArgumentNullException ("source");
  482. return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
  483. ParallelOptions.Default,
  484. localInit,
  485. body,
  486. localFinally);
  487. }
  488. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  489. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  490. Action<TLocal> localFinally)
  491. {
  492. return ForEach<TSource, TLocal> (Partitioner.Create (source),
  493. ParallelOptions.Default,
  494. localInit,
  495. body,
  496. localFinally);
  497. }
  498. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
  499. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  500. Action<TLocal> localFinally)
  501. {
  502. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  503. }
  504. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
  505. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  506. Action<TLocal> localFinally)
  507. {
  508. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  509. }
  510. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  511. Func<TLocal> localInit,
  512. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  513. Action<TLocal> localFinally)
  514. {
  515. if (source == null)
  516. throw new ArgumentNullException ("source");
  517. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  518. }
  519. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  520. Func<TLocal> localInit,
  521. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  522. Action<TLocal> localFinally)
  523. {
  524. if (source == null)
  525. throw new ArgumentNullException ("source");
  526. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  527. }
  528. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  529. Func<TLocal> localInit,
  530. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  531. Action<TLocal> localFinally)
  532. {
  533. if (source == null)
  534. throw new ArgumentNullException ("source");
  535. if (body == null)
  536. throw new ArgumentNullException ("body");
  537. return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
  538. }
  539. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  540. Func<TLocal> localInit,
  541. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  542. Action<TLocal> localFinally)
  543. {
  544. if (source == null)
  545. throw new ArgumentNullException ("source");
  546. if (body == null)
  547. throw new ArgumentNullException ("body");
  548. return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
  549. parallelOptions,
  550. localInit,
  551. (e, s, l) => body (e.Value, s, e.Key, l),
  552. localFinally);
  553. }
  554. #endregion
  555. #region Invoke
  556. public static void Invoke (params Action[] actions)
  557. {
  558. if (actions == null)
  559. throw new ArgumentNullException ("actions");
  560. Invoke (actions, (Action a) => Task.Factory.StartNew (a));
  561. }
  562. public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
  563. {
  564. if (parallelOptions == null)
  565. throw new ArgumentNullException ("parallelOptions");
  566. if (actions == null)
  567. throw new ArgumentNullException ("actions");
  568. Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
  569. }
  570. static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
  571. {
  572. if (actions.Length == 0)
  573. throw new ArgumentException ("actions is empty");
  574. // Execute it directly
  575. if (actions.Length == 1 && actions[0] != null)
  576. actions[0] ();
  577. bool shouldThrow = false;
  578. Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
  579. if (a == null) {
  580. shouldThrow = true;
  581. return null;
  582. }
  583. return taskCreator (a);
  584. });
  585. if (shouldThrow)
  586. throw new ArgumentException ("One action in actions is null", "actions");
  587. try {
  588. Task.WaitAll (ts);
  589. } catch {
  590. HandleExceptions (ts);
  591. }
  592. }
  593. #endregion
  594. #region SpawnBestNumber, used by PLinq
  595. internal static Task[] SpawnBestNumber (Action action, Action callback)
  596. {
  597. return SpawnBestNumber (action, -1, callback);
  598. }
  599. internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
  600. {
  601. return SpawnBestNumber (action, dop, false, callback);
  602. }
  603. internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
  604. {
  605. // Get the optimum amount of worker to create
  606. int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
  607. // Initialize worker
  608. CountdownEvent evt = new CountdownEvent (num);
  609. Task[] tasks = new Task [num];
  610. for (int i = 0; i < num; i++) {
  611. tasks [i] = Task.Factory.StartNew (() => {
  612. action ();
  613. evt.Signal ();
  614. if (callback != null && evt.IsSet)
  615. callback ();
  616. });
  617. }
  618. // If explicitely told, wait for all workers to complete
  619. // and thus let main thread participate in the processing
  620. if (wait)
  621. Task.WaitAll (tasks);
  622. return tasks;
  623. }
  624. #endregion
  625. }
  626. }
  627. #endif