Parallel.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726
  1. // Parallel.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0
  25. using System;
  26. using System.Collections.Generic;
  27. using System.Collections.Concurrent;
  28. using System.Threading;
  29. namespace System.Threading.Tasks
  30. {
  31. public static class Parallel
  32. {
  33. internal static int GetBestWorkerNumber ()
  34. {
  35. return GetBestWorkerNumber (TaskScheduler.Current);
  36. }
  37. internal static int GetBestWorkerNumber (TaskScheduler scheduler)
  38. {
  39. return scheduler.MaximumConcurrencyLevel;
  40. }
  41. static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
  42. {
  43. int num = Math.Min (GetBestWorkerNumber (),
  44. options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
  45. // Integer range that each task process
  46. if ((step = (to - from) / num) < 5) {
  47. step = 5;
  48. num = (to - from) / 5;
  49. if (num < 1)
  50. num = 1;
  51. }
  52. return num;
  53. }
  54. static void HandleExceptions (IEnumerable<Task> tasks)
  55. {
  56. HandleExceptions (tasks, null);
  57. }
  58. static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
  59. {
  60. List<Exception> exs = new List<Exception> ();
  61. foreach (Task t in tasks) {
  62. if (t.Exception != null)
  63. exs.Add (t.Exception);
  64. }
  65. if (exs.Count > 0) {
  66. if (infos != null)
  67. infos.IsExceptional = true;
  68. throw new AggregateException (exs);
  69. }
  70. }
  71. static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
  72. {
  73. TaskCreationOptions creation = TaskCreationOptions.LongRunning;
  74. for (int i = 0; i < count; i++) {
  75. if (options == null)
  76. tasks [i] = Task.Factory.StartNew (action, creation);
  77. else
  78. tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
  79. }
  80. }
  81. #region For
  82. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
  83. {
  84. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  85. }
  86. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
  87. {
  88. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  89. }
  90. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
  91. {
  92. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  93. }
  94. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
  95. {
  96. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  97. }
  98. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  99. int toExclusive,
  100. Func<TLocal> localInit,
  101. Func<int, ParallelLoopState, TLocal, TLocal> body,
  102. Action<TLocal> localFinally)
  103. {
  104. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  105. }
  106. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  107. int toExclusive,
  108. ParallelOptions parallelOptions,
  109. Func<TLocal> localInit,
  110. Func<int, ParallelLoopState, TLocal, TLocal> body,
  111. Action<TLocal> localFinally)
  112. {
  113. if (body == null)
  114. throw new ArgumentNullException ("body");
  115. if (localInit == null)
  116. throw new ArgumentNullException ("localInit");
  117. if (localFinally == null)
  118. throw new ArgumentNullException ("localFinally");
  119. if (parallelOptions == null)
  120. throw new ArgumentNullException ("options");
  121. if (fromInclusive >= toExclusive)
  122. return new ParallelLoopResult (null, true);
  123. // Number of task toExclusive be launched (normally == Env.ProcessorCount)
  124. int step;
  125. int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
  126. Task[] tasks = new Task [num];
  127. StealRange[] ranges = new StealRange[num];
  128. for (int i = 0; i < num; i++)
  129. ranges[i] = new StealRange (fromInclusive, i, step);
  130. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  131. int currentIndex = -1;
  132. Action workerMethod = delegate {
  133. int localWorker = Interlocked.Increment (ref currentIndex);
  134. StealRange range = ranges[localWorker];
  135. int index = range.Actual;
  136. int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
  137. TLocal local = localInit ();
  138. ParallelLoopState state = new ParallelLoopState (infos);
  139. CancellationToken token = parallelOptions.CancellationToken;
  140. try {
  141. for (int i = index; i < stopIndex; range.Actual = ++i) {
  142. if (infos.IsStopped)
  143. return;
  144. token.ThrowIfCancellationRequested ();
  145. if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
  146. return;
  147. state.CurrentIteration = i;
  148. local = body (i, state, local);
  149. if (i >= stopIndex - range.Stolen)
  150. break;
  151. }
  152. // Try toExclusive steal fromInclusive our right neighbor (cyclic)
  153. int len = num + localWorker;
  154. for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
  155. int extWorker = sIndex % num;
  156. range = ranges[extWorker];
  157. stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
  158. int stolen;
  159. do {
  160. stolen = range.Stolen;
  161. if (stopIndex - stolen > range.Actual)
  162. goto next;
  163. } while (Interlocked.CompareExchange (ref range.Stolen, stolen + 1, stolen) != stolen);
  164. stolen = stopIndex - stolen - 1;
  165. if (stolen > range.Actual)
  166. local = body (stolen, state, local);
  167. next:
  168. continue;
  169. }
  170. } finally {
  171. localFinally (local);
  172. }
  173. };
  174. InitTasks (tasks, num, workerMethod, parallelOptions);
  175. try {
  176. Task.WaitAll (tasks);
  177. } catch {
  178. HandleExceptions (tasks, infos);
  179. }
  180. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  181. }
  182. class StealRange
  183. {
  184. public int Stolen;
  185. public int Actual;
  186. public StealRange (int fromInclusive, int i, int step)
  187. {
  188. Actual = fromInclusive + i * step;
  189. }
  190. }
  191. #endregion
  192. #region For (long)
  193. [MonoTODO]
  194. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
  195. {
  196. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  197. }
  198. [MonoTODO]
  199. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
  200. {
  201. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  202. }
  203. [MonoTODO]
  204. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
  205. {
  206. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  207. }
  208. [MonoTODO]
  209. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
  210. {
  211. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  212. }
  213. [MonoTODO]
  214. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  215. long toExclusive,
  216. Func<TLocal> localInit,
  217. Func<long, ParallelLoopState, TLocal, TLocal> body,
  218. Action<TLocal> localFinally)
  219. {
  220. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  221. }
  222. [MonoTODO ("See how this can be refactored with the above For implementation")]
  223. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  224. long toExclusive,
  225. ParallelOptions parallelOptions,
  226. Func<TLocal> localInit,
  227. Func<long, ParallelLoopState, TLocal, TLocal> body,
  228. Action<TLocal> localFinally)
  229. {
  230. if (body == null)
  231. throw new ArgumentNullException ("body");
  232. if (localInit == null)
  233. throw new ArgumentNullException ("localInit");
  234. if (localFinally == null)
  235. throw new ArgumentNullException ("localFinally");
  236. if (parallelOptions == null)
  237. throw new ArgumentNullException ("options");
  238. if (fromInclusive >= toExclusive)
  239. return new ParallelLoopResult (null, true);
  240. throw new NotImplementedException ();
  241. }
  242. #endregion
  243. #region Foreach
  244. static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
  245. Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
  246. Action<TLocal> destruct)
  247. {
  248. if (enumerable == null)
  249. throw new ArgumentNullException ("source");
  250. if (options == null)
  251. throw new ArgumentNullException ("options");
  252. if (action == null)
  253. throw new ArgumentNullException ("action");
  254. if (init == null)
  255. throw new ArgumentNullException ("init");
  256. if (destruct == null)
  257. throw new ArgumentNullException ("destruct");
  258. int num = Math.Min (GetBestWorkerNumber (),
  259. options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
  260. Task[] tasks = new Task[num];
  261. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  262. SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
  263. const int bagCount = 5;
  264. IList<IEnumerator<TSource>> slices = enumerable (num);
  265. int sliceIndex = -1;
  266. Action workerMethod = delegate {
  267. IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
  268. TLocal local = init ();
  269. ParallelLoopState state = new ParallelLoopState (infos);
  270. int workIndex = bag.GetNextIndex ();
  271. CancellationToken token = options.CancellationToken;
  272. try {
  273. bool cont = true;
  274. TSource element;
  275. while (cont) {
  276. if (infos.IsStopped || infos.IsBroken.Value)
  277. return;
  278. token.ThrowIfCancellationRequested ();
  279. for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
  280. bag.Add (workIndex, slice.Current);
  281. }
  282. for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
  283. if (infos.IsStopped)
  284. return;
  285. token.ThrowIfCancellationRequested ();
  286. local = action (element, state, local);
  287. }
  288. }
  289. while (bag.TrySteal (workIndex, out element)) {
  290. token.ThrowIfCancellationRequested ();
  291. local = action (element, state, local);
  292. if (infos.IsStopped || infos.IsBroken.Value)
  293. return;
  294. }
  295. } finally {
  296. destruct (local);
  297. }
  298. };
  299. InitTasks (tasks, num, workerMethod, options);
  300. try {
  301. Task.WaitAll (tasks);
  302. } catch {
  303. HandleExceptions (tasks, infos);
  304. }
  305. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  306. }
  307. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
  308. {
  309. if (source == null)
  310. throw new ArgumentNullException ("source");
  311. if (body == null)
  312. throw new ArgumentNullException ("body");
  313. return ForEach<TSource, object> (Partitioner.Create (source),
  314. ParallelOptions.Default,
  315. () => null,
  316. (e, s, l) => { body (e); return null; },
  317. _ => {});
  318. }
  319. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
  320. {
  321. if (source == null)
  322. throw new ArgumentNullException ("source");
  323. if (body == null)
  324. throw new ArgumentNullException ("body");
  325. return ForEach<TSource, object> (Partitioner.Create (source),
  326. ParallelOptions.Default,
  327. () => null,
  328. (e, s, l) => { body (e, s); return null; },
  329. _ => {});
  330. }
  331. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  332. Action<TSource, ParallelLoopState, long> body)
  333. {
  334. if (source == null)
  335. throw new ArgumentNullException ("source");
  336. if (body == null)
  337. throw new ArgumentNullException ("body");
  338. return ForEach<TSource, object> (Partitioner.Create (source),
  339. ParallelOptions.Default,
  340. () => null,
  341. (e, s, l) => { body (e, s, -1); return null; },
  342. _ => {});
  343. }
  344. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  345. Action<TSource, ParallelLoopState> body)
  346. {
  347. if (body == null)
  348. throw new ArgumentNullException ("body");
  349. return ForEach<TSource, object> (source,
  350. ParallelOptions.Default,
  351. () => null,
  352. (e, s, l) => { body (e, s); return null; },
  353. _ => {});
  354. }
  355. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
  356. Action<TSource, ParallelLoopState, long> body)
  357. {
  358. if (body == null)
  359. throw new ArgumentNullException ("body");
  360. return ForEach<TSource, object> (source,
  361. ParallelOptions.Default,
  362. () => null,
  363. (e, s, i, l) => { body (e, s, i); return null; },
  364. _ => {});
  365. }
  366. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  367. Action<TSource> body)
  368. {
  369. if (body == null)
  370. throw new ArgumentNullException ("body");
  371. return ForEach<TSource, object> (source,
  372. ParallelOptions.Default,
  373. () => null,
  374. (e, s, l) => { body (e); return null; },
  375. _ => {});
  376. }
  377. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  378. ParallelOptions parallelOptions,
  379. Action<TSource> body)
  380. {
  381. if (source == null)
  382. throw new ArgumentNullException ("source");
  383. if (body == null)
  384. throw new ArgumentNullException ("body");
  385. return ForEach<TSource, object> (Partitioner.Create (source),
  386. parallelOptions,
  387. () => null,
  388. (e, s, l) => { body (e); return null; },
  389. _ => {});
  390. }
  391. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  392. Action<TSource, ParallelLoopState> body)
  393. {
  394. if (source == null)
  395. throw new ArgumentNullException ("source");
  396. if (body == null)
  397. throw new ArgumentNullException ("body");
  398. return ForEach<TSource, object> (Partitioner.Create (source),
  399. parallelOptions,
  400. () => null,
  401. (e, s, l) => { body (e, s); return null; },
  402. _ => {});
  403. }
  404. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  405. Action<TSource, ParallelLoopState, long> body)
  406. {
  407. if (source == null)
  408. throw new ArgumentNullException ("source");
  409. if (body == null)
  410. throw new ArgumentNullException ("body");
  411. return ForEach<TSource, object> (Partitioner.Create (source),
  412. parallelOptions,
  413. () => null,
  414. (e, s, i, l) => { body (e, s, i); return null; },
  415. _ => {});
  416. }
  417. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  418. Action<TSource, ParallelLoopState, long> body)
  419. {
  420. if (body == null)
  421. throw new ArgumentNullException ("body");
  422. return ForEach<TSource, object> (source,
  423. parallelOptions,
  424. () => null,
  425. (e, s, i, l) => { body (e, s, i); return null; },
  426. _ => {});
  427. }
  428. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  429. Action<TSource> body)
  430. {
  431. if (body == null)
  432. throw new ArgumentNullException ("body");
  433. return ForEach<TSource, object> (source,
  434. parallelOptions,
  435. () => null,
  436. (e, s, l) => { body (e); return null; },
  437. _ => {});
  438. }
  439. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  440. Action<TSource, ParallelLoopState> body)
  441. {
  442. return ForEach<TSource, object> (source,
  443. parallelOptions,
  444. () => null,
  445. (e, s, l) => { body (e, s); return null; },
  446. _ => {});
  447. }
  448. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  449. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  450. Action<TLocal> localFinally)
  451. {
  452. if (source == null)
  453. throw new ArgumentNullException ("source");
  454. return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
  455. ParallelOptions.Default,
  456. localInit,
  457. body,
  458. localFinally);
  459. }
  460. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  461. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  462. Action<TLocal> localFinally)
  463. {
  464. return ForEach<TSource, TLocal> (Partitioner.Create (source),
  465. ParallelOptions.Default,
  466. localInit,
  467. body,
  468. localFinally);
  469. }
  470. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
  471. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  472. Action<TLocal> localFinally)
  473. {
  474. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  475. }
  476. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
  477. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  478. Action<TLocal> localFinally)
  479. {
  480. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  481. }
  482. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  483. Func<TLocal> localInit,
  484. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  485. Action<TLocal> localFinally)
  486. {
  487. if (source == null)
  488. throw new ArgumentNullException ("source");
  489. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  490. }
  491. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  492. Func<TLocal> localInit,
  493. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  494. Action<TLocal> localFinally)
  495. {
  496. if (source == null)
  497. throw new ArgumentNullException ("source");
  498. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  499. }
  500. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  501. Func<TLocal> localInit,
  502. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  503. Action<TLocal> localFinally)
  504. {
  505. if (source == null)
  506. throw new ArgumentNullException ("source");
  507. if (body == null)
  508. throw new ArgumentNullException ("body");
  509. return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
  510. }
  511. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  512. Func<TLocal> localInit,
  513. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  514. Action<TLocal> localFinally)
  515. {
  516. if (source == null)
  517. throw new ArgumentNullException ("source");
  518. if (body == null)
  519. throw new ArgumentNullException ("body");
  520. return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
  521. parallelOptions,
  522. localInit,
  523. (e, s, l) => body (e.Value, s, e.Key, l),
  524. localFinally);
  525. }
  526. #endregion
  527. #region Invoke
  528. public static void Invoke (params Action[] actions)
  529. {
  530. if (actions == null)
  531. throw new ArgumentNullException ("actions");
  532. Invoke (actions, (Action a) => Task.Factory.StartNew (a));
  533. }
  534. public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
  535. {
  536. if (parallelOptions == null)
  537. throw new ArgumentNullException ("parallelOptions");
  538. if (actions == null)
  539. throw new ArgumentNullException ("actions");
  540. Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
  541. }
  542. static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
  543. {
  544. if (actions.Length == 0)
  545. throw new ArgumentException ("actions is empty");
  546. // Execute it directly
  547. if (actions.Length == 1 && actions[0] != null)
  548. actions[0] ();
  549. bool shouldThrow = false;
  550. Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
  551. if (a == null) {
  552. shouldThrow = true;
  553. return null;
  554. }
  555. return taskCreator (a);
  556. });
  557. if (shouldThrow)
  558. throw new ArgumentException ("One action in actions is null", "actions");
  559. try {
  560. Task.WaitAll (ts);
  561. } catch {
  562. HandleExceptions (ts);
  563. }
  564. }
  565. #endregion
  566. #region SpawnBestNumber, used by PLinq
  567. internal static Task[] SpawnBestNumber (Action action, Action callback)
  568. {
  569. return SpawnBestNumber (action, -1, callback);
  570. }
  571. internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
  572. {
  573. return SpawnBestNumber (action, dop, false, callback);
  574. }
  575. internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
  576. {
  577. // Get the optimum amount of worker to create
  578. int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
  579. // Initialize worker
  580. CountdownEvent evt = new CountdownEvent (num);
  581. Task[] tasks = new Task [num];
  582. for (int i = 0; i < num; i++) {
  583. tasks [i] = Task.Factory.StartNew (() => {
  584. action ();
  585. evt.Signal ();
  586. if (callback != null && evt.IsSet)
  587. callback ();
  588. });
  589. }
  590. // If explicitely told, wait for all workers to complete
  591. // and thus let main thread participate in the processing
  592. if (wait)
  593. Task.WaitAll (tasks);
  594. return tasks;
  595. }
  596. #endregion
  597. }
  598. }
  599. #endif