Parallel.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. // Parallel.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || MOBILE
  25. using System;
  26. using System.Collections.Generic;
  27. using System.Collections.Concurrent;
  28. using System.Threading;
  29. using System.Runtime.InteropServices;
  30. namespace System.Threading.Tasks
  31. {
  32. public static class Parallel
  33. {
  34. #if MOONLIGHT || MOBILE
  35. static readonly bool sixtyfour = IntPtr.Size == 8;
  36. #else
  37. static readonly bool sixtyfour = Environment.Is64BitProcess;
  38. #endif
  39. internal static int GetBestWorkerNumber ()
  40. {
  41. return GetBestWorkerNumber (TaskScheduler.Current);
  42. }
  43. internal static int GetBestWorkerNumber (TaskScheduler scheduler)
  44. {
  45. return scheduler.MaximumConcurrencyLevel;
  46. }
  47. static int GetBestWorkerNumber (int from, int to, ParallelOptions options, out int step)
  48. {
  49. int num = Math.Min (GetBestWorkerNumber (options.TaskScheduler),
  50. options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
  51. // Integer range that each task process
  52. if ((step = (to - from) / num) < 5) {
  53. step = 5;
  54. num = (to - from) / 5;
  55. if (num < 1)
  56. num = 1;
  57. }
  58. return num;
  59. }
  60. static void HandleExceptions (IEnumerable<Task> tasks)
  61. {
  62. HandleExceptions (tasks, null);
  63. }
  64. static void HandleExceptions (IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
  65. {
  66. List<Exception> exs = new List<Exception> ();
  67. foreach (Task t in tasks) {
  68. if (t.Exception != null)
  69. exs.Add (t.Exception);
  70. }
  71. if (exs.Count > 0) {
  72. if (infos != null)
  73. infos.IsExceptional = true;
  74. throw new AggregateException (exs);
  75. }
  76. }
  77. static void InitTasks (Task[] tasks, int count, Action action, ParallelOptions options)
  78. {
  79. TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
  80. for (int i = 0; i < count; i++) {
  81. if (options == null)
  82. tasks [i] = Task.Factory.StartNew (action, creation);
  83. else
  84. tasks [i] = Task.Factory.StartNew (action, options.CancellationToken, creation, options.TaskScheduler);
  85. }
  86. }
  87. #region For
  88. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int> body)
  89. {
  90. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  91. }
  92. public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
  93. {
  94. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  95. }
  96. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
  97. {
  98. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  99. }
  100. public static ParallelLoopResult For (int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
  101. {
  102. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  103. }
  104. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  105. int toExclusive,
  106. Func<TLocal> localInit,
  107. Func<int, ParallelLoopState, TLocal, TLocal> body,
  108. Action<TLocal> localFinally)
  109. {
  110. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  111. }
  112. public static ParallelLoopResult For<TLocal> (int fromInclusive,
  113. int toExclusive,
  114. ParallelOptions parallelOptions,
  115. Func<TLocal> localInit,
  116. Func<int, ParallelLoopState, TLocal, TLocal> body,
  117. Action<TLocal> localFinally)
  118. {
  119. if (body == null)
  120. throw new ArgumentNullException ("body");
  121. if (localInit == null)
  122. throw new ArgumentNullException ("localInit");
  123. if (localFinally == null)
  124. throw new ArgumentNullException ("localFinally");
  125. if (parallelOptions == null)
  126. throw new ArgumentNullException ("options");
  127. if (fromInclusive >= toExclusive)
  128. return new ParallelLoopResult (null, true);
  129. // Number of task toExclusive be launched (normally == Env.ProcessorCount)
  130. int step;
  131. int num = GetBestWorkerNumber (fromInclusive, toExclusive, parallelOptions, out step);
  132. Task[] tasks = new Task [num];
  133. StealRange[] ranges = new StealRange[num];
  134. for (int i = 0; i < num; i++)
  135. ranges[i] = new StealRange (fromInclusive, i, step);
  136. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  137. int currentIndex = -1;
  138. Action workerMethod = delegate {
  139. int localWorker = Interlocked.Increment (ref currentIndex);
  140. StealRange range = ranges[localWorker];
  141. int index = range.V64.Actual;
  142. int stopIndex = localWorker + 1 == num ? toExclusive : Math.Min (toExclusive, index + step);
  143. TLocal local = localInit ();
  144. ParallelLoopState state = new ParallelLoopState (infos);
  145. CancellationToken token = parallelOptions.CancellationToken;
  146. try {
  147. for (int i = index; i < stopIndex;) {
  148. if (infos.IsStopped)
  149. return;
  150. token.ThrowIfCancellationRequested ();
  151. if (i >= stopIndex - range.V64.Stolen)
  152. break;
  153. if (infos.LowestBreakIteration != null && infos.LowestBreakIteration > i)
  154. return;
  155. state.CurrentIteration = i;
  156. local = body (i, state, local);
  157. if (i + 1 >= stopIndex - range.V64.Stolen)
  158. break;
  159. range.V64.Actual = ++i;
  160. }
  161. // Try toExclusive steal fromInclusive our right neighbor (cyclic)
  162. int len = num + localWorker;
  163. for (int sIndex = localWorker + 1; sIndex < len; ++sIndex) {
  164. int extWorker = sIndex % num;
  165. range = ranges[extWorker];
  166. stopIndex = extWorker + 1 == num ? toExclusive : Math.Min (toExclusive, fromInclusive + (extWorker + 1) * step);
  167. int stolen = -1;
  168. do {
  169. do {
  170. long old;
  171. StealValue64 val = new StealValue64 ();
  172. old = sixtyfour ? range.V64.Value : Interlocked.CompareExchange (ref range.V64.Value, 0, 0);
  173. val.Value = old;
  174. if (val.Actual >= stopIndex - val.Stolen - 2)
  175. goto next;
  176. stolen = (val.Stolen += 1);
  177. if (Interlocked.CompareExchange (ref range.V64.Value, val.Value, old) == old)
  178. break;
  179. } while (true);
  180. stolen = stopIndex - stolen;
  181. if (stolen > range.V64.Actual)
  182. local = body (stolen, state, local);
  183. else
  184. break;
  185. } while (true);
  186. next:
  187. continue;
  188. }
  189. } finally {
  190. localFinally (local);
  191. }
  192. };
  193. InitTasks (tasks, num, workerMethod, parallelOptions);
  194. try {
  195. Task.WaitAll (tasks);
  196. } catch {
  197. HandleExceptions (tasks, infos);
  198. }
  199. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  200. }
  201. [StructLayout(LayoutKind.Explicit)]
  202. struct StealValue64 {
  203. [FieldOffset(0)]
  204. public long Value;
  205. [FieldOffset(0)]
  206. public int Actual;
  207. [FieldOffset(4)]
  208. public int Stolen;
  209. }
  210. class StealRange
  211. {
  212. public StealValue64 V64 = new StealValue64 ();
  213. public StealRange (int fromInclusive, int i, int step)
  214. {
  215. V64.Actual = fromInclusive + i * step;
  216. }
  217. }
  218. #endregion
  219. #region For (long)
  220. [MonoTODO]
  221. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long> body)
  222. {
  223. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  224. }
  225. [MonoTODO]
  226. public static ParallelLoopResult For (long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
  227. {
  228. return For (fromInclusive, toExclusive, ParallelOptions.Default, body);
  229. }
  230. [MonoTODO]
  231. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
  232. {
  233. return For (fromInclusive, toExclusive, parallelOptions, (index, state) => body (index));
  234. }
  235. [MonoTODO]
  236. public static ParallelLoopResult For (long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
  237. {
  238. return For<object> (fromInclusive, toExclusive, parallelOptions, () => null, (i, s, l) => { body (i, s); return null; }, _ => {});
  239. }
  240. [MonoTODO]
  241. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  242. long toExclusive,
  243. Func<TLocal> localInit,
  244. Func<long, ParallelLoopState, TLocal, TLocal> body,
  245. Action<TLocal> localFinally)
  246. {
  247. return For<TLocal> (fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
  248. }
  249. [MonoTODO ("See how this can be refactored with the above For implementation")]
  250. public static ParallelLoopResult For<TLocal> (long fromInclusive,
  251. long toExclusive,
  252. ParallelOptions parallelOptions,
  253. Func<TLocal> localInit,
  254. Func<long, ParallelLoopState, TLocal, TLocal> body,
  255. Action<TLocal> localFinally)
  256. {
  257. if (body == null)
  258. throw new ArgumentNullException ("body");
  259. if (localInit == null)
  260. throw new ArgumentNullException ("localInit");
  261. if (localFinally == null)
  262. throw new ArgumentNullException ("localFinally");
  263. if (parallelOptions == null)
  264. throw new ArgumentNullException ("options");
  265. if (fromInclusive >= toExclusive)
  266. return new ParallelLoopResult (null, true);
  267. throw new NotImplementedException ();
  268. }
  269. #endregion
  270. #region Foreach
  271. static ParallelLoopResult ForEach<TSource, TLocal> (Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options,
  272. Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action,
  273. Action<TLocal> destruct)
  274. {
  275. if (enumerable == null)
  276. throw new ArgumentNullException ("source");
  277. if (options == null)
  278. throw new ArgumentNullException ("options");
  279. if (action == null)
  280. throw new ArgumentNullException ("action");
  281. if (init == null)
  282. throw new ArgumentNullException ("init");
  283. if (destruct == null)
  284. throw new ArgumentNullException ("destruct");
  285. int num = Math.Min (GetBestWorkerNumber (),
  286. options != null && options.MaxDegreeOfParallelism != -1 ? options.MaxDegreeOfParallelism : int.MaxValue);
  287. Task[] tasks = new Task[num];
  288. ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos ();
  289. SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource> (num);
  290. const int bagCount = 5;
  291. IList<IEnumerator<TSource>> slices = enumerable (num);
  292. int sliceIndex = -1;
  293. Action workerMethod = delegate {
  294. IEnumerator<TSource> slice = slices[Interlocked.Increment (ref sliceIndex)];
  295. TLocal local = init ();
  296. ParallelLoopState state = new ParallelLoopState (infos);
  297. int workIndex = bag.GetNextIndex ();
  298. CancellationToken token = options.CancellationToken;
  299. try {
  300. bool cont = true;
  301. TSource element;
  302. while (cont) {
  303. if (infos.IsStopped || infos.IsBroken.Value)
  304. return;
  305. token.ThrowIfCancellationRequested ();
  306. for (int i = 0; i < bagCount && (cont = slice.MoveNext ()); i++) {
  307. bag.Add (workIndex, slice.Current);
  308. }
  309. for (int i = 0; i < bagCount && bag.TryTake (workIndex, out element); i++) {
  310. if (infos.IsStopped)
  311. return;
  312. token.ThrowIfCancellationRequested ();
  313. local = action (element, state, local);
  314. }
  315. }
  316. while (bag.TrySteal (workIndex, out element)) {
  317. token.ThrowIfCancellationRequested ();
  318. local = action (element, state, local);
  319. if (infos.IsStopped || infos.IsBroken.Value)
  320. return;
  321. }
  322. } finally {
  323. destruct (local);
  324. }
  325. };
  326. InitTasks (tasks, num, workerMethod, options);
  327. try {
  328. Task.WaitAll (tasks);
  329. } catch {
  330. HandleExceptions (tasks, infos);
  331. }
  332. return new ParallelLoopResult (infos.LowestBreakIteration, !(infos.IsStopped || infos.IsExceptional));
  333. }
  334. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource> body)
  335. {
  336. if (source == null)
  337. throw new ArgumentNullException ("source");
  338. if (body == null)
  339. throw new ArgumentNullException ("body");
  340. return ForEach<TSource, object> (Partitioner.Create (source),
  341. ParallelOptions.Default,
  342. () => null,
  343. (e, s, l) => { body (e); return null; },
  344. _ => {});
  345. }
  346. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
  347. {
  348. if (source == null)
  349. throw new ArgumentNullException ("source");
  350. if (body == null)
  351. throw new ArgumentNullException ("body");
  352. return ForEach<TSource, object> (Partitioner.Create (source),
  353. ParallelOptions.Default,
  354. () => null,
  355. (e, s, l) => { body (e, s); return null; },
  356. _ => {});
  357. }
  358. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  359. Action<TSource, ParallelLoopState, long> body)
  360. {
  361. if (source == null)
  362. throw new ArgumentNullException ("source");
  363. if (body == null)
  364. throw new ArgumentNullException ("body");
  365. return ForEach<TSource, object> (Partitioner.Create (source),
  366. ParallelOptions.Default,
  367. () => null,
  368. (e, s, l) => { body (e, s, -1); return null; },
  369. _ => {});
  370. }
  371. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  372. Action<TSource, ParallelLoopState> body)
  373. {
  374. if (body == null)
  375. throw new ArgumentNullException ("body");
  376. return ForEach<TSource, object> (source,
  377. ParallelOptions.Default,
  378. () => null,
  379. (e, s, l) => { body (e, s); return null; },
  380. _ => {});
  381. }
  382. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source,
  383. Action<TSource, ParallelLoopState, long> body)
  384. {
  385. if (body == null)
  386. throw new ArgumentNullException ("body");
  387. return ForEach<TSource, object> (source,
  388. ParallelOptions.Default,
  389. () => null,
  390. (e, s, i, l) => { body (e, s, i); return null; },
  391. _ => {});
  392. }
  393. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source,
  394. Action<TSource> body)
  395. {
  396. if (body == null)
  397. throw new ArgumentNullException ("body");
  398. return ForEach<TSource, object> (source,
  399. ParallelOptions.Default,
  400. () => null,
  401. (e, s, l) => { body (e); return null; },
  402. _ => {});
  403. }
  404. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source,
  405. ParallelOptions parallelOptions,
  406. Action<TSource> body)
  407. {
  408. if (source == null)
  409. throw new ArgumentNullException ("source");
  410. if (body == null)
  411. throw new ArgumentNullException ("body");
  412. return ForEach<TSource, object> (Partitioner.Create (source),
  413. parallelOptions,
  414. () => null,
  415. (e, s, l) => { body (e); return null; },
  416. _ => {});
  417. }
  418. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  419. Action<TSource, ParallelLoopState> body)
  420. {
  421. if (source == null)
  422. throw new ArgumentNullException ("source");
  423. if (body == null)
  424. throw new ArgumentNullException ("body");
  425. return ForEach<TSource, object> (Partitioner.Create (source),
  426. parallelOptions,
  427. () => null,
  428. (e, s, l) => { body (e, s); return null; },
  429. _ => {});
  430. }
  431. public static ParallelLoopResult ForEach<TSource> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  432. Action<TSource, ParallelLoopState, long> body)
  433. {
  434. if (source == null)
  435. throw new ArgumentNullException ("source");
  436. if (body == null)
  437. throw new ArgumentNullException ("body");
  438. return ForEach<TSource, object> (Partitioner.Create (source),
  439. parallelOptions,
  440. () => null,
  441. (e, s, i, l) => { body (e, s, i); return null; },
  442. _ => {});
  443. }
  444. public static ParallelLoopResult ForEach<TSource> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  445. Action<TSource, ParallelLoopState, long> body)
  446. {
  447. if (body == null)
  448. throw new ArgumentNullException ("body");
  449. return ForEach<TSource, object> (source,
  450. parallelOptions,
  451. () => null,
  452. (e, s, i, l) => { body (e, s, i); return null; },
  453. _ => {});
  454. }
  455. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  456. Action<TSource> body)
  457. {
  458. if (body == null)
  459. throw new ArgumentNullException ("body");
  460. return ForEach<TSource, object> (source,
  461. parallelOptions,
  462. () => null,
  463. (e, s, l) => { body (e); return null; },
  464. _ => {});
  465. }
  466. public static ParallelLoopResult ForEach<TSource> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  467. Action<TSource, ParallelLoopState> body)
  468. {
  469. return ForEach<TSource, object> (source,
  470. parallelOptions,
  471. () => null,
  472. (e, s, l) => { body (e, s); return null; },
  473. _ => {});
  474. }
  475. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  476. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  477. Action<TLocal> localFinally)
  478. {
  479. if (source == null)
  480. throw new ArgumentNullException ("source");
  481. return ForEach<TSource, TLocal> ((Partitioner<TSource>)Partitioner.Create (source),
  482. ParallelOptions.Default,
  483. localInit,
  484. body,
  485. localFinally);
  486. }
  487. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, Func<TLocal> localInit,
  488. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  489. Action<TLocal> localFinally)
  490. {
  491. return ForEach<TSource, TLocal> (Partitioner.Create (source),
  492. ParallelOptions.Default,
  493. localInit,
  494. body,
  495. localFinally);
  496. }
  497. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, Func<TLocal> localInit,
  498. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  499. Action<TLocal> localFinally)
  500. {
  501. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  502. }
  503. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, Func<TLocal> localInit,
  504. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  505. Action<TLocal> localFinally)
  506. {
  507. return ForEach<TSource, TLocal> (source, ParallelOptions.Default, localInit, body, localFinally);
  508. }
  509. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  510. Func<TLocal> localInit,
  511. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  512. Action<TLocal> localFinally)
  513. {
  514. if (source == null)
  515. throw new ArgumentNullException ("source");
  516. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  517. }
  518. public static ParallelLoopResult ForEach<TSource, TLocal> (IEnumerable<TSource> source, ParallelOptions parallelOptions,
  519. Func<TLocal> localInit,
  520. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  521. Action<TLocal> localFinally)
  522. {
  523. if (source == null)
  524. throw new ArgumentNullException ("source");
  525. return ForEach<TSource, TLocal> (Partitioner.Create (source), parallelOptions, localInit, body, localFinally);
  526. }
  527. public static ParallelLoopResult ForEach<TSource, TLocal> (Partitioner<TSource> source, ParallelOptions parallelOptions,
  528. Func<TLocal> localInit,
  529. Func<TSource, ParallelLoopState, TLocal, TLocal> body,
  530. Action<TLocal> localFinally)
  531. {
  532. if (source == null)
  533. throw new ArgumentNullException ("source");
  534. if (body == null)
  535. throw new ArgumentNullException ("body");
  536. return ForEach<TSource, TLocal> (source.GetPartitions, parallelOptions, localInit, body, localFinally);
  537. }
  538. public static ParallelLoopResult ForEach<TSource, TLocal> (OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
  539. Func<TLocal> localInit,
  540. Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
  541. Action<TLocal> localFinally)
  542. {
  543. if (source == null)
  544. throw new ArgumentNullException ("source");
  545. if (body == null)
  546. throw new ArgumentNullException ("body");
  547. return ForEach<KeyValuePair<long, TSource>, TLocal> (source.GetOrderablePartitions,
  548. parallelOptions,
  549. localInit,
  550. (e, s, l) => body (e.Value, s, e.Key, l),
  551. localFinally);
  552. }
  553. #endregion
  554. #region Invoke
  555. public static void Invoke (params Action[] actions)
  556. {
  557. if (actions == null)
  558. throw new ArgumentNullException ("actions");
  559. Invoke (actions, (Action a) => Task.Factory.StartNew (a));
  560. }
  561. public static void Invoke (ParallelOptions parallelOptions, params Action[] actions)
  562. {
  563. if (parallelOptions == null)
  564. throw new ArgumentNullException ("parallelOptions");
  565. if (actions == null)
  566. throw new ArgumentNullException ("actions");
  567. Invoke (actions, (Action a) => Task.Factory.StartNew (a, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler));
  568. }
  569. static void Invoke (Action[] actions, Func<Action, Task> taskCreator)
  570. {
  571. if (actions.Length == 0)
  572. throw new ArgumentException ("actions is empty");
  573. // Execute it directly
  574. if (actions.Length == 1 && actions[0] != null)
  575. actions[0] ();
  576. bool shouldThrow = false;
  577. Task[] ts = Array.ConvertAll (actions, delegate (Action a) {
  578. if (a == null) {
  579. shouldThrow = true;
  580. return null;
  581. }
  582. return taskCreator (a);
  583. });
  584. if (shouldThrow)
  585. throw new ArgumentException ("One action in actions is null", "actions");
  586. try {
  587. Task.WaitAll (ts);
  588. } catch {
  589. HandleExceptions (ts);
  590. }
  591. }
  592. #endregion
  593. #region SpawnBestNumber, used by PLinq
  594. internal static Task[] SpawnBestNumber (Action action, Action callback)
  595. {
  596. return SpawnBestNumber (action, -1, callback);
  597. }
  598. internal static Task[] SpawnBestNumber (Action action, int dop, Action callback)
  599. {
  600. return SpawnBestNumber (action, dop, false, callback);
  601. }
  602. internal static Task[] SpawnBestNumber (Action action, int dop, bool wait, Action callback)
  603. {
  604. // Get the optimum amount of worker to create
  605. int num = dop == -1 ? (wait ? GetBestWorkerNumber () + 1 : GetBestWorkerNumber ()) : dop;
  606. // Initialize worker
  607. CountdownEvent evt = new CountdownEvent (num);
  608. Task[] tasks = new Task [num];
  609. for (int i = 0; i < num; i++) {
  610. tasks [i] = Task.Factory.StartNew (() => {
  611. action ();
  612. evt.Signal ();
  613. if (callback != null && evt.IsSet)
  614. callback ();
  615. });
  616. }
  617. // If explicitely told, wait for all workers to complete
  618. // and thus let main thread participate in the processing
  619. if (wait)
  620. Task.WaitAll (tasks);
  621. return tasks;
  622. }
  623. #endregion
  624. }
  625. }
  626. #endif