ParallelEnumerable.cs 80 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124
  1. //
  2. // ParallelEnumerable.cs
  3. //
  4. // Author:
  5. // Jérémie "Garuma" Laval <[email protected]>
  6. //
  7. // Copyright (c) 2010 Jérémie "Garuma" Laval
  8. //
  9. // Permission is hereby granted, free of charge, to any person obtaining a copy
  10. // of this software and associated documentation files (the "Software"), to deal
  11. // in the Software without restriction, including without limitation the rights
  12. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  13. // copies of the Software, and to permit persons to whom the Software is
  14. // furnished to do so, subject to the following conditions:
  15. //
  16. // The above copyright notice and this permission notice shall be included in
  17. // all copies or substantial portions of the Software.
  18. //
  19. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  20. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  21. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  22. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  23. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  24. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  25. // THE SOFTWARE.
  26. #if NET_4_0
  27. using System;
  28. using System.Threading;
  29. using System.Collections;
  30. using System.Collections.Generic;
  31. using System.Collections.Concurrent;
  32. using System.Linq.Parallel;
  33. using System.Linq.Parallel.QueryNodes;
  34. namespace System.Linq
  35. {
  36. public static class ParallelEnumerable
  37. {
  38. #region Range & Repeat
  39. public static ParallelQuery<int> Range (int start, int count)
  40. {
  41. if (int.MaxValue - start < count)
  42. throw new ArgumentOutOfRangeException ("count", "start + count - 1 is larger than Int32.MaxValue");
  43. if (count < 0)
  44. throw new ArgumentOutOfRangeException ("count", "count is less than 0");
  45. return (new RangeList (start, count)).AsParallel ();
  46. }
  47. public static ParallelQuery<TResult> Repeat<TResult> (TResult obj, int count)
  48. {
  49. if (count < 0)
  50. throw new ArgumentOutOfRangeException ("count", "count is less than 0");
  51. return (new RepeatList<TResult> (obj, count)).AsParallel ();
  52. }
  53. #endregion
  54. #region Empty
  55. public static ParallelQuery<TResult> Empty<TResult> ()
  56. {
  57. return Repeat<TResult> (default (TResult), 0);
  58. }
  59. #endregion
  60. #region AsParallel
  61. public static ParallelQuery<TSource> AsParallel<TSource> (this IEnumerable<TSource> source)
  62. {
  63. if (source == null)
  64. throw new ArgumentNullException ("source");
  65. return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
  66. }
  67. public static ParallelQuery<TSource> AsParallel<TSource> (this Partitioner<TSource> source)
  68. {
  69. if (source == null)
  70. throw new ArgumentNullException ("source");
  71. return new ParallelQuery<TSource> (new QueryStartNode<TSource> (source));
  72. }
  73. public static ParallelQuery AsParallel (this IEnumerable source)
  74. {
  75. if (source == null)
  76. throw new ArgumentNullException ("source");
  77. return new ParallelQuery<object> (new QueryStartNode<object> (source.Cast<object> ()));
  78. }
  79. public static IEnumerable<TSource> AsEnumerable<TSource> (this ParallelQuery<TSource> source)
  80. {
  81. if (source == null)
  82. throw new ArgumentNullException ("source");
  83. return source.AsSequential ();
  84. }
  85. public static IEnumerable<TSource> AsSequential<TSource> (this ParallelQuery<TSource> source)
  86. {
  87. if (source == null)
  88. throw new ArgumentNullException ("source");
  89. return source.Node.GetSequential ();
  90. }
  91. #endregion
  92. #region AsOrdered / AsUnordered
  93. public static ParallelQuery<TSource> AsOrdered<TSource> (this ParallelQuery<TSource> source)
  94. {
  95. if (source == null)
  96. throw new ArgumentNullException ("source");
  97. return new ParallelQuery<TSource> (new QueryAsOrderedNode<TSource> (source.Node));
  98. }
  99. public static ParallelQuery<TSource> AsUnordered<TSource> (this ParallelQuery<TSource> source)
  100. {
  101. if (source == null)
  102. throw new ArgumentNullException ("source");
  103. return new ParallelQuery<TSource> (new QueryAsUnorderedNode<TSource> (source.Node));
  104. }
  105. public static ParallelQuery AsOrdered (this ParallelQuery source)
  106. {
  107. if (source == null)
  108. throw new ArgumentNullException ("source");
  109. return source.TypedQuery.AsOrdered ();
  110. }
  111. #endregion
  112. #region With*
  113. public static ParallelQuery<TSource> WithExecutionMode<TSource> (this ParallelQuery<TSource> source,
  114. ParallelExecutionMode executionMode)
  115. {
  116. if (source == null)
  117. throw new ArgumentNullException ("source");
  118. return new ParallelQuery<TSource> (new ParallelExecutionModeNode<TSource> (executionMode, source.Node));
  119. }
  120. public static ParallelQuery<TSource> WithCancellation<TSource> (this ParallelQuery<TSource> source,
  121. CancellationToken cancellationToken)
  122. {
  123. if (source == null)
  124. throw new ArgumentNullException ("source");
  125. return new ParallelQuery<TSource> (new CancellationTokenNode<TSource> (cancellationToken, source.Node));
  126. }
  127. public static ParallelQuery<TSource> WithMergeOptions<TSource> (this ParallelQuery<TSource> source,
  128. ParallelMergeOptions mergeOptions)
  129. {
  130. if (source == null)
  131. throw new ArgumentNullException ("source");
  132. return new ParallelQuery<TSource> (new ParallelMergeOptionsNode<TSource> (mergeOptions, source.Node));
  133. }
  134. public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource> (this ParallelQuery<TSource> source,
  135. int degreeParallelism)
  136. {
  137. if (degreeParallelism < 1 || degreeParallelism > 63)
  138. throw new ArgumentException ("degreeOfParallelism is less than 1 or greater than 63", "degreeParallelism");
  139. if (source == null)
  140. throw new ArgumentNullException ("source");
  141. return new ParallelQuery<TSource> (new DegreeOfParallelismNode<TSource> (degreeParallelism, source.Node));
  142. }
  143. internal static ParallelQuery<TSource> WithImplementerToken<TSource> (this ParallelQuery<TSource> source,
  144. CancellationTokenSource token)
  145. {
  146. return new ParallelQuery<TSource> (new ImplementerTokenNode<TSource> (token, source.Node));
  147. }
  148. #endregion
  149. #region Select
  150. public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
  151. {
  152. if (source == null)
  153. throw new ArgumentNullException ("source");
  154. if (selector == null)
  155. throw new ArgumentNullException ("selector");
  156. return new ParallelQuery<TResult> (new QuerySelectNode<TResult, TSource> (source.Node, selector));
  157. }
  158. public static ParallelQuery<TResult> Select<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, int, TResult> selector)
  159. {
  160. if (source == null)
  161. throw new ArgumentNullException ("source");
  162. if (selector == null)
  163. throw new ArgumentNullException ("selector");
  164. return new ParallelQuery<TResult> (new QuerySelectNode<TResult, TSource> (source.Node, selector));
  165. }
  166. #endregion
  167. #region SelectMany
  168. public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
  169. Func<TSource, IEnumerable<TResult>> selector)
  170. {
  171. return source.SelectMany (selector, (s, e) => e);
  172. }
  173. public static ParallelQuery<TResult> SelectMany<TSource, TResult> (this ParallelQuery<TSource> source,
  174. Func<TSource, int, IEnumerable<TResult>> selector)
  175. {
  176. return source.SelectMany (selector, (s, e) => e);
  177. }
  178. public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
  179. Func<TSource, IEnumerable<TCollection>> collectionSelector,
  180. Func<TSource, TCollection, TResult> resultSelector)
  181. {
  182. return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
  183. collectionSelector,
  184. resultSelector));
  185. }
  186. public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult> (this ParallelQuery<TSource> source,
  187. Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
  188. Func<TSource, TCollection, TResult> resultSelector)
  189. {
  190. return new ParallelQuery<TResult> (new QuerySelectManyNode<TSource, TCollection, TResult> (source.Node,
  191. collectionSelector,
  192. resultSelector));
  193. }
  194. #endregion
  195. #region Where
  196. public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  197. {
  198. if (source == null)
  199. throw new ArgumentNullException ("source");
  200. if (predicate == null)
  201. throw new ArgumentNullException ("predicate");
  202. return new ParallelQuery<TSource> (new QueryWhereNode<TSource> (source.Node, predicate));
  203. }
  204. public static ParallelQuery<TSource> Where<TSource> (this ParallelQuery<TSource> source, Func<TSource, int, bool> predicate)
  205. {
  206. if (source == null)
  207. throw new ArgumentNullException ("source");
  208. if (predicate == null)
  209. throw new ArgumentNullException ("predicate");
  210. return new ParallelQuery<TSource> (new QueryWhereNode<TSource> (source.Node, predicate));
  211. }
  212. #endregion
  213. #region Aggregate
  214. public static TSource Aggregate<TSource> (this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)
  215. {
  216. if (source == null)
  217. throw new ArgumentNullException ("source");
  218. if (func == null)
  219. throw new ArgumentNullException ("func");
  220. return source.Aggregate<TSource, TSource, TSource> ((Func<TSource>)null,
  221. func,
  222. func,
  223. (e) => e);
  224. }
  225. public static TAccumulate Aggregate<TSource, TAccumulate> (this ParallelQuery<TSource> source,
  226. TAccumulate seed,
  227. Func<TAccumulate, TSource, TAccumulate> func)
  228. {
  229. if (source == null)
  230. throw new ArgumentNullException ("source");
  231. if (func == null)
  232. throw new ArgumentNullException ("func");
  233. return source.Aggregate (seed, func, (e) => e);
  234. }
  235. public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
  236. TAccumulate seed,
  237. Func<TAccumulate, TSource, TAccumulate> func,
  238. Func<TAccumulate, TResult> resultSelector)
  239. {
  240. if (source == null)
  241. throw new ArgumentNullException ("source");
  242. if (func == null)
  243. throw new ArgumentNullException ("func");
  244. if (resultSelector == null)
  245. throw new ArgumentNullException ("resultSelector");
  246. TAccumulate accumulator = seed;
  247. foreach (TSource value in source)
  248. accumulator = func (accumulator, value);
  249. return resultSelector (accumulator);
  250. }
  251. public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
  252. TAccumulate seed,
  253. Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
  254. Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
  255. Func<TAccumulate, TResult> resultSelector)
  256. {
  257. if (source == null)
  258. throw new ArgumentNullException ("source");
  259. if (updateAccumulatorFunc == null)
  260. throw new ArgumentNullException ("updateAccumulatorFunc");
  261. if (combineAccumulatorsFunc == null)
  262. throw new ArgumentNullException ("combineAccumulatorsFunc");
  263. if (resultSelector == null)
  264. throw new ArgumentNullException ("resultSelector");
  265. return source.Aggregate (() => seed, updateAccumulatorFunc, combineAccumulatorsFunc, resultSelector);
  266. }
  267. public static TResult Aggregate<TSource, TAccumulate, TResult> (this ParallelQuery<TSource> source,
  268. Func<TAccumulate> seedFunc,
  269. Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
  270. Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
  271. Func<TAccumulate, TResult> resultSelector)
  272. {
  273. if (source == null)
  274. throw new ArgumentNullException ("source");
  275. if (seedFunc == null)
  276. throw new ArgumentNullException ("seedFunc");
  277. if (updateAccumulatorFunc == null)
  278. throw new ArgumentNullException ("updateAccumulatorFunc");
  279. if (combineAccumulatorsFunc == null)
  280. throw new ArgumentNullException ("combineAccumulatorsFunc");
  281. if (resultSelector == null)
  282. throw new ArgumentNullException ("resultSelector");
  283. TAccumulate accumulator = default (TAccumulate);
  284. ParallelExecuter.ProcessAndAggregate<TSource, TAccumulate> (source.Node, seedFunc, updateAccumulatorFunc, (list) => {
  285. accumulator = list [0];
  286. for (int i = 1; i < list.Count; i++)
  287. accumulator = combineAccumulatorsFunc (accumulator, list[i]);
  288. });
  289. return resultSelector (accumulator);;
  290. }
  291. #endregion
  292. #region ForAll
  293. public static void ForAll<TSource> (this ParallelQuery<TSource> source, Action<TSource> action)
  294. {
  295. if (source == null)
  296. throw new ArgumentNullException ("source");
  297. if (action == null)
  298. throw new ArgumentNullException ("action");
  299. ParallelExecuter.ProcessAndBlock (source.Node, action);
  300. }
  301. #endregion
  302. #region OrderBy
  303. public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
  304. Func<TSource, TKey> keySelector,
  305. IComparer<TKey> comparer)
  306. {
  307. if (source == null)
  308. throw new ArgumentNullException ("source");
  309. if (keySelector == null)
  310. throw new ArgumentNullException ("keySelector");
  311. if (comparer == null)
  312. throw new ArgumentNullException ("comparer");
  313. Comparison<TSource> comparison = (e1, e2) => -comparer.Compare (keySelector (e1), keySelector (e2));
  314. return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
  315. }
  316. public static OrderedParallelQuery<TSource> OrderByDescending<TSource, TKey> (this ParallelQuery<TSource> source,
  317. Func<TSource, TKey> keySelector)
  318. {
  319. return OrderByDescending (source, keySelector, Comparer<TKey>.Default);
  320. }
  321. public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
  322. Func<TSource, TKey> keySelector)
  323. {
  324. return OrderBy (source, keySelector, Comparer<TKey>.Default);
  325. }
  326. public static OrderedParallelQuery<TSource> OrderBy<TSource, TKey> (this ParallelQuery<TSource> source,
  327. Func<TSource, TKey> keySelector,
  328. IComparer<TKey> comparer)
  329. {
  330. if (source == null)
  331. throw new ArgumentNullException ("source");
  332. if (keySelector == null)
  333. throw new ArgumentNullException ("keySelector");
  334. if (comparer == null)
  335. throw new ArgumentNullException ("comparer");
  336. Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2));
  337. return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
  338. }
  339. #endregion
  340. #region ThenBy
  341. public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
  342. Func<TSource, TKey> keySelector)
  343. {
  344. return ThenBy (source, keySelector, Comparer<TKey>.Default);
  345. }
  346. public static OrderedParallelQuery<TSource> ThenBy<TSource, TKey> (this OrderedParallelQuery<TSource> source,
  347. Func<TSource, TKey> keySelector,
  348. IComparer<TKey> comparer)
  349. {
  350. if (source == null)
  351. throw new ArgumentNullException ("source");
  352. if (keySelector == null)
  353. throw new ArgumentNullException ("keySelector");
  354. if (comparer == null)
  355. throw new ArgumentNullException ("comparer");
  356. Comparison<TSource> comparison = (e1, e2) => comparer.Compare (keySelector (e1), keySelector (e2));
  357. return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
  358. }
  359. public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
  360. Func<TSource, TKey> keySelector)
  361. {
  362. return ThenByDescending (source, keySelector, Comparer<TKey>.Default);
  363. }
  364. public static OrderedParallelQuery<TSource> ThenByDescending<TSource, TKey> (this OrderedParallelQuery<TSource> source,
  365. Func<TSource, TKey> keySelector,
  366. IComparer<TKey> comparer)
  367. {
  368. if (source == null)
  369. throw new ArgumentNullException ("source");
  370. if (keySelector == null)
  371. throw new ArgumentNullException ("keySelector");
  372. if (comparer == null)
  373. throw new ArgumentNullException ("comparer");
  374. Comparison<TSource> comparison = (e1, e2) => -comparer.Compare (keySelector (e1), keySelector (e2));
  375. return new OrderedParallelQuery<TSource> (new QueryOrderByNode<TSource> (source.Node, comparison));
  376. }
  377. #endregion
  378. #region All
  379. public static bool All<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  380. {
  381. if (source == null)
  382. throw new ArgumentNullException ("source");
  383. if (predicate == null)
  384. throw new ArgumentNullException ("predicate");
  385. CancellationTokenSource src = new CancellationTokenSource ();
  386. ParallelQuery<TSource> innerQuery = source.WithImplementerToken (src);
  387. bool result = true;
  388. innerQuery.ForAll ((e) => {
  389. if (!predicate (e)) {
  390. result = false;
  391. src.Cancel ();
  392. }
  393. });
  394. return result;
  395. }
  396. #endregion
  397. #region Any
  398. public static bool Any<TSource> (this ParallelQuery<TSource> source)
  399. {
  400. return Any<TSource> (source, (_) => true);
  401. }
  402. public static bool Any<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  403. {
  404. if (source == null)
  405. throw new ArgumentNullException ("source");
  406. if (predicate == null)
  407. throw new ArgumentNullException ("predicate");
  408. return !source.All ((e) => !predicate (e));
  409. }
  410. #endregion
  411. #region Contains
  412. public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value)
  413. {
  414. return Contains<TSource> (source, value, EqualityComparer<TSource>.Default);
  415. }
  416. public static bool Contains<TSource> (this ParallelQuery<TSource> source, TSource value, IEqualityComparer<TSource> comparer)
  417. {
  418. if (source == null)
  419. throw new ArgumentNullException ("source");
  420. if (comparer == null)
  421. throw new ArgumentNullException ("comparer");
  422. return Any<TSource> (source, (e) => comparer.Equals (value));
  423. }
  424. #endregion
  425. #region SequenceEqual
  426. public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
  427. ParallelQuery<TSource> second)
  428. {
  429. if (first == null)
  430. throw new ArgumentNullException ("first");
  431. if (second == null)
  432. throw new ArgumentNullException ("second");
  433. return first.SequenceEqual (second, EqualityComparer<TSource>.Default);
  434. }
  435. public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
  436. ParallelQuery<TSource> second,
  437. IEqualityComparer<TSource> comparer)
  438. {
  439. if (first == null)
  440. throw new ArgumentNullException ("first");
  441. if (second == null)
  442. throw new ArgumentNullException ("second");
  443. if (comparer == null)
  444. throw new ArgumentNullException ("comparer");
  445. CancellationTokenSource source = new CancellationTokenSource ();
  446. ParallelQuery<bool> innerQuery
  447. = first.Zip (second, (e1, e2) => comparer.Equals (e1, e2)).Where ((e) => !e).WithImplementerToken (source);
  448. bool result = true;
  449. innerQuery.ForAll ((value) => {
  450. result = false;
  451. source.Cancel ();
  452. });
  453. return result;
  454. }
  455. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  456. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  457. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  458. public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
  459. {
  460. throw new NotSupportedException ();
  461. }
  462. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  463. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  464. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  465. public static bool SequenceEqual<TSource> (this ParallelQuery<TSource> first,
  466. IEnumerable<TSource> second,
  467. IEqualityComparer<TSource> comparer)
  468. {
  469. throw new NotSupportedException ();
  470. }
  471. #endregion
  472. #region GroupBy
  473. public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
  474. Func<TSource, TKey> keySelector)
  475. {
  476. return source.GroupBy (keySelector, EqualityComparer<TKey>.Default);
  477. }
  478. public static ParallelQuery<IGrouping<TKey, TSource>> GroupBy<TSource, TKey> (this ParallelQuery<TSource> source,
  479. Func<TSource, TKey> keySelector,
  480. IEqualityComparer<TKey> comparer)
  481. {
  482. return source.GroupBy (keySelector, (e) => e, comparer);
  483. }
  484. public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  485. Func<TSource, TKey> keySelector,
  486. Func<TSource, TElement> elementSelector)
  487. {
  488. return source.GroupBy (keySelector, elementSelector, EqualityComparer<TKey>.Default);
  489. }
  490. public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
  491. Func<TSource, TKey> keySelector,
  492. Func<TKey, IEnumerable<TSource>, TResult> resultSelector)
  493. {
  494. return source.GroupBy (keySelector)
  495. .Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
  496. }
  497. public static ParallelQuery<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  498. Func<TSource, TKey> keySelector,
  499. Func<TSource, TElement> elementSelector,
  500. IEqualityComparer<TKey> comparer)
  501. {
  502. throw new NotImplementedException ();
  503. }
  504. public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
  505. Func<TSource, TKey> keySelector,
  506. Func<TSource, TElement> elementSelector,
  507. Func<TKey, IEnumerable<TElement>, TResult> resultSelector)
  508. {
  509. return source.GroupBy (keySelector, elementSelector)
  510. .Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
  511. }
  512. public static ParallelQuery<TResult> GroupBy<TSource, TKey, TResult> (this ParallelQuery<TSource> source,
  513. Func<TSource, TKey> keySelector,
  514. Func<TKey, IEnumerable<TSource>, TResult> resultSelector,
  515. IEqualityComparer<TKey> comparer)
  516. {
  517. return source.GroupBy (keySelector, comparer)
  518. .Select ((g) => resultSelector (g.Key, (IEnumerable<TSource>)g));
  519. }
  520. public static ParallelQuery<TResult> GroupBy<TSource, TKey, TElement, TResult> (this ParallelQuery<TSource> source,
  521. Func<TSource, TKey> keySelector,
  522. Func<TSource, TElement> elementSelector,
  523. Func<TKey, IEnumerable<TElement>, TResult> resultSelector,
  524. IEqualityComparer<TKey> comparer)
  525. {
  526. return source.GroupBy (keySelector, elementSelector, comparer)
  527. .Select ((g) => resultSelector (g.Key, (IEnumerable<TElement>)g));
  528. }
  529. #endregion
  530. #region GroupJoin
  531. public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  532. ParallelQuery<TInner> inner,
  533. Func<TOuter, TKey> outerKeySelector,
  534. Func<TInner, TKey> innerKeySelector,
  535. Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
  536. {
  537. return outer.GroupJoin (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
  538. }
  539. public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  540. ParallelQuery<TInner> inner,
  541. Func<TOuter, TKey> outerKeySelector,
  542. Func<TInner, TKey> innerKeySelector,
  543. Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
  544. IEqualityComparer<TKey> comparer)
  545. {
  546. throw new NotImplementedException ();
  547. }
  548. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  549. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  550. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  551. public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  552. IEnumerable<TInner> inner,
  553. Func<TOuter, TKey> outerKeySelector,
  554. Func<TInner, TKey> innerKeySelector,
  555. Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
  556. {
  557. throw new NotSupportedException ();
  558. }
  559. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  560. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  561. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  562. public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  563. IEnumerable<TInner> inner,
  564. Func<TOuter, TKey> outerKeySelector,
  565. Func<TInner, TKey> innerKeySelector,
  566. Func<TOuter, IEnumerable<TInner>, TResult> resultSelector,
  567. IEqualityComparer<TKey> comparer)
  568. {
  569. throw new NotImplementedException ();
  570. }
  571. #endregion
  572. #region ElementAt
  573. public static TSource ElementAt<TSource> (this ParallelQuery<TSource> source, int index)
  574. {
  575. if (source == null)
  576. throw new ArgumentNullException ("source");
  577. if (index < 0)
  578. throw new ArgumentOutOfRangeException ("index");
  579. if (index == 0) {
  580. try {
  581. return source.First ();
  582. } catch (InvalidOperationException) {
  583. throw new ArgumentOutOfRangeException ("index");
  584. }
  585. }
  586. TSource result = default (TSource);
  587. ParallelQuery<TSource> innerQuery = source.Where ((e, i) => i == index);
  588. try {
  589. result = innerQuery.First ();
  590. } catch (InvalidOperationException) {
  591. throw new ArgumentOutOfRangeException ("index");
  592. }
  593. return result;
  594. }
  595. public static TSource ElementAtOrDefault<TSource> (this ParallelQuery<TSource> source, int index)
  596. {
  597. if (source == null)
  598. throw new ArgumentNullException ("source");
  599. try {
  600. return source.ElementAt (index);
  601. } catch (ArgumentOutOfRangeException) {
  602. return default (TSource);
  603. }
  604. }
  605. #endregion
  606. #region Intersect
  607. public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
  608. ParallelQuery<TSource> second)
  609. {
  610. return Intersect<TSource> (first, second, EqualityComparer<TSource>.Default);
  611. }
  612. public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
  613. ParallelQuery<TSource> second,
  614. IEqualityComparer<TSource> comparer)
  615. {
  616. if (first == null)
  617. throw new ArgumentNullException ("first");
  618. if (second == null)
  619. throw new ArgumentNullException ("second");
  620. if (comparer == null)
  621. throw new ArgumentNullException ("comparer");
  622. return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Intersect, comparer, first.Node, second.Node));
  623. }
  624. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  625. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  626. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  627. public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first, IEnumerable<TSource> second)
  628. {
  629. throw new NotSupportedException ();
  630. }
  631. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  632. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  633. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  634. public static ParallelQuery<TSource> Intersect<TSource> (this ParallelQuery<TSource> first,
  635. IEnumerable<TSource> second,
  636. IEqualityComparer<TSource> comparer)
  637. {
  638. throw new NotSupportedException ();
  639. }
  640. #endregion
  641. #region Join
  642. public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  643. ParallelQuery<TInner> inner,
  644. Func<TOuter, TKey> outerKeySelector,
  645. Func<TInner, TKey> innerKeySelector,
  646. Func<TOuter, TInner, TResult> resultSelector)
  647. {
  648. return outer.Join (inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
  649. }
  650. public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  651. ParallelQuery<TInner> inner,
  652. Func<TOuter, TKey> outerKeySelector,
  653. Func<TInner, TKey> innerKeySelector,
  654. Func<TOuter, TInner, TResult> resultSelector,
  655. IEqualityComparer<TKey> comparer)
  656. {
  657. throw new NotImplementedException ();
  658. }
  659. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  660. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  661. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  662. public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  663. IEnumerable<TInner> inner,
  664. Func<TOuter, TKey> outerKeySelector,
  665. Func<TInner, TKey> innerKeySelector,
  666. Func<TOuter, TInner, TResult> resultSelector)
  667. {
  668. throw new NotSupportedException ();
  669. }
  670. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  671. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  672. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  673. public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult> (this ParallelQuery<TOuter> outer,
  674. IEnumerable<TInner> inner,
  675. Func<TOuter, TKey> outerKeySelector,
  676. Func<TInner, TKey> innerKeySelector,
  677. Func<TOuter, TInner, TResult> resultSelector,
  678. IEqualityComparer<TKey> comparer)
  679. {
  680. throw new NotSupportedException ();
  681. }
  682. #endregion
  683. #region Except
  684. public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
  685. ParallelQuery<TSource> second)
  686. {
  687. return Except<TSource> (first, second, EqualityComparer<TSource>.Default);
  688. }
  689. public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
  690. ParallelQuery<TSource> second,
  691. IEqualityComparer<TSource> comparer)
  692. {
  693. if (first == null)
  694. throw new ArgumentNullException ("first");
  695. if (second == null)
  696. throw new ArgumentNullException ("second");
  697. if (comparer == null)
  698. throw new ArgumentNullException ("comparer");
  699. return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Except,
  700. comparer, first.Node, second.Node));
  701. }
  702. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  703. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  704. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  705. public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
  706. IEnumerable<TSource> second)
  707. {
  708. throw new NotSupportedException ();
  709. }
  710. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  711. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  712. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  713. public static ParallelQuery<TSource> Except<TSource> (this ParallelQuery<TSource> first,
  714. IEnumerable<TSource> second,
  715. IEqualityComparer<TSource> comparer)
  716. {
  717. throw new NotSupportedException ();
  718. }
  719. #endregion
  720. #region Distinct
  721. public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source)
  722. {
  723. return Distinct<TSource> (source, EqualityComparer<TSource>.Default);
  724. }
  725. public static ParallelQuery<TSource> Distinct<TSource> (this ParallelQuery<TSource> source, IEqualityComparer<TSource> comparer)
  726. {
  727. if (source == null)
  728. throw new ArgumentNullException ("source");
  729. if (comparer == null)
  730. throw new ArgumentNullException ("comparer");
  731. return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Distinct, comparer,
  732. source.Node, null));
  733. }
  734. #endregion
  735. #region Union
  736. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  737. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  738. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  739. public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
  740. IEnumerable<TSource> second)
  741. {
  742. throw new NotSupportedException ();
  743. }
  744. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  745. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  746. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  747. public static ParallelQuery<TSource> Union<TSource>(this ParallelQuery<TSource> first,
  748. IEnumerable<TSource> second,
  749. IEqualityComparer<TSource> comparer)
  750. {
  751. throw new NotSupportedException ();
  752. }
  753. public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
  754. ParallelQuery<TSource> second)
  755. {
  756. return first.Union (second, EqualityComparer<TSource>.Default);
  757. }
  758. public static ParallelQuery<TSource> Union<TSource> (this ParallelQuery<TSource> first,
  759. ParallelQuery<TSource> second,
  760. IEqualityComparer<TSource> comparer)
  761. {
  762. if (first == null)
  763. throw new ArgumentNullException ("first");
  764. if (second == null)
  765. throw new ArgumentNullException ("second");
  766. if (comparer == null)
  767. throw new ArgumentNullException ("comparer");
  768. return new ParallelQuery<TSource> (new QuerySetNode<TSource> (SetInclusionDefaults.Union, comparer, first.Node, second.Node));
  769. }
  770. #endregion
  771. #region Take
  772. // TODO : introduce some early break up here, use ImplementerToken
  773. public static ParallelQuery<TSource> Take<TSource> (this ParallelQuery<TSource> source, int count)
  774. {
  775. if (source == null)
  776. throw new ArgumentNullException ("source");
  777. return source.Where ((e, i) => i < count);
  778. }
  779. public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
  780. Func<TSource, bool> predicate)
  781. {
  782. if (source == null)
  783. throw new ArgumentNullException ("source");
  784. if (predicate == null)
  785. throw new ArgumentNullException ("predicate");
  786. return source.Where ((e) => predicate (e));
  787. }
  788. public static ParallelQuery<TSource> TakeWhile<TSource> (this ParallelQuery<TSource> source,
  789. Func<TSource, int, bool> predicate)
  790. {
  791. if (source == null)
  792. throw new ArgumentNullException ("source");
  793. if (predicate == null)
  794. throw new ArgumentNullException ("predicate");
  795. return source.Where ((e, i) => predicate (e, i));
  796. }
  797. #endregion
  798. #region Skip
  799. public static ParallelQuery<TSource> Skip<TSource> (this ParallelQuery<TSource> source, int count)
  800. {
  801. if (source == null)
  802. throw new ArgumentNullException ("source");
  803. return source.Node.IsOrdered () ?
  804. source.Where ((e, i) => i >= count) :
  805. source.Where ((e) => count < 0 || Interlocked.Decrement (ref count) < 0);
  806. }
  807. public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
  808. Func<TSource, bool> predicate)
  809. {
  810. if (source == null)
  811. throw new ArgumentNullException ("source");
  812. if (predicate == null)
  813. throw new ArgumentNullException ("predicate");
  814. return source.Node.IsOrdered () ?
  815. source.SkipWhile ((e, i) => predicate (e)) :
  816. source.Where ((e) => !predicate (e));
  817. }
  818. public static ParallelQuery<TSource> SkipWhile<TSource> (this ParallelQuery<TSource> source,
  819. Func<TSource, int, bool> predicate)
  820. {
  821. if (source == null)
  822. throw new ArgumentNullException ("source");
  823. if (predicate == null)
  824. throw new ArgumentNullException ("predicate");
  825. int indexCache = int.MaxValue;
  826. return source.Where ((e, i) => i >= indexCache || (!predicate (e, i) && (indexCache = i) == i));
  827. }
  828. #endregion
  829. #region Single
  830. static TSource SingleInternal<TSource> (this ParallelQuery<TSource> source, params TSource[] init)
  831. {
  832. TSource result = default(TSource);
  833. bool hasValue = false;
  834. foreach (TSource element in source) {
  835. if (hasValue)
  836. throw new InvalidOperationException ("The input sequence contains more than one element.");
  837. result = element;
  838. hasValue = true;
  839. }
  840. if (!hasValue && init.Length != 0) {
  841. result = init[0];
  842. hasValue = true;
  843. }
  844. if (!hasValue)
  845. throw new InvalidOperationException ("The input sequence is empty.");
  846. return result;
  847. }
  848. public static TSource Single<TSource> (this ParallelQuery<TSource> source)
  849. {
  850. if (source == null)
  851. throw new ArgumentNullException ("source");
  852. return SingleInternal<TSource> (source);
  853. }
  854. public static TSource Single<TSource> (this ParallelQuery<TSource> source,
  855. Func<TSource, bool> predicate)
  856. {
  857. if (source == null)
  858. throw new ArgumentNullException ("source");
  859. if (predicate == null)
  860. throw new ArgumentNullException ("predicate");
  861. return source.Where (predicate).Single ();
  862. }
  863. public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source)
  864. {
  865. if (source == null)
  866. throw new ArgumentNullException ("source");
  867. return SingleInternal<TSource> (source, default (TSource));
  868. }
  869. public static TSource SingleOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  870. {
  871. if (source == null)
  872. throw new ArgumentNullException ("source");
  873. if (predicate == null)
  874. throw new ArgumentNullException ("predicate");
  875. return source.Where (predicate).SingleOrDefault ();
  876. }
  877. #endregion
  878. #region Count
  879. public static int Count<TSource> (this ParallelQuery<TSource> source)
  880. {
  881. if (source == null)
  882. throw new ArgumentNullException ("source");
  883. return source.Aggregate<TSource, int, int> (() => 0,
  884. (acc, e) => acc + 1,
  885. (acc1, acc2) => acc1 + acc2,
  886. (result) => result);
  887. }
  888. public static int Count<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  889. {
  890. if (source == null)
  891. throw new ArgumentNullException ("source");
  892. if (predicate == null)
  893. throw new ArgumentNullException ("predicate");
  894. return source.Where (predicate).Count ();
  895. }
  896. public static long LongCount<TSource> (this ParallelQuery<TSource> source)
  897. {
  898. if (source == null)
  899. throw new ArgumentNullException ("source");
  900. return source.Aggregate<TSource, long, long> (() => 0,
  901. (acc, e) => acc + 1,
  902. (acc1, acc2) => acc1 + acc2,
  903. (result) => result);
  904. }
  905. public static long LongCount<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  906. {
  907. if (source == null)
  908. throw new ArgumentNullException ("source");
  909. if (predicate == null)
  910. throw new ArgumentNullException ("predicate");
  911. return source.Where (predicate).LongCount ();
  912. }
  913. #endregion
  914. #region Average
  915. public static double Average (this ParallelQuery<int> source)
  916. {
  917. if (source == null)
  918. throw new ArgumentNullException ("source");
  919. return source.Aggregate (() => new int[2],
  920. (acc, e) => { acc[0] += e; acc[1]++; return acc; },
  921. (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
  922. (acc) => acc[0] / ((double)acc[1]));
  923. }
  924. public static double Average (this ParallelQuery<long> source)
  925. {
  926. if (source == null)
  927. throw new ArgumentNullException ("source");
  928. return source.Aggregate (() => new long[2],
  929. (acc, e) => { acc[0] += e; acc[1]++; return acc; },
  930. (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
  931. (acc) => acc[0] / ((double)acc[1]));
  932. }
  933. public static decimal Average (this ParallelQuery<decimal> source)
  934. {
  935. if (source == null)
  936. throw new ArgumentNullException ("source");
  937. return source.Aggregate (() => new decimal[2],
  938. (acc, e) => { acc[0] += e; acc[1]++; return acc; },
  939. (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
  940. (acc) => acc[0] / acc[1]);
  941. }
  942. public static double Average (this ParallelQuery<double> source)
  943. {
  944. if (source == null)
  945. throw new ArgumentNullException ("source");
  946. return source.Aggregate (() => new double[2],
  947. (acc, e) => { acc[0] += e; acc[1]++; return acc; },
  948. (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
  949. (acc) => acc[0] / ((double)acc[1]));
  950. }
  951. public static float Average (this ParallelQuery<float> source)
  952. {
  953. if (source == null)
  954. throw new ArgumentNullException ("source");
  955. return source.Aggregate (() => new float[2],
  956. (acc, e) => { acc[0] += e; acc[1]++; return acc; },
  957. (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
  958. (acc) => acc[0] / acc[1]);
  959. }
  960. #endregion
  961. #region More Average
  962. public static double? Average (this ParallelQuery<int?> source)
  963. {
  964. if (source == null)
  965. throw new ArgumentNullException ("source");
  966. return source.Select ((e) => e.HasValue ? e.Value : 0).Average ();;
  967. }
  968. public static double? Average (this ParallelQuery<long?> source)
  969. {
  970. if (source == null)
  971. throw new ArgumentNullException ("source");
  972. return source.Select ((e) => e.HasValue ? e.Value : 0).Average ();
  973. }
  974. public static decimal? Average (this ParallelQuery<decimal?> source)
  975. {
  976. if (source == null)
  977. throw new ArgumentNullException ("source");
  978. return source.Select ((e) => e.HasValue ? e.Value : 0).Average ();
  979. }
  980. public static double? Average (this ParallelQuery<double?> source)
  981. {
  982. if (source == null)
  983. throw new ArgumentNullException ("source");
  984. return source.Select ((e) => e.HasValue ? e.Value : 0).Average ();
  985. }
  986. public static float? Average (this ParallelQuery<float?> source)
  987. {
  988. if (source == null)
  989. throw new ArgumentNullException ("source");
  990. return source.Select ((e) => e.HasValue ? e.Value : 0).Average ();
  991. }
  992. public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
  993. {
  994. if (source == null)
  995. throw new ArgumentNullException ("source");
  996. if (func == null)
  997. throw new ArgumentNullException ("func");
  998. return source.Select (func).Average ();
  999. }
  1000. public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
  1001. {
  1002. if (source == null)
  1003. throw new ArgumentNullException ("source");
  1004. if (func == null)
  1005. throw new ArgumentNullException ("func");
  1006. return source.Select (func).Average ();
  1007. }
  1008. public static float Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
  1009. {
  1010. if (source == null)
  1011. throw new ArgumentNullException ("source");
  1012. if (func == null)
  1013. throw new ArgumentNullException ("func");
  1014. return source.Select (func).Average ();
  1015. }
  1016. public static double Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
  1017. {
  1018. if (source == null)
  1019. throw new ArgumentNullException ("source");
  1020. if (func == null)
  1021. throw new ArgumentNullException ("func");
  1022. return source.Select (func).Average ();
  1023. }
  1024. public static decimal Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
  1025. {
  1026. if (source == null)
  1027. throw new ArgumentNullException ("source");
  1028. if (func == null)
  1029. throw new ArgumentNullException ("func");
  1030. return source.Select (func).Average ();
  1031. }
  1032. public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
  1033. {
  1034. if (source == null)
  1035. throw new ArgumentNullException ("source");
  1036. if (func == null)
  1037. throw new ArgumentNullException ("func");
  1038. return source.Select (func).Average ();
  1039. }
  1040. public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
  1041. {
  1042. if (source == null)
  1043. throw new ArgumentNullException ("source");
  1044. if (func == null)
  1045. throw new ArgumentNullException ("func");
  1046. return source.Select (func).Average ();
  1047. }
  1048. public static float? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
  1049. {
  1050. if (source == null)
  1051. throw new ArgumentNullException ("source");
  1052. if (func == null)
  1053. throw new ArgumentNullException ("func");
  1054. return source.Select (func).Average ();
  1055. }
  1056. public static double? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
  1057. {
  1058. if (source == null)
  1059. throw new ArgumentNullException ("source");
  1060. if (func == null)
  1061. throw new ArgumentNullException ("func");
  1062. return source.Select (func).Average ();
  1063. }
  1064. public static decimal? Average<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
  1065. {
  1066. if (source == null)
  1067. throw new ArgumentNullException ("source");
  1068. if (func == null)
  1069. throw new ArgumentNullException ("func");
  1070. return source.Select (func).Average ();
  1071. }
  1072. #endregion
  1073. #region Sum
  1074. public static int Sum (this ParallelQuery<int> source)
  1075. {
  1076. if (source == null)
  1077. throw new ArgumentNullException ("source");
  1078. return source.Aggregate (0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
  1079. }
  1080. public static long Sum (this ParallelQuery<long> source)
  1081. {
  1082. if (source == null)
  1083. throw new ArgumentNullException ("source");
  1084. return source.Aggregate ((long)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
  1085. }
  1086. public static float Sum (this ParallelQuery<float> source)
  1087. {
  1088. if (source == null)
  1089. throw new ArgumentNullException ("source");
  1090. return source.Aggregate (0.0f, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
  1091. }
  1092. public static double Sum (this ParallelQuery<double> source)
  1093. {
  1094. if (source == null)
  1095. throw new ArgumentNullException ("source");
  1096. return source.Aggregate (0.0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
  1097. }
  1098. public static decimal Sum (this ParallelQuery<decimal> source)
  1099. {
  1100. if (source == null)
  1101. throw new ArgumentNullException ("source");
  1102. return source.Aggregate ((decimal)0, (e1, e2) => e1 + e2, (sum1, sum2) => sum1 + sum2, (sum) => sum);
  1103. }
  1104. public static int? Sum (this ParallelQuery<int?> source)
  1105. {
  1106. return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
  1107. }
  1108. public static long? Sum (this ParallelQuery<long?> source)
  1109. {
  1110. if (source == null)
  1111. throw new ArgumentNullException ("source");
  1112. return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
  1113. }
  1114. public static float? Sum (this ParallelQuery<float?> source)
  1115. {
  1116. if (source == null)
  1117. throw new ArgumentNullException ("source");
  1118. return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
  1119. }
  1120. public static double? Sum (this ParallelQuery<double?> source)
  1121. {
  1122. if (source == null)
  1123. throw new ArgumentNullException ("source");
  1124. return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
  1125. }
  1126. public static decimal? Sum (this ParallelQuery<decimal?> source)
  1127. {
  1128. if (source == null)
  1129. throw new ArgumentNullException ("source");
  1130. return source.Select ((e) => e.HasValue ? e.Value : 0).Sum ();
  1131. }
  1132. public static int Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
  1133. {
  1134. if (source == null)
  1135. throw new ArgumentNullException ("source");
  1136. if (func == null)
  1137. throw new ArgumentNullException ("func");
  1138. return source.Select (func).Sum ();
  1139. }
  1140. public static long Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
  1141. {
  1142. if (source == null)
  1143. throw new ArgumentNullException ("source");
  1144. if (func == null)
  1145. throw new ArgumentNullException ("func");
  1146. return source.Select (func).Sum ();
  1147. }
  1148. public static decimal Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
  1149. {
  1150. if (source == null)
  1151. throw new ArgumentNullException ("source");
  1152. if (func == null)
  1153. throw new ArgumentNullException ("func");
  1154. return source.Select (func).Sum ();
  1155. }
  1156. public static float Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
  1157. {
  1158. if (source == null)
  1159. throw new ArgumentNullException ("source");
  1160. if (func == null)
  1161. throw new ArgumentNullException ("func");
  1162. return source.Select (func).Sum ();
  1163. }
  1164. public static double Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
  1165. {
  1166. if (source == null)
  1167. throw new ArgumentNullException ("source");
  1168. if (func == null)
  1169. throw new ArgumentNullException ("func");
  1170. return source.Select (func).Sum ();
  1171. }
  1172. public static int? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
  1173. {
  1174. if (source == null)
  1175. throw new ArgumentNullException ("source");
  1176. if (func == null)
  1177. throw new ArgumentNullException ("func");
  1178. return source.Select (func).Sum ();
  1179. }
  1180. public static long? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
  1181. {
  1182. if (source == null)
  1183. throw new ArgumentNullException ("source");
  1184. if (func == null)
  1185. throw new ArgumentNullException ("func");
  1186. return source.Select (func).Sum ();
  1187. }
  1188. public static decimal? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
  1189. {
  1190. if (source == null)
  1191. throw new ArgumentNullException ("source");
  1192. if (func == null)
  1193. throw new ArgumentNullException ("func");
  1194. return source.Select (func).Sum ();
  1195. }
  1196. public static float? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
  1197. {
  1198. if (source == null)
  1199. throw new ArgumentNullException ("source");
  1200. if (func == null)
  1201. throw new ArgumentNullException ("func");
  1202. return source.Select (func).Sum ();
  1203. }
  1204. public static double? Sum<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
  1205. {
  1206. if (source == null)
  1207. throw new ArgumentNullException ("source");
  1208. if (func == null)
  1209. throw new ArgumentNullException ("func");
  1210. return source.Select (func).Sum ();
  1211. }
  1212. #endregion
  1213. #region Min-Max
  1214. static T BestOrder<T> (ParallelQuery<T> source, Func<T, T, bool> bestSelector, T seed)
  1215. {
  1216. if (source == null)
  1217. throw new ArgumentNullException ("source");
  1218. T best = seed;
  1219. best = source.Aggregate (() => seed,
  1220. (first, second) => (bestSelector(first, second)) ? first : second,
  1221. (first, second) => (bestSelector(first, second)) ? first : second,
  1222. (e) => e);
  1223. return best;
  1224. }
  1225. public static int Min (this ParallelQuery<int> source)
  1226. {
  1227. return BestOrder (source, (first, second) => first < second, int.MaxValue);
  1228. }
  1229. public static long Min (this ParallelQuery<long> source)
  1230. {
  1231. return BestOrder (source, (first, second) => first < second, long.MaxValue);
  1232. }
  1233. public static float Min (this ParallelQuery<float> source)
  1234. {
  1235. return BestOrder (source, (first, second) => first < second, float.MaxValue);
  1236. }
  1237. public static double Min (this ParallelQuery<double> source)
  1238. {
  1239. return BestOrder (source, (first, second) => first < second, double.MaxValue);
  1240. }
  1241. public static decimal Min (this ParallelQuery<decimal> source)
  1242. {
  1243. return BestOrder (source, (first, second) => first < second, decimal.MaxValue);
  1244. }
  1245. public static TSource Min<TSource> (this ParallelQuery<TSource> source)
  1246. {
  1247. IComparer<TSource> comparer = Comparer<TSource>.Default;
  1248. return BestOrder (source, (first, second) => comparer.Compare (first, second) < 0, default (TSource));
  1249. }
  1250. public static TResult Min<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> func)
  1251. {
  1252. if (source == null)
  1253. throw new ArgumentNullException ("source");
  1254. if (func == null)
  1255. throw new ArgumentNullException ("func");
  1256. return source.Select (func).Min ();
  1257. }
  1258. public static int? Min (this ParallelQuery<int?> source)
  1259. {
  1260. if (source == null)
  1261. throw new ArgumentNullException ("source");
  1262. return source.Select ((e) => e.HasValue ? e.Value : int.MaxValue).Min ();
  1263. }
  1264. public static long? Min (this ParallelQuery<long?> source)
  1265. {
  1266. if (source == null)
  1267. throw new ArgumentNullException ("source");
  1268. return source.Select ((e) => e.HasValue ? e.Value : long.MaxValue).Min ();
  1269. }
  1270. public static float? Min (this ParallelQuery<float?> source)
  1271. {
  1272. if (source == null)
  1273. throw new ArgumentNullException ("source");
  1274. return source.Select ((e) => e.HasValue ? e.Value : float.MaxValue).Min ();
  1275. }
  1276. public static double? Min (this ParallelQuery<double?> source)
  1277. {
  1278. if (source == null)
  1279. throw new ArgumentNullException ("source");
  1280. return source.Select ((e) => e.HasValue ? e.Value : double.MaxValue).Min ();
  1281. }
  1282. public static decimal? Min (this ParallelQuery<decimal?> source)
  1283. {
  1284. if (source == null)
  1285. throw new ArgumentNullException ("source");
  1286. return source.Select ((e) => e.HasValue ? e.Value : decimal.MaxValue).Min ();
  1287. }
  1288. public static int Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
  1289. {
  1290. if (source == null)
  1291. throw new ArgumentNullException ("source");
  1292. if (func == null)
  1293. throw new ArgumentNullException ("func");
  1294. return source.Select (func).Min ();
  1295. }
  1296. public static long Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
  1297. {
  1298. if (source == null)
  1299. throw new ArgumentNullException ("source");
  1300. if (func == null)
  1301. throw new ArgumentNullException ("func");
  1302. return source.Select (func).Min ();
  1303. }
  1304. public static float Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
  1305. {
  1306. if (source == null)
  1307. throw new ArgumentNullException ("source");
  1308. if (func == null)
  1309. throw new ArgumentNullException ("func");
  1310. return source.Select (func).Min ();
  1311. }
  1312. public static double Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
  1313. {
  1314. if (source == null)
  1315. throw new ArgumentNullException ("source");
  1316. if (func == null)
  1317. throw new ArgumentNullException ("func");
  1318. return source.Select (func).Min ();
  1319. }
  1320. public static decimal Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
  1321. {
  1322. if (source == null)
  1323. throw new ArgumentNullException ("source");
  1324. if (func == null)
  1325. throw new ArgumentNullException ("func");
  1326. return source.Select (func).Min ();
  1327. }
  1328. public static int? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
  1329. {
  1330. if (source == null)
  1331. throw new ArgumentNullException ("source");
  1332. if (func == null)
  1333. throw new ArgumentNullException ("func");
  1334. return source.Select (func).Min ();
  1335. }
  1336. public static long? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
  1337. {
  1338. if (source == null)
  1339. throw new ArgumentNullException ("source");
  1340. if (func == null)
  1341. throw new ArgumentNullException ("func");
  1342. return source.Select (func).Min ();
  1343. }
  1344. public static float? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
  1345. {
  1346. if (source == null)
  1347. throw new ArgumentNullException ("source");
  1348. if (func == null)
  1349. throw new ArgumentNullException ("func");
  1350. return source.Select (func).Min ();
  1351. }
  1352. public static double? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
  1353. {
  1354. if (source == null)
  1355. throw new ArgumentNullException ("source");
  1356. if (func == null)
  1357. throw new ArgumentNullException ("func");
  1358. return source.Select (func).Min ();
  1359. }
  1360. public static decimal? Min<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
  1361. {
  1362. if (source == null)
  1363. throw new ArgumentNullException ("source");
  1364. if (func == null)
  1365. throw new ArgumentNullException ("func");
  1366. return source.Select (func).Min ();
  1367. }
  1368. public static int Max (this ParallelQuery<int> source)
  1369. {
  1370. return BestOrder (source, (first, second) => first > second, int.MinValue);
  1371. }
  1372. public static long Max(this ParallelQuery<long> source)
  1373. {
  1374. return BestOrder(source, (first, second) => first > second, long.MinValue);
  1375. }
  1376. public static float Max (this ParallelQuery<float> source)
  1377. {
  1378. return BestOrder(source, (first, second) => first > second, float.MinValue);
  1379. }
  1380. public static double Max (this ParallelQuery<double> source)
  1381. {
  1382. return BestOrder(source, (first, second) => first > second, double.MinValue);
  1383. }
  1384. public static decimal Max (this ParallelQuery<decimal> source)
  1385. {
  1386. return BestOrder(source, (first, second) => first > second, decimal.MinValue);
  1387. }
  1388. public static TSource Max<TSource> (this ParallelQuery<TSource> source)
  1389. {
  1390. IComparer<TSource> comparer = Comparer<TSource>.Default;
  1391. return BestOrder (source, (first, second) => comparer.Compare (first, second) > 0, default (TSource));
  1392. }
  1393. public static TResult Max<TSource, TResult> (this ParallelQuery<TSource> source, Func<TSource, TResult> func)
  1394. {
  1395. if (source == null)
  1396. throw new ArgumentNullException ("source");
  1397. if (func == null)
  1398. throw new ArgumentNullException ("func");
  1399. return source.Select (func).Max ();
  1400. }
  1401. public static int? Max (this ParallelQuery<int?> source)
  1402. {
  1403. if (source == null)
  1404. throw new ArgumentNullException ("source");
  1405. return source.Select ((e) => e.HasValue ? e.Value : int.MinValue).Max ();
  1406. }
  1407. public static long? Max (this ParallelQuery<long?> source)
  1408. {
  1409. if (source == null)
  1410. throw new ArgumentNullException ("source");
  1411. return source.Select ((e) => e.HasValue ? e.Value : long.MinValue).Max ();
  1412. }
  1413. public static float? Max (this ParallelQuery<float?> source)
  1414. {
  1415. if (source == null)
  1416. throw new ArgumentNullException ("source");
  1417. return source.Select ((e) => e.HasValue ? e.Value : float.MinValue).Max ();
  1418. }
  1419. public static double? Max (this ParallelQuery<double?> source)
  1420. {
  1421. if (source == null)
  1422. throw new ArgumentNullException ("source");
  1423. return source.Select ((e) => e.HasValue ? e.Value : double.MinValue).Max ();
  1424. }
  1425. public static decimal? Max (this ParallelQuery<decimal?> source)
  1426. {
  1427. if (source == null)
  1428. throw new ArgumentNullException ("source");
  1429. return source.Select ((e) => e.HasValue ? e.Value : decimal.MinValue).Max ();
  1430. }
  1431. public static int Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int> func)
  1432. {
  1433. if (source == null)
  1434. throw new ArgumentNullException ("source");
  1435. if (func == null)
  1436. throw new ArgumentNullException ("func");
  1437. return source.Select (func).Max ();
  1438. }
  1439. public static long Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long> func)
  1440. {
  1441. if (source == null)
  1442. throw new ArgumentNullException ("source");
  1443. if (func == null)
  1444. throw new ArgumentNullException ("func");
  1445. return source.Select (func).Max ();
  1446. }
  1447. public static float Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float> func)
  1448. {
  1449. if (source == null)
  1450. throw new ArgumentNullException ("source");
  1451. if (func == null)
  1452. throw new ArgumentNullException ("func");
  1453. return source.Select (func).Max ();
  1454. }
  1455. public static double Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double> func)
  1456. {
  1457. if (source == null)
  1458. throw new ArgumentNullException ("source");
  1459. if (func == null)
  1460. throw new ArgumentNullException ("func");
  1461. return source.Select (func).Max ();
  1462. }
  1463. public static decimal Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal> func)
  1464. {
  1465. if (source == null)
  1466. throw new ArgumentNullException ("source");
  1467. if (func == null)
  1468. throw new ArgumentNullException ("func");
  1469. return source.Select (func).Max ();
  1470. }
  1471. public static int? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, int?> func)
  1472. {
  1473. if (source == null)
  1474. throw new ArgumentNullException ("source");
  1475. if (func == null)
  1476. throw new ArgumentNullException ("func");
  1477. return source.Select (func).Max ();
  1478. }
  1479. public static long? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, long?> func)
  1480. {
  1481. if (source == null)
  1482. throw new ArgumentNullException ("source");
  1483. if (func == null)
  1484. throw new ArgumentNullException ("func");
  1485. return source.Select (func).Max ();
  1486. }
  1487. public static float? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, float?> func)
  1488. {
  1489. if (source == null)
  1490. throw new ArgumentNullException ("source");
  1491. if (func == null)
  1492. throw new ArgumentNullException ("func");
  1493. return source.Select (func).Max ();
  1494. }
  1495. public static double? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, double?> func)
  1496. {
  1497. if (source == null)
  1498. throw new ArgumentNullException ("source");
  1499. if (func == null)
  1500. throw new ArgumentNullException ("func");
  1501. return source.Select (func).Max ();
  1502. }
  1503. public static decimal? Max<TSource> (this ParallelQuery<TSource> source, Func<TSource, decimal?> func)
  1504. {
  1505. if (source == null)
  1506. throw new ArgumentNullException ("source");
  1507. if (func == null)
  1508. throw new ArgumentNullException ("func");
  1509. return source.Select (func).Max ();
  1510. }
  1511. #endregion
  1512. #region Cast / OfType
  1513. public static ParallelQuery<TResult> Cast<TResult> (this ParallelQuery source)
  1514. {
  1515. if (source == null)
  1516. throw new ArgumentNullException ("source");
  1517. return source.TypedQuery.Select ((e) => (TResult)e);
  1518. }
  1519. public static ParallelQuery<TResult> OfType<TResult> (this ParallelQuery source)
  1520. {
  1521. if (source == null)
  1522. throw new ArgumentNullException ("source");
  1523. return source.TypedQuery.Where ((e) => e is TResult).Cast<TResult> ();
  1524. }
  1525. #endregion
  1526. #region Reverse
  1527. public static ParallelQuery<TSource> Reverse<TSource> (this ParallelQuery<TSource> source)
  1528. {
  1529. if (source == null)
  1530. throw new ArgumentNullException ("source");
  1531. return new ParallelQuery<TSource> (new QueryReverseNode<TSource> (source));
  1532. }
  1533. #endregion
  1534. #region ToArray - ToList - ToDictionary - ToLookup
  1535. public static List<TSource> ToList<TSource> (this ParallelQuery<TSource> source)
  1536. {
  1537. if (source == null)
  1538. throw new ArgumentNullException ("source");
  1539. if (source.Node.IsOrdered ())
  1540. return ToListOrdered (source);
  1541. List<TSource> temp = source.Aggregate (() => new List<TSource>(50),
  1542. (list, e) => { list.Add (e); return list; },
  1543. (list, list2) => { list.AddRange (list2); return list; },
  1544. (list) => list);
  1545. return temp;
  1546. }
  1547. internal static List<TSource> ToListOrdered<TSource> (this ParallelQuery<TSource> source)
  1548. {
  1549. List<TSource> result = new List<TSource> ();
  1550. foreach (TSource element in source)
  1551. result.Add (element);
  1552. return result;
  1553. }
  1554. public static TSource[] ToArray<TSource> (this ParallelQuery<TSource> source)
  1555. {
  1556. if (source == null)
  1557. throw new ArgumentNullException ("source");
  1558. if (source.Node.IsOrdered ())
  1559. return ToListOrdered (source).ToArray ();
  1560. TSource[] result = null;
  1561. Func<List<TSource>, TSource, List<TSource>> intermediate = (list, e) => {
  1562. list.Add (e); return list;
  1563. };
  1564. Action<IList<List<TSource>>> final = (list) => {
  1565. int count = 0;
  1566. for (int i = 0; i < list.Count; i++)
  1567. count += list[i].Count;
  1568. result = new TSource[count];
  1569. int insertIndex = -1;
  1570. for (int i = 0; i < list.Count; i++)
  1571. for (int j = 0; j < list[i].Count; j++)
  1572. result [++insertIndex] = list[i][j];
  1573. };
  1574. ParallelExecuter.ProcessAndAggregate<TSource, List<TSource>> (source.Node,
  1575. () => new List<TSource> (),
  1576. intermediate,
  1577. final);
  1578. return result;
  1579. }
  1580. public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
  1581. Func<TSource, TKey> keySelector,
  1582. IEqualityComparer<TKey> comparer)
  1583. {
  1584. return ToDictionary<TSource, TKey, TSource> (source, keySelector, (e) => e, comparer);
  1585. }
  1586. public static Dictionary<TKey, TSource> ToDictionary<TSource, TKey> (this ParallelQuery<TSource> source,
  1587. Func<TSource, TKey> keySelector)
  1588. {
  1589. return ToDictionary<TSource, TKey, TSource> (source, keySelector, (e) => e, EqualityComparer<TKey>.Default);
  1590. }
  1591. public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  1592. Func<TSource, TKey> keySelector,
  1593. Func<TSource, TElement> elementSelector)
  1594. {
  1595. return ToDictionary<TSource, TKey, TElement> (source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1596. }
  1597. public static Dictionary<TKey, TElement> ToDictionary<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  1598. Func<TSource, TKey> keySelector,
  1599. Func<TSource, TElement> elementSelector,
  1600. IEqualityComparer<TKey> comparer)
  1601. {
  1602. if (source == null)
  1603. throw new ArgumentNullException ("source");
  1604. if (keySelector == null)
  1605. throw new ArgumentNullException ("keySelector");
  1606. if (comparer == null)
  1607. throw new ArgumentNullException ("comparer");
  1608. if (elementSelector == null)
  1609. throw new ArgumentNullException ("elementSelector");
  1610. return source.Aggregate (() => new Dictionary<TKey, TElement> (comparer),
  1611. (d, e) => { d.Add (keySelector (e), elementSelector (e)); return d; },
  1612. (d1, d2) => { foreach (var couple in d2) d1.Add (couple.Key, couple.Value); return d1; },
  1613. (d) => d);
  1614. }
  1615. public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
  1616. Func<TSource, TKey> keySelector)
  1617. {
  1618. return ToLookup<TSource, TKey, TSource> (source, keySelector, (e) => e, EqualityComparer<TKey>.Default);
  1619. }
  1620. public static ILookup<TKey, TSource> ToLookup<TSource, TKey> (this ParallelQuery<TSource> source,
  1621. Func<TSource, TKey> keySelector,
  1622. IEqualityComparer<TKey> comparer)
  1623. {
  1624. return ToLookup<TSource, TKey, TSource> (source, keySelector, (e) => e, comparer);
  1625. }
  1626. public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  1627. Func<TSource, TKey> keySelector,
  1628. Func<TSource, TElement> elementSelector)
  1629. {
  1630. return ToLookup<TSource, TKey, TElement> (source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
  1631. }
  1632. public static ILookup<TKey, TElement> ToLookup<TSource, TKey, TElement> (this ParallelQuery<TSource> source,
  1633. Func<TSource, TKey> keySelector,
  1634. Func<TSource, TElement> elementSelector,
  1635. IEqualityComparer<TKey> comparer)
  1636. {
  1637. if (source == null)
  1638. throw new ArgumentNullException ("source");
  1639. if (keySelector == null)
  1640. throw new ArgumentNullException ("keySelector");
  1641. if (comparer == null)
  1642. throw new ArgumentNullException ("comparer");
  1643. if (elementSelector == null)
  1644. throw new ArgumentNullException ("elementSelector");
  1645. ConcurrentLookup<TKey, TElement> lookup = new ConcurrentLookup<TKey, TElement> (comparer);
  1646. source.ForAll ((e) => lookup.Add (keySelector (e), elementSelector (e)));
  1647. return lookup;
  1648. }
  1649. #endregion
  1650. #region Concat
  1651. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather than "
  1652. + "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
  1653. + "to convert the right data source to System.Linq.ParallelQuery<T>.")]
  1654. public static ParallelQuery<TSource> Concat<TSource>(this ParallelQuery<TSource> first,
  1655. IEnumerable<TSource> second)
  1656. {
  1657. throw new NotSupportedException ();
  1658. }
  1659. public static ParallelQuery<TSource> Concat<TSource> (this ParallelQuery<TSource> first, ParallelQuery<TSource> second)
  1660. {
  1661. return new ParallelQuery<TSource> (new QueryConcatNode<TSource> (first.Node, second.Node));
  1662. }
  1663. #endregion
  1664. #region DefaultIfEmpty
  1665. public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source)
  1666. {
  1667. return source.DefaultIfEmpty (default (TSource));
  1668. }
  1669. public static ParallelQuery<TSource> DefaultIfEmpty<TSource> (this ParallelQuery<TSource> source, TSource defaultValue)
  1670. {
  1671. return new ParallelQuery<TSource> (new QueryDefaultEmptyNode<TSource> (source.Node, defaultValue));
  1672. }
  1673. #endregion
  1674. #region First
  1675. public static TSource First<TSource> (this ParallelQuery<TSource> source)
  1676. {
  1677. CancellationTokenSource src = new CancellationTokenSource ();
  1678. IEnumerator<TSource> enumerator = source.WithImplementerToken (src).GetEnumerator ();
  1679. if (enumerator == null || !enumerator.MoveNext ())
  1680. throw new InvalidOperationException ("source contains no element");
  1681. TSource result = enumerator.Current;
  1682. src.Cancel ();
  1683. return result;
  1684. }
  1685. public static TSource First<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  1686. {
  1687. return source.Where (predicate).First ();
  1688. }
  1689. public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source)
  1690. {
  1691. return source.DefaultIfEmpty ().First ();
  1692. }
  1693. public static TSource FirstOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  1694. {
  1695. return source.Where (predicate).FirstOrDefault ();
  1696. }
  1697. #endregion
  1698. #region Last
  1699. public static TSource Last<TSource> (this ParallelQuery<TSource> source)
  1700. {
  1701. return source.Reverse ().First ();
  1702. }
  1703. public static TSource Last<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  1704. {
  1705. return source.Reverse ().First (predicate);
  1706. }
  1707. public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source)
  1708. {
  1709. return source.Reverse ().FirstOrDefault ();
  1710. }
  1711. public static TSource LastOrDefault<TSource> (this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
  1712. {
  1713. return source.Reverse ().FirstOrDefault (predicate);
  1714. }
  1715. #endregion
  1716. #region Zip
  1717. public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
  1718. ParallelQuery<TSecond> second,
  1719. Func<TFirst, TSecond, TResult> resultSelector)
  1720. {
  1721. if (first == null)
  1722. throw new ArgumentNullException ("first");
  1723. if (second == null)
  1724. throw new ArgumentNullException ("second");
  1725. if (resultSelector == null)
  1726. throw new ArgumentNullException ("resultSelector");
  1727. return new ParallelQuery<TResult> (new QueryZipNode<TFirst, TSecond, TResult> (resultSelector, first.Node, second.Node));
  1728. }
  1729. [ObsoleteAttribute("The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather "
  1730. + "than System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() "
  1731. + "extension method to convert the right data source to System.Linq.ParallelQuery<T>.")]
  1732. public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult> (this ParallelQuery<TFirst> first,
  1733. IEnumerable<TSecond> second,
  1734. Func<TFirst, TSecond, TResult> resultSelector)
  1735. {
  1736. throw new NotSupportedException ();
  1737. }
  1738. #endregion
  1739. }
  1740. }
  1741. #endif