BlockingCollection.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. //
  2. // BlockingCollection.cs
  3. //
  4. // Copyright (c) 2008 Jérémie "Garuma" Laval
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. //
  25. #if NET_4_0
  26. using System;
  27. using System.Threading;
  28. using System.Collections;
  29. using System.Collections.Generic;
  30. using System.Diagnostics;
  31. using System.Runtime.InteropServices;
  32. namespace System.Collections.Concurrent
  33. {
  34. [ComVisible (false)]
  35. [DebuggerDisplay ("Count={Count}")]
  36. [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
  37. public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
  38. {
  39. readonly IProducerConsumerCollection<T> underlyingColl;
  40. readonly int upperBound;
  41. readonly SpinWait sw = new SpinWait ();
  42. AtomicBoolean isComplete;
  43. long completeId;
  44. long addId = long.MinValue;
  45. long removeId = long.MinValue;
  46. #region ctors
  47. public BlockingCollection ()
  48. : this (new ConcurrentQueue<T> (), -1)
  49. {
  50. }
  51. public BlockingCollection (int upperBound)
  52. : this (new ConcurrentQueue<T> (), upperBound)
  53. {
  54. }
  55. public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
  56. : this (underlyingColl, -1)
  57. {
  58. }
  59. public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
  60. {
  61. this.underlyingColl = underlyingColl;
  62. this.upperBound = upperBound;
  63. this.isComplete = new AtomicBoolean ();
  64. }
  65. #endregion
  66. #region Add & Remove (+ Try)
  67. public void Add (T item)
  68. {
  69. Add (item, null);
  70. }
  71. public void Add (T item, CancellationToken token)
  72. {
  73. Add (item, () => token.IsCancellationRequested);
  74. }
  75. void Add (T item, Func<bool> cancellationFunc)
  76. {
  77. while (true) {
  78. long cachedAddId = addId;
  79. long cachedRemoveId = removeId;
  80. if (upperBound != -1) {
  81. if (cachedAddId - cachedRemoveId > upperBound) {
  82. Block ();
  83. continue;
  84. }
  85. }
  86. // Check our transaction id against completed stored one
  87. if (isComplete.Value && cachedAddId >= completeId)
  88. throw new InvalidOperationException ("The BlockingCollection<T> has"
  89. + " been marked as complete with regards to additions.");
  90. if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
  91. break;
  92. if (cancellationFunc != null && cancellationFunc ())
  93. throw new OperationCanceledException ("CancellationToken triggered");
  94. }
  95. if (!underlyingColl.TryAdd (item))
  96. throw new InvalidOperationException ("The underlying collection didn't accept the item.");
  97. }
  98. public T Take ()
  99. {
  100. return Take (null);
  101. }
  102. public T Take (CancellationToken token)
  103. {
  104. return Take (() => token.IsCancellationRequested);
  105. }
  106. T Take (Func<bool> cancellationFunc)
  107. {
  108. while (true) {
  109. long cachedRemoveId = removeId;
  110. long cachedAddId = addId;
  111. // Empty case
  112. if (cachedRemoveId == cachedAddId) {
  113. if (isComplete.Value && cachedRemoveId >= completeId)
  114. throw new OperationCanceledException ("The BlockingCollection<T> has"
  115. + " been marked as complete with regards to additions.");
  116. Block ();
  117. continue;
  118. }
  119. if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
  120. break;
  121. if (cancellationFunc != null && cancellationFunc ())
  122. throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
  123. }
  124. T item;
  125. while (!underlyingColl.TryTake (out item));
  126. return item;
  127. }
  128. public bool TryAdd (T item)
  129. {
  130. return TryAdd (item, null, null);
  131. }
  132. bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
  133. {
  134. do {
  135. if (token.HasValue && token.Value.IsCancellationRequested)
  136. throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
  137. long cachedAddId = addId;
  138. long cachedRemoveId = removeId;
  139. if (upperBound != -1) {
  140. if (cachedAddId - cachedRemoveId > upperBound) {
  141. continue;
  142. }
  143. }
  144. // Check our transaction id against completed stored one
  145. if (isComplete.Value && cachedAddId >= completeId)
  146. throw new InvalidOperationException ("The BlockingCollection<T> has"
  147. + " been marked as complete with regards to additions.");
  148. if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
  149. continue;
  150. if (!underlyingColl.TryAdd (item))
  151. throw new InvalidOperationException ("The underlying collection didn't accept the item.");
  152. return true;
  153. } while (contFunc != null && contFunc ());
  154. return false;
  155. }
  156. public bool TryAdd (T item, TimeSpan ts)
  157. {
  158. return TryAdd (item, (int)ts.TotalMilliseconds);
  159. }
  160. public bool TryAdd (T item, int millisecondsTimeout)
  161. {
  162. Stopwatch sw = Stopwatch.StartNew ();
  163. return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
  164. }
  165. public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
  166. {
  167. Stopwatch sw = Stopwatch.StartNew ();
  168. return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
  169. }
  170. public bool TryTake (out T item)
  171. {
  172. return TryTake (out item, null, null);
  173. }
  174. bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
  175. {
  176. item = default (T);
  177. do {
  178. if (token.HasValue && token.Value.IsCancellationRequested)
  179. throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
  180. long cachedRemoveId = removeId;
  181. long cachedAddId = addId;
  182. // Empty case
  183. if (cachedRemoveId == cachedAddId) {
  184. if (isComplete.Value && cachedRemoveId >= completeId)
  185. continue;
  186. continue;
  187. }
  188. if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
  189. continue;
  190. return underlyingColl.TryTake (out item);
  191. } while (contFunc != null && contFunc ());
  192. return false;
  193. }
  194. public bool TryTake (out T item, TimeSpan ts)
  195. {
  196. return TryTake (out item, (int)ts.TotalMilliseconds);
  197. }
  198. public bool TryTake (out T item, int millisecondsTimeout)
  199. {
  200. item = default (T);
  201. Stopwatch sw = Stopwatch.StartNew ();
  202. return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
  203. }
  204. public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
  205. {
  206. item = default (T);
  207. Stopwatch sw = Stopwatch.StartNew ();
  208. return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
  209. }
  210. #endregion
  211. #region static methods
  212. static void CheckArray (BlockingCollection<T>[] collections)
  213. {
  214. if (collections == null)
  215. throw new ArgumentNullException ("collections");
  216. if (collections.Length == 0 || IsThereANullElement (collections))
  217. throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
  218. }
  219. static bool IsThereANullElement (BlockingCollection<T>[] collections)
  220. {
  221. foreach (BlockingCollection<T> e in collections)
  222. if (e == null)
  223. return true;
  224. return false;
  225. }
  226. public static int AddToAny (BlockingCollection<T>[] collections, T item)
  227. {
  228. CheckArray (collections);
  229. int index = 0;
  230. foreach (var coll in collections) {
  231. try {
  232. coll.Add (item);
  233. return index;
  234. } catch {}
  235. index++;
  236. }
  237. return -1;
  238. }
  239. public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
  240. {
  241. CheckArray (collections);
  242. int index = 0;
  243. foreach (var coll in collections) {
  244. try {
  245. coll.Add (item, token);
  246. return index;
  247. } catch {}
  248. index++;
  249. }
  250. return -1;
  251. }
  252. public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
  253. {
  254. CheckArray (collections);
  255. int index = 0;
  256. foreach (var coll in collections) {
  257. if (coll.TryAdd (item))
  258. return index;
  259. index++;
  260. }
  261. return -1;
  262. }
  263. public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
  264. {
  265. CheckArray (collections);
  266. int index = 0;
  267. foreach (var coll in collections) {
  268. if (coll.TryAdd (item, ts))
  269. return index;
  270. index++;
  271. }
  272. return -1;
  273. }
  274. public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
  275. {
  276. CheckArray (collections);
  277. int index = 0;
  278. foreach (var coll in collections) {
  279. if (coll.TryAdd (item, millisecondsTimeout))
  280. return index;
  281. index++;
  282. }
  283. return -1;
  284. }
  285. public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
  286. CancellationToken token)
  287. {
  288. CheckArray (collections);
  289. int index = 0;
  290. foreach (var coll in collections) {
  291. if (coll.TryAdd (item, millisecondsTimeout, token))
  292. return index;
  293. index++;
  294. }
  295. return -1;
  296. }
  297. public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
  298. {
  299. item = default (T);
  300. CheckArray (collections);
  301. int index = 0;
  302. foreach (var coll in collections) {
  303. try {
  304. item = coll.Take ();
  305. return index;
  306. } catch {}
  307. index++;
  308. }
  309. return -1;
  310. }
  311. public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
  312. {
  313. item = default (T);
  314. CheckArray (collections);
  315. int index = 0;
  316. foreach (var coll in collections) {
  317. try {
  318. item = coll.Take (token);
  319. return index;
  320. } catch {}
  321. index++;
  322. }
  323. return -1;
  324. }
  325. public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
  326. {
  327. item = default (T);
  328. CheckArray (collections);
  329. int index = 0;
  330. foreach (var coll in collections) {
  331. if (coll.TryTake (out item))
  332. return index;
  333. index++;
  334. }
  335. return -1;
  336. }
  337. public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
  338. {
  339. item = default (T);
  340. CheckArray (collections);
  341. int index = 0;
  342. foreach (var coll in collections) {
  343. if (coll.TryTake (out item, ts))
  344. return index;
  345. index++;
  346. }
  347. return -1;
  348. }
  349. public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
  350. {
  351. item = default (T);
  352. CheckArray (collections);
  353. int index = 0;
  354. foreach (var coll in collections) {
  355. if (coll.TryTake (out item, millisecondsTimeout))
  356. return index;
  357. index++;
  358. }
  359. return -1;
  360. }
  361. public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
  362. CancellationToken token)
  363. {
  364. item = default (T);
  365. CheckArray (collections);
  366. int index = 0;
  367. foreach (var coll in collections) {
  368. if (coll.TryTake (out item, millisecondsTimeout, token))
  369. return index;
  370. index++;
  371. }
  372. return -1;
  373. }
  374. #endregion
  375. public void CompleteAdding ()
  376. {
  377. // No further add beside that point
  378. completeId = addId;
  379. isComplete.Value = true;
  380. }
  381. void ICollection.CopyTo (Array array, int index)
  382. {
  383. underlyingColl.CopyTo (array, index);
  384. }
  385. public void CopyTo (T[] array, int index)
  386. {
  387. underlyingColl.CopyTo (array, index);
  388. }
  389. public IEnumerable<T> GetConsumingEnumerable ()
  390. {
  391. return GetConsumingEnumerable (Take);
  392. }
  393. public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
  394. {
  395. return GetConsumingEnumerable (() => Take (token));
  396. }
  397. IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
  398. {
  399. while (true) {
  400. T item = default (T);
  401. try {
  402. item = getFunc ();
  403. } catch {
  404. break;
  405. }
  406. yield return item;
  407. }
  408. }
  409. IEnumerator IEnumerable.GetEnumerator ()
  410. {
  411. return ((IEnumerable)underlyingColl).GetEnumerator ();
  412. }
  413. IEnumerator<T> IEnumerable<T>.GetEnumerator ()
  414. {
  415. return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
  416. }
  417. public void Dispose ()
  418. {
  419. }
  420. protected virtual void Dispose (bool managedRes)
  421. {
  422. }
  423. public T[] ToArray ()
  424. {
  425. return underlyingColl.ToArray ();
  426. }
  427. // Method used to stall the thread for a limited period of time before retrying an operation
  428. void Block ()
  429. {
  430. sw.SpinOnce ();
  431. }
  432. public int BoundedCapacity {
  433. get {
  434. return upperBound;
  435. }
  436. }
  437. public int Count {
  438. get {
  439. return underlyingColl.Count;
  440. }
  441. }
  442. public bool IsAddingCompleted {
  443. get {
  444. return isComplete.Value;
  445. }
  446. }
  447. public bool IsCompleted {
  448. get {
  449. return isComplete.Value && addId == removeId;
  450. }
  451. }
  452. object ICollection.SyncRoot {
  453. get {
  454. return underlyingColl.SyncRoot;
  455. }
  456. }
  457. bool ICollection.IsSynchronized {
  458. get {
  459. return underlyingColl.IsSynchronized;
  460. }
  461. }
  462. }
  463. }
  464. #endif