BlockingCollection.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. //
  2. // BlockingCollection.cs
  3. //
  4. // Copyright (c) 2008 Jérémie "Garuma" Laval
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. //
  25. #if NET_4_0
  26. using System;
  27. using System.Threading;
  28. using System.Collections;
  29. using System.Collections.Generic;
  30. using System.Diagnostics;
  31. using System.Runtime.InteropServices;
  32. namespace System.Collections.Concurrent
  33. {
  34. [ComVisible (false)]
  35. [DebuggerDisplay ("Count={Count}")]
  36. [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
  37. public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
  38. {
  // Number of SpinWait iterations attempted before falling back to an event wait.
  const int spinCount = 5;

  // Collection that actually stores the items.
  readonly IProducerConsumerCollection<T> underlyingColl;

  /* These events are used solely for the purpose of having an optimized sleep cycle when
   * the BlockingCollection has to wait on an external event (Add or Remove for instance).
   * Both start in the signaled state.
   */
  readonly ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
  readonly ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);

  // Set by CompleteAdding (); deliberately not readonly because its state is mutated in place.
  AtomicBoolean isComplete;

  // Maximum number of items, or -1 when the collection is unbounded.
  readonly int upperBound;

  // Value of addId captured when CompleteAdding () was called.
  int completeId;

  /* The whole idea of the collection is to use these two int counters in a transactional
   * way to track and manage the actual data inside the underlying lock-free collection
   * instead of directly working with it or using external locking.
   *
   * They are advanced with CAS, one step at a time; (addId - removeId) is the number of
   * items currently accounted for.
   */
  int addId = int.MinValue;
  int removeId = int.MinValue;

  /* For time based operations, we share this instance of Stopwatch and base calculation
   * on a time offset taken at each of these method calls.
   */
  static readonly Stopwatch watch = Stopwatch.StartNew ();
  61. #region ctors
  // Unbounded collection stored in a new ConcurrentQueue<T>.
  public BlockingCollection ()
      : this (new ConcurrentQueue<T> (), -1)
  {
  }

  // Collection stored in a new ConcurrentQueue<T>, limited to boundedCapacity items.
  public BlockingCollection (int boundedCapacity)
      : this (new ConcurrentQueue<T> (), boundedCapacity)
  {
  }

  // Unbounded collection stored in the supplied producer/consumer collection
  // (-1 is the value used for "no upper bound").
  public BlockingCollection (IProducerConsumerCollection<T> collection)
      : this (collection, -1)
  {
  }
  // Creates a collection stored in `collection` and limited to `boundedCapacity` items.
  //
  // collection:      backing store; must not be null.
  // boundedCapacity: a positive item limit, or -1 (the value the other constructors
  //                  pass) for an unbounded collection.
  //
  // Throws ArgumentNullException when collection is null and
  // ArgumentOutOfRangeException when boundedCapacity is 0 or lower than -1.
  public BlockingCollection (IProducerConsumerCollection<T> collection, int boundedCapacity)
  {
      if (collection == null)
          throw new ArgumentNullException ("collection");
      // A bound of 0 or below -1 would leave every Add waiting for room that never comes
      if (boundedCapacity < -1 || boundedCapacity == 0)
          throw new ArgumentOutOfRangeException ("boundedCapacity");

      this.underlyingColl = collection;
      this.upperBound = boundedCapacity;
      this.isComplete = new AtomicBoolean ();

      // Items already stored in the supplied collection must be reflected in the
      // add counter, otherwise Take would consider the collection empty.
      this.addId = unchecked (this.removeId + collection.Count);
  }
  80. #endregion
  81. #region Add & Remove (+ Try)
  // Adds item, waiting as long as necessary; not cancellable.
  public void Add (T item)
  {
      CancellationToken noCancellation = CancellationToken.None;
      Add (item, noCancellation);
  }

  // Adds item, waiting as long as necessary (timeout of -1) unless
  // cancellationToken is signaled.
  public void Add (T item, CancellationToken cancellationToken)
  {
      const int waitForever = -1;
      TryAdd (item, waitForever, cancellationToken);
  }
  // Tries to add item without waiting (timeout of 0); returns whether it was added.
  public bool TryAdd (T item)
  {
      const int noWait = 0;
      return TryAdd (item, noWait, CancellationToken.None);
  }
  // Tries to add item, waiting for room when the collection is bounded and full.
  //
  // millisecondsTimeout: -1 waits indefinitely, 0 never waits, otherwise the maximum
  //                      time to wait in milliseconds.
  // cancellationToken:   checked at every iteration and while sleeping.
  //
  // Returns true when the item was stored, false when no room became available in time.
  // Throws ArgumentOutOfRangeException when millisecondsTimeout is lower than -1,
  // OperationCanceledException when cancellationToken is signaled, and
  // InvalidOperationException when adding has been completed or when the underlying
  // collection refuses the item.
  public bool TryAdd (T item, int millisecondsTimeout, CancellationToken cancellationToken)
  {
      if (millisecondsTimeout < -1)
          throw new ArgumentOutOfRangeException ("millisecondsTimeout");

      long start = millisecondsTimeout == -1 ? 0 : watch.ElapsedMilliseconds;
      SpinWait sw = new SpinWait ();

      do {
          cancellationToken.ThrowIfCancellationRequested ();

          int cachedAddId = addId;
          int cachedRemoveId = removeId;
          int itemsIn = cachedAddId - cachedRemoveId;

          // Check our transaction id against completed stored one. This is done before
          // the capacity test so that adding to a completed collection fails right away
          // instead of waiting for room first.
          if (isComplete.Value && cachedAddId >= completeId)
              ThrowCompleteException ();

          // If needed, we check and wait that the collection isn't full. The collection
          // is full as soon as the item count reaches the bound (>=, not >), otherwise
          // it would accept upperBound + 1 items.
          if (upperBound != -1 && itemsIn >= upperBound) {
              if (millisecondsTimeout == 0)
                  return false;

              if (sw.Count <= spinCount) {
                  sw.SpinOnce ();
              } else {
                  mreRemove.Reset ();
                  // Something changed between the snapshot and the reset: retry at once
                  if (cachedRemoveId != removeId || cachedAddId != addId) {
                      mreRemove.Set ();
                      continue;
                  }

                  mreRemove.Wait (ComputeTimeout (millisecondsTimeout, start), cancellationToken);
              }

              continue;
          }

          // Validate the steps we have been doing until now
          if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
              continue;

          // We have a slot reserved in the underlying collection, try to take it
          if (!underlyingColl.TryAdd (item))
              throw new InvalidOperationException ("The underlying collection didn't accept the item.");

          // Wake up process that may have been sleeping
          mreAdd.Set ();

          return true;
      } while (millisecondsTimeout == -1 || (watch.ElapsedMilliseconds - start) < millisecondsTimeout);

      return false;
  }
  // Tries to add item, waiting at most `timeout` (a value of -1 millisecond waits
  // indefinitely). Returns whether the item was added.
  //
  // Throws ArgumentOutOfRangeException when timeout, truncated to whole milliseconds,
  // is lower than -1 or greater than int.MaxValue.
  public bool TryAdd (T item, TimeSpan timeout)
  {
      // Convert through long so that an oversized TimeSpan is rejected explicitly
      // instead of going through an out-of-range double-to-int cast.
      long totalMilliseconds = (long)timeout.TotalMilliseconds;
      if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
          throw new ArgumentOutOfRangeException ("timeout");

      return TryAdd (item, (int)totalMilliseconds);
  }
  // Tries to add item, waiting at most millisecondsTimeout milliseconds; not cancellable.
  public bool TryAdd (T item, int millisecondsTimeout)
  {
      CancellationToken noCancellation = CancellationToken.None;
      return TryAdd (item, millisecondsTimeout, noCancellation);
  }
  // Removes and returns an item, waiting as long as necessary; not cancellable.
  public T Take ()
  {
      CancellationToken noCancellation = CancellationToken.None;
      return Take (noCancellation);
  }

  // Removes and returns an item, waiting as long as necessary (timeout of -1) unless
  // cancellationToken is signaled. Asks the internal TryTake to throw when the
  // collection is completed.
  public T Take (CancellationToken cancellationToken)
  {
      T taken;
      TryTake (out taken, -1, cancellationToken, true);
      return taken;
  }
  // Tries to remove an item without waiting (timeout of 0).
  public bool TryTake (out T item)
  {
      const int noWait = 0;
      return TryTake (out item, noWait, CancellationToken.None);
  }

  // Tries to remove an item, waiting at most millisecondsTimeout milliseconds unless
  // cancellationToken is signaled. A completed collection yields false, not an exception.
  public bool TryTake (out T item, int millisecondsTimeout, CancellationToken cancellationToken)
  {
      const bool throwWhenCompleted = false;
      return TryTake (out item, millisecondsTimeout, cancellationToken, throwWhenCompleted);
  }
  // Shared implementation of Take/TryTake.
  //
  // milliseconds:  -1 waits indefinitely, 0 never waits, otherwise the maximum wait.
  // throwComplete: when the collection is empty and completed, throw (true) or
  //                return false (false).
  //
  // Returns true and sets item when an item was removed; false otherwise.
  bool TryTake (out T item, int milliseconds, CancellationToken cancellationToken, bool throwComplete)
  {
      if (milliseconds < -1)
          throw new ArgumentOutOfRangeException ("milliseconds");

      item = default (T);
      SpinWait spinner = new SpinWait ();
      bool waitForever = milliseconds == -1;
      long startedAt = waitForever ? 0 : watch.ElapsedMilliseconds;

      do {
          cancellationToken.ThrowIfCancellationRequested ();

          // Snapshot of both counters; equal values mean there is nothing to take
          int takenSoFar = removeId;
          int addedSoFar = addId;

          if (takenSoFar != addedSoFar) {
              // Claim one item by advancing removeId; retry if another thread won the race
              if (Interlocked.CompareExchange (ref removeId, takenSoFar + 1, takenSoFar) != takenSoFar)
                  continue;

              // The claim succeeded: loop until the underlying collection hands an item over
              while (!underlyingColl.TryTake (out item)) {
              }

              mreRemove.Set ();
              return true;
          }

          // Empty case
          if (milliseconds == 0)
              return false;

          if (IsCompleted) {
              if (!throwComplete)
                  return false;
              ThrowCompleteException ();
          }

          if (spinner.Count <= spinCount) {
              spinner.SpinOnce ();
          } else {
              mreAdd.Reset ();
              if (takenSoFar != removeId || addedSoFar != addId) {
                  // Counters moved since the snapshot: undo the reset and retry
                  mreAdd.Set ();
              } else {
                  mreAdd.Wait (ComputeTimeout (milliseconds, startedAt), cancellationToken);
              }
          }
      } while (waitForever || (watch.ElapsedMilliseconds - startedAt) < milliseconds);

      return false;
  }
  // Tries to remove an item, waiting at most `timeout` (a value of -1 millisecond waits
  // indefinitely). Returns whether an item was removed.
  //
  // Throws ArgumentOutOfRangeException when timeout, truncated to whole milliseconds,
  // is lower than -1 or greater than int.MaxValue.
  public bool TryTake (out T item, TimeSpan timeout)
  {
      // Convert through long so that an oversized TimeSpan is rejected explicitly
      // instead of going through an out-of-range double-to-int cast.
      long totalMilliseconds = (long)timeout.TotalMilliseconds;
      if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
          throw new ArgumentOutOfRangeException ("timeout");

      return TryTake (out item, (int)totalMilliseconds);
  }
  // Tries to remove an item, waiting at most millisecondsTimeout milliseconds;
  // not cancellable. A completed collection yields false, not an exception.
  public bool TryTake (out T item, int millisecondsTimeout)
  {
      // Assigned up front so that item is written even if the call below throws
      item = default (T);

      CancellationToken noCancellation = CancellationToken.None;
      return TryTake (out item, millisecondsTimeout, noCancellation, false);
  }
  // Computes how long a single event wait may last.
  //
  // millisecondsTimeout: overall timeout of the operation, -1 meaning "no timeout".
  // start:               value of watch.ElapsedMilliseconds when the operation began.
  //
  // Returns 500 for an operation without timeout (the caller loops and waits again),
  // otherwise the time left before the overall timeout expires, never less than 1 ms.
  static int ComputeTimeout (int millisecondsTimeout, long start)
  {
      if (millisecondsTimeout == -1)
          return 500;

      // Remaining time is (timeout - elapsed); the opposite subtraction is negative
      // for as long as time remains and would always be clamped to 1 ms.
      long elapsed = watch.ElapsedMilliseconds - start;
      long remaining = millisecondsTimeout - elapsed;

      return (int)Math.Max (remaining, 1);
  }
  216. #endregion
  217. #region static methods
  // Validates the array handed to the *Any static methods.
  // Throws ArgumentNullException for a null array and ArgumentException for an
  // empty array or an array containing a null entry.
  static void CheckArray (BlockingCollection<T>[] collections)
  {
      if (collections == null)
          throw new ArgumentNullException ("collections");

      bool unusable = collections.Length == 0 || IsThereANullElement (collections);
      if (!unusable)
          return;

      throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
  }
  // Returns true when at least one entry of collections is null.
  static bool IsThereANullElement (BlockingCollection<T>[] collections)
  {
      for (int i = 0; i < collections.Length; ++i) {
          if (collections [i] == null)
              return true;
      }

      return false;
  }
  // Adds item to the first collection of the array that accepts it.
  //
  // Collections are tried in array order with a blocking Add, so a bounded collection
  // that is full is waited on before the next one is considered. A collection that
  // rejects the item with InvalidOperationException (for instance because adding was
  // completed) is skipped.
  //
  // Returns the index of the collection that received the item, or -1 when every
  // collection rejected it.
  public static int AddToAny (BlockingCollection<T>[] collections, T item)
  {
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          try {
              collections [index].Add (item);
              return index;
          } catch (InvalidOperationException) {
              // This collection can't take the item; only this expected failure is
              // skipped, anything else is a real error and propagates.
          }
      }

      return -1;
  }
  // Adds item to the first collection of the array that accepts it, observing
  // cancellationToken.
  //
  // Collections are tried in array order with a blocking Add, so a bounded collection
  // that is full is waited on before the next one is considered. A collection that
  // rejects the item with InvalidOperationException (for instance because adding was
  // completed) is skipped.
  //
  // Returns the index of the collection that received the item, or -1 when every
  // collection rejected it. Throws OperationCanceledException when cancellationToken
  // is signaled.
  public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
  {
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          try {
              collections [index].Add (item, cancellationToken);
              return index;
          } catch (InvalidOperationException) {
              // This collection can't take the item; cancellation and any other
              // failure are not swallowed and reach the caller.
          }
      }

      return -1;
  }
  // Tries each collection once, in array order, without waiting.
  // Returns the index of the collection that took the item, or -1.
  public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
  {
      return TryAddToAny (collections, item, 0, CancellationToken.None);
  }

  // Same as the millisecond overload with the timeout expressed as a TimeSpan;
  // the timeout is handed to each collection in turn.
  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan timeout)
  {
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          if (collections [index].TryAdd (item, timeout))
              return index;
      }

      return -1;
  }

  // Tries each collection in array order, giving each one up to
  // millisecondsTimeout milliseconds; not cancellable.
  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
  {
      return TryAddToAny (collections, item, millisecondsTimeout, CancellationToken.None);
  }

  // Tries each collection in array order, giving each one up to millisecondsTimeout
  // milliseconds and observing cancellationToken.
  // Returns the index of the collection that took the item, or -1 when none did.
  public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
                                 CancellationToken cancellationToken)
  {
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          if (collections [index].TryAdd (item, millisecondsTimeout, cancellationToken))
              return index;
      }

      return -1;
  }
  // Removes an item from whichever collection of the array has one available first,
  // waiting as long as necessary.
  //
  // Returns the index of the collection the item came from; item receives the value.
  public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
  {
      item = default (T);
      CheckArray (collections);

      WaitHandle[] waitTable = null;

      while (true) {
          // Clear the "item added" events before scanning: an Add racing with the scan
          // then either is seen by TryTake or signals the event again before we sleep.
          for (int i = 0; i < collections.Length; ++i)
              collections [i].mreAdd.Reset ();

          for (int i = 0; i < collections.Length; ++i) {
              if (collections [i].TryTake (out item))
                  return i;
          }

          if (waitTable == null) {
              // Sleep on mreAdd (set when an item is added). mreRemove is only set
              // when an item is removed, which never happens while we are the one waiting.
              waitTable = new WaitHandle [collections.Length];
              for (int i = 0; i < collections.Length; ++i)
                  waitTable [i] = collections [i].mreAdd.WaitHandle;
          }

          // Bounded wait (500 ms, the slice ComputeTimeout uses for untimed waits) so a
          // signal cleared by another waiter's Reset only delays us, never blocks us.
          WaitHandle.WaitAny (waitTable, 500);
      }
  }
  // Removes an item from whichever collection of the array has one available first,
  // waiting as long as necessary unless cancellationToken is signaled.
  //
  // Returns the index of the collection the item came from; item receives the value.
  // Throws OperationCanceledException when cancellationToken is signaled.
  public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
  {
      item = default (T);
      CheckArray (collections);

      WaitHandle[] waitTable = null;

      while (true) {
          // Clear the "item added" events before scanning: an Add racing with the scan
          // then either is seen by TryTake or signals the event again before we sleep.
          for (int i = 0; i < collections.Length; ++i)
              collections [i].mreAdd.Reset ();

          for (int i = 0; i < collections.Length; ++i) {
              if (collections [i].TryTake (out item))
                  return i;
          }

          cancellationToken.ThrowIfCancellationRequested ();

          if (waitTable == null) {
              // Sleep on mreAdd (set when an item is added) plus the token's handle.
              // mreRemove is only set when an item is removed, which never happens
              // while we are the one waiting.
              waitTable = new WaitHandle [collections.Length + 1];
              for (int i = 0; i < collections.Length; ++i)
                  waitTable [i] = collections [i].mreAdd.WaitHandle;
              waitTable [collections.Length] = cancellationToken.WaitHandle;
          }

          // Bounded wait (500 ms, the slice ComputeTimeout uses for untimed waits) so a
          // signal cleared by another waiter's Reset only delays us, never blocks us.
          WaitHandle.WaitAny (waitTable, 500);
          cancellationToken.ThrowIfCancellationRequested ();
      }
  }
  // Tries each collection once, in array order, without waiting.
  // Returns the index of the collection the item came from, or -1.
  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
  {
      return TryTakeFromAny (collections, out item, 0, CancellationToken.None);
  }

  // Same as the millisecond overload with the timeout expressed as a TimeSpan;
  // the timeout is handed to each collection in turn.
  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
  {
      item = default (T);
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          if (collections [index].TryTake (out item, timeout))
              return index;
      }

      return -1;
  }

  // Tries each collection in array order, giving each one up to
  // millisecondsTimeout milliseconds; not cancellable.
  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
  {
      return TryTakeFromAny (collections, out item, millisecondsTimeout, CancellationToken.None);
  }

  // Tries each collection in array order, giving each one up to millisecondsTimeout
  // milliseconds and observing cancellationToken.
  // Returns the index of the collection the item came from, or -1 when none had one.
  public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
                                    CancellationToken cancellationToken)
  {
      item = default (T);
      CheckArray (collections);

      for (int index = 0; index < collections.Length; ++index) {
          if (collections [index].TryTake (out item, millisecondsTimeout, cancellationToken))
              return index;
      }

      return -1;
  }
  391. #endregion
  // Marks the collection as closed to additions and wakes up sleeping operations.
  public void CompleteAdding ()
  {
      // Record the add counter first, then raise the flag:
      // no further add beside that point
      this.completeId = this.addId;
      this.isComplete.Value = true;

      // Wakeup some operation in case this has an impact
      this.mreAdd.Set ();
      this.mreRemove.Set ();
  }
  // Always throws the InvalidOperationException reported when the collection
  // has been completed.
  void ThrowCompleteException ()
  {
      const string message = "The BlockingCollection<T> has"
          + " been marked as complete with regards to additions.";

      throw new InvalidOperationException (message);
  }
  // Non-generic copy; forwarded unchanged to the underlying collection.
  void ICollection.CopyTo (Array array, int index)
  {
      IProducerConsumerCollection<T> source = underlyingColl;
      source.CopyTo (array, index);
  }

  // Copies the items into array starting at index; forwarded unchanged to the
  // underlying collection.
  public void CopyTo (T[] array, int index)
  {
      IProducerConsumerCollection<T> source = underlyingColl;
      source.CopyTo (array, index);
  }
  // Consuming enumeration that cannot be cancelled.
  public IEnumerable<T> GetConsumingEnumerable ()
  {
      CancellationToken noCancellation = CancellationToken.None;
      return GetConsumingEnumerable (noCancellation);
  }
  // Returns a sequence that removes and yields items as they become available,
  // waiting for new ones, and ends once the collection is completed and empty.
  //
  // Throws OperationCanceledException (during enumeration) when cancellationToken
  // is signaled.
  public IEnumerable<T> GetConsumingEnumerable (CancellationToken cancellationToken)
  {
      T item;

      // A timeout of -1 waits indefinitely. This TryTake overload returns false,
      // rather than throwing, when the collection is completed and drained, so no
      // catch-all is needed and cancellation or unexpected errors reach the caller.
      while (TryTake (out item, -1, cancellationToken))
          yield return item;
  }
  // Non-generic enumeration; served by the underlying collection's own enumerator.
  IEnumerator IEnumerable.GetEnumerator ()
  {
      IEnumerable source = (IEnumerable)underlyingColl;
      return source.GetEnumerator ();
  }

  // Generic enumeration; served by the underlying collection's own enumerator.
  IEnumerator<T> IEnumerable<T>.GetEnumerator ()
  {
      IEnumerable<T> source = (IEnumerable<T>)underlyingColl;
      return source.GetEnumerator ();
  }
  // Releases the resources held by this instance.
  public void Dispose ()
  {
      Dispose (true);
      GC.SuppressFinalize (this);
  }

  // Releases the two wait events when disposing is true. Derived classes
  // overriding this method should call the base implementation.
  protected virtual void Dispose (bool disposing)
  {
      if (!disposing)
          return;

      // The events are the only disposable state owned by this class
      mreAdd.Dispose ();
      mreRemove.Dispose ();
  }
  // Returns the array produced by the underlying collection's ToArray.
  public T[] ToArray ()
  {
      T[] snapshot = underlyingColl.ToArray ();
      return snapshot;
  }
  // Upper bound given at construction (-1 when none was given).
  public int BoundedCapacity {
      get { return upperBound; }
  }

  // Item count as reported by the underlying collection.
  public int Count {
      get { return underlyingColl.Count; }
  }

  // True once CompleteAdding () has been called.
  public bool IsAddingCompleted {
      get { return isComplete.Value; }
  }

  // True once adding is completed and the add/remove counters are equal,
  // i.e. nothing is left to take.
  public bool IsCompleted {
      get {
          if (!isComplete.Value)
              return false;

          return addId == removeId;
      }
  }
  // Forwarded to the underlying collection.
  object ICollection.SyncRoot {
      get { return underlyingColl.SyncRoot; }
  }

  // Forwarded to the underlying collection.
  bool ICollection.IsSynchronized {
      get { return underlyingColl.IsSynchronized; }
  }
  482. }
  483. }
  484. #endif