BlockingCollection.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. //
  2. // BlockingCollection.cs
  3. //
  4. // Copyright (c) 2008 Jérémie "Garuma" Laval
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. //
  25. #if NET_4_0 || BOOTSTRAP_NET_4_0
  26. using System;
  27. using System.Threading;
  28. using System.Collections;
  29. using System.Collections.Generic;
  30. using System.Diagnostics;
  31. using System.Runtime.InteropServices;
  32. namespace System.Collections.Concurrent
  33. {
  /// <summary>
  /// Adds blocking and optional bounding on top of an
  /// <see cref="IProducerConsumerCollection{T}"/>.
  /// </summary>
  /// <remarks>
  /// Two counters, <c>addId</c> and <c>removeId</c>, are advanced with an interlocked
  /// compare-and-swap to reserve a slot before the underlying collection is touched.
  /// Their difference is the number of items that producers have reserved and that
  /// consumers have not claimed yet.
  /// </remarks>
  /// <typeparam name="T">Type of the stored items.</typeparam>
  [ComVisible (false)]
  [DebuggerDisplay ("Count={Count}")]
  [DebuggerTypeProxy (typeof (CollectionDebuggerView<>))]
  public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
  {
      // Longest time, in milliseconds, a blocked Add/Take waits on its event before re-checking state.
      const int sleepTime = 50;
      // Number of SpinWait iterations attempted before falling back to waiting on an event.
      const int spinCount = 5;
      const string completedMessage = "The BlockingCollection<T> has"
          + " been marked as complete with regards to additions.";

      readonly IProducerConsumerCollection<T> underlyingColl;
      // -1 means "no bound".
      readonly int upperBound;

      AtomicBoolean isComplete;
      // Value of addId captured by CompleteAdding; reservations at or past it are rejected.
      long completeId;
      long addId = long.MinValue;
      long removeId = long.MinValue;

      // Signalled by producers after an item was stored / by consumers after an item was removed.
      readonly ManualResetEventSlim mreAdd = new ManualResetEventSlim (true);
      readonly ManualResetEventSlim mreRemove = new ManualResetEventSlim (true);

      #region ctors
      /// <summary>Creates an unbounded collection backed by a <see cref="ConcurrentQueue{T}"/>.</summary>
      public BlockingCollection ()
          : this (new ConcurrentQueue<T> (), -1)
      {
      }

      /// <summary>Creates a bounded collection backed by a <see cref="ConcurrentQueue{T}"/>.</summary>
      /// <param name="upperBound">Maximum number of items, or -1 for no bound.</param>
      public BlockingCollection (int upperBound)
          : this (new ConcurrentQueue<T> (), upperBound)
      {
      }

      /// <summary>Creates an unbounded collection on top of <paramref name="underlyingColl"/>.</summary>
      /// <param name="underlyingColl">Collection used as the item store.</param>
      public BlockingCollection (IProducerConsumerCollection<T> underlyingColl)
          : this (underlyingColl, -1)
      {
      }

      /// <summary>Creates a collection on top of <paramref name="underlyingColl"/> with the given bound.</summary>
      /// <param name="underlyingColl">Collection used as the item store.</param>
      /// <param name="upperBound">Maximum number of items, or -1 for no bound.</param>
      /// <exception cref="ArgumentNullException"><paramref name="underlyingColl"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="upperBound"/> is neither -1 nor a positive value.
      /// </exception>
      public BlockingCollection (IProducerConsumerCollection<T> underlyingColl, int upperBound)
      {
          if (underlyingColl == null)
              throw new ArgumentNullException ("underlyingColl");
          if (upperBound != -1 && upperBound < 1)
              throw new ArgumentOutOfRangeException ("upperBound",
                  "The bound must be a positive value (or -1 for no bound).");

          this.underlyingColl = underlyingColl;
          this.upperBound = upperBound;
          this.isComplete = new AtomicBoolean ();
          // Items already stored in the supplied collection must be visible to consumers,
          // otherwise Take would treat a pre-filled collection as empty.
          this.addId = removeId + underlyingColl.Count;
      }
      #endregion

      #region Add & Remove (+ Try)
      /// <summary>Adds <paramref name="item"/>, blocking while the collection is at its bound.</summary>
      /// <param name="item">Item to add.</param>
      public void Add (T item)
      {
          Add (item, CancellationToken.None);
      }

      /// <summary>
      /// Adds <paramref name="item"/>, blocking while the collection is at its bound or until
      /// <paramref name="token"/> is cancelled.
      /// </summary>
      /// <param name="item">Item to add.</param>
      /// <param name="token">Token checked on every iteration of the wait loop.</param>
      /// <exception cref="InvalidOperationException">
      /// Adding was completed, or the underlying collection refused the item.
      /// </exception>
      /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
      public void Add (T item, CancellationToken token)
      {
          SpinWait spinner = new SpinWait ();

          while (true) {
              token.ThrowIfCancellationRequested ();

              long cachedAddId = addId;
              long cachedRemoveId = removeId;

              // Full case: ">=" so that at most upperBound items are ever reserved.
              if (upperBound != -1 && cachedAddId - cachedRemoveId >= upperBound) {
                  if (spinner.Count <= spinCount)
                      spinner.SpinOnce ();
                  else if (mreRemove.Wait (sleepTime))
                      mreRemove.Reset ();
                  continue;
              }

              // Check our transaction id against the one stored by CompleteAdding
              if (isComplete.Value && cachedAddId >= completeId)
                  throw new InvalidOperationException (completedMessage);

              // Reserve the slot; a failed exchange means another producer won the race.
              if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
                  break;
          }

          // NOTE(review): the reservation is not rolled back when the underlying collection
          // refuses the item, so addId stays one ahead of the stored items in that case.
          if (!underlyingColl.TryAdd (item))
              throw new InvalidOperationException ("The underlying collection didn't accept the item.");

          if (!mreAdd.IsSet)
              mreAdd.Set ();
      }

      /// <summary>Removes and returns an item, blocking while the collection is empty.</summary>
      public T Take ()
      {
          return Take (CancellationToken.None);
      }

      /// <summary>
      /// Removes and returns an item, blocking while the collection is empty or until
      /// <paramref name="token"/> is cancelled.
      /// </summary>
      /// <param name="token">Token checked on every iteration of the wait loop.</param>
      /// <exception cref="InvalidOperationException">
      /// The collection is empty and adding was completed.
      /// </exception>
      /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
      public T Take (CancellationToken token)
      {
          SpinWait spinner = new SpinWait ();

          while (true) {
              token.ThrowIfCancellationRequested ();

              long cachedRemoveId = removeId;
              long cachedAddId = addId;

              // Empty case
              if (cachedRemoveId == cachedAddId) {
                  if (IsCompleted)
                      throw new InvalidOperationException (completedMessage);

                  if (spinner.Count <= spinCount)
                      spinner.SpinOnce ();
                  else if (mreAdd.Wait (sleepTime))
                      mreAdd.Reset ();
                  continue;
              }

              if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
                  break;
          }

          return TakeReserved ();
      }

      // Fetches the item belonging to a removeId reservation that was just won and signals producers.
      T TakeReserved ()
      {
          T item;
          SpinWait spinner = new SpinWait ();

          // A producer bumps addId before it stores its item, so the item matching
          // our reservation may not be in the underlying collection yet.
          while (!underlyingColl.TryTake (out item))
              spinner.SpinOnce ();

          if (!mreRemove.IsSet)
              mreRemove.Set ();

          return item;
      }

      // Builds the "keep retrying?" predicate used by the timed TryAdd/TryTake overloads.
      // Timeout.Infinite (-1) never expires; any other value is compared to elapsed time.
      static Func<bool> CreateTimeoutCheck (int millisecondsTimeout)
      {
          if (millisecondsTimeout == Timeout.Infinite)
              return () => true;

          Stopwatch watch = Stopwatch.StartNew ();
          return () => watch.ElapsedMilliseconds < millisecondsTimeout;
      }

      /// <summary>Tries to add <paramref name="item"/> without waiting for room.</summary>
      /// <returns>true if the item was stored; false if the collection is at its bound.</returns>
      public bool TryAdd (T item)
      {
          return TryAdd (item, null, CancellationToken.None);
      }

      // Core of every TryAdd overload. contFunc is asked whether to retry after a failed
      // attempt; a null contFunc means "one attempt only".
      bool TryAdd (T item, Func<bool> contFunc, CancellationToken token)
      {
          SpinWait spinner = new SpinWait ();

          while (true) {
              token.ThrowIfCancellationRequested ();

              long cachedAddId = addId;
              long cachedRemoveId = removeId;
              bool full = upperBound != -1 && cachedAddId - cachedRemoveId >= upperBound;

              if (!full) {
                  // Check our transaction id against the one stored by CompleteAdding
                  if (isComplete.Value && cachedAddId >= completeId)
                      throw new InvalidOperationException (completedMessage);

                  // Losing the race to another producer is not a "full" condition: re-read and retry.
                  if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
                      continue;

                  if (underlyingColl.TryAdd (item)) {
                      if (!mreAdd.IsSet)
                          mreAdd.Set ();
                      return true;
                  }
                  // NOTE(review): as in Add, the reservation above is not rolled back when
                  // the underlying collection refuses the item.
              }

              if (contFunc == null || !contFunc ())
                  return false;

              // Back off instead of hammering the counters while waiting for room.
              spinner.SpinOnce ();
          }
      }

      /// <summary>Tries to add <paramref name="item"/>, retrying until <paramref name="ts"/> has elapsed.</summary>
      /// <param name="item">Item to add.</param>
      /// <param name="ts">Timeout; converted to whole milliseconds.</param>
      public bool TryAdd (T item, TimeSpan ts)
      {
          return TryAdd (item, (int)ts.TotalMilliseconds);
      }

      /// <summary>Tries to add <paramref name="item"/>, retrying for the given number of milliseconds.</summary>
      /// <param name="item">Item to add.</param>
      /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 to retry indefinitely.</param>
      public bool TryAdd (T item, int millisecondsTimeout)
      {
          return TryAdd (item, millisecondsTimeout, CancellationToken.None);
      }

      /// <summary>
      /// Tries to add <paramref name="item"/>, retrying for the given number of milliseconds or
      /// until <paramref name="token"/> is cancelled.
      /// </summary>
      /// <param name="item">Item to add.</param>
      /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 to retry indefinitely.</param>
      /// <param name="token">Token checked before every attempt.</param>
      public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
      {
          return TryAdd (item, CreateTimeoutCheck (millisecondsTimeout), token);
      }

      /// <summary>Tries to remove an item without waiting for one to arrive.</summary>
      /// <param name="item">Receives the removed item, or default(T) on failure.</param>
      /// <returns>true if an item was removed; false if the collection is empty.</returns>
      public bool TryTake (out T item)
      {
          return TryTake (out item, null, CancellationToken.None);
      }

      // Core of every TryTake overload. contFunc is asked whether to retry after finding
      // the collection empty; a null contFunc means "one attempt only".
      bool TryTake (out T item, Func<bool> contFunc, CancellationToken token)
      {
          item = default (T);
          SpinWait spinner = new SpinWait ();

          while (true) {
              token.ThrowIfCancellationRequested ();

              long cachedRemoveId = removeId;
              long cachedAddId = addId;

              if (cachedRemoveId != cachedAddId) {
                  // Losing the race to another consumer is not an "empty" condition: re-read and retry.
                  if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
                      continue;

                  // The reservation is ours: wait for the matching item exactly like Take does,
                  // so that removeId never runs ahead of the items actually removed.
                  item = TakeReserved ();
                  return true;
              }

              // Empty case
              if (IsCompleted)
                  return false;
              if (contFunc == null || !contFunc ())
                  return false;

              // Back off instead of hammering the counters while waiting for an item.
              spinner.SpinOnce ();
          }
      }

      /// <summary>Tries to remove an item, retrying until <paramref name="ts"/> has elapsed.</summary>
      /// <param name="item">Receives the removed item, or default(T) on failure.</param>
      /// <param name="ts">Timeout; converted to whole milliseconds.</param>
      public bool TryTake (out T item, TimeSpan ts)
      {
          return TryTake (out item, (int)ts.TotalMilliseconds);
      }

      /// <summary>Tries to remove an item, retrying for the given number of milliseconds.</summary>
      /// <param name="item">Receives the removed item, or default(T) on failure.</param>
      /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 to retry indefinitely.</param>
      public bool TryTake (out T item, int millisecondsTimeout)
      {
          return TryTake (out item, millisecondsTimeout, CancellationToken.None);
      }

      /// <summary>
      /// Tries to remove an item, retrying for the given number of milliseconds or until
      /// <paramref name="token"/> is cancelled.
      /// </summary>
      /// <param name="item">Receives the removed item, or default(T) on failure.</param>
      /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 to retry indefinitely.</param>
      /// <param name="token">Token checked before every attempt.</param>
      public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
      {
          return TryTake (out item, CreateTimeoutCheck (millisecondsTimeout), token);
      }
      #endregion

      #region static methods
      // Rejects a null array, an empty array and an array holding a null element.
      static void CheckArray (BlockingCollection<T>[] collections)
      {
          if (collections == null)
              throw new ArgumentNullException ("collections");
          if (collections.Length == 0 || IsThereANullElement (collections))
              throw new ArgumentException ("The collections argument is a 0-length array or contains a null element.", "collections");
      }

      static bool IsThereANullElement (BlockingCollection<T>[] collections)
      {
          foreach (BlockingCollection<T> e in collections)
              if (e == null)
                  return true;
          return false;
      }

      /// <summary>Adds <paramref name="item"/> to the first collection whose Add succeeds.</summary>
      /// <returns>Index of the collection that took the item, or -1 if every Add failed.</returns>
      public static int AddToAny (BlockingCollection<T>[] collections, T item)
      {
          return AddToAny (collections, item, CancellationToken.None);
      }

      /// <summary>
      /// Adds <paramref name="item"/> to the first collection whose Add succeeds; collections
      /// are tried in array order and a failing Add moves on to the next one.
      /// </summary>
      /// <returns>Index of the collection that took the item, or -1 if every Add failed.</returns>
      /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
      public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
      {
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              try {
                  collections [index].Add (item, token);
                  return index;
              } catch (OperationCanceledException) {
                  // Cancellation is a request from the caller, not a per-collection failure.
                  throw;
              } catch {
                  // Best effort: this collection refused the item, try the next one.
              }
          }
          return -1;
      }

      /// <summary>Tries a non-waiting TryAdd on each collection in array order.</summary>
      /// <returns>Index of the collection that took the item, or -1 if none did.</returns>
      public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
      {
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              if (collections [index].TryAdd (item))
                  return index;
          }
          return -1;
      }

      /// <summary>Tries a timed TryAdd on each collection in array order.</summary>
      /// <param name="ts">Timeout applied to each collection in turn; converted to whole milliseconds.</param>
      /// <returns>Index of the collection that took the item, or -1 if none did.</returns>
      public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
      {
          return TryAddToAny (collections, item, (int)ts.TotalMilliseconds);
      }

      /// <summary>Tries a timed TryAdd on each collection in array order.</summary>
      /// <param name="millisecondsTimeout">Timeout applied to each collection in turn; -1 retries indefinitely.</param>
      /// <returns>Index of the collection that took the item, or -1 if none did.</returns>
      public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
      {
          return TryAddToAny (collections, item, millisecondsTimeout, CancellationToken.None);
      }

      /// <summary>Tries a timed, cancellable TryAdd on each collection in array order.</summary>
      /// <param name="millisecondsTimeout">Timeout applied to each collection in turn; -1 retries indefinitely.</param>
      /// <param name="token">Token passed to every TryAdd call.</param>
      /// <returns>Index of the collection that took the item, or -1 if none did.</returns>
      public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
                                     CancellationToken token)
      {
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              if (collections [index].TryAdd (item, millisecondsTimeout, token))
                  return index;
          }
          return -1;
      }

      /// <summary>Takes an item from the first collection whose Take succeeds.</summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <returns>Index of the collection the item came from, or -1 if every Take failed.</returns>
      public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
      {
          return TakeFromAny (collections, out item, CancellationToken.None);
      }

      /// <summary>
      /// Takes an item from the first collection whose Take succeeds; collections are tried
      /// in array order and a failing Take moves on to the next one.
      /// </summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <returns>Index of the collection the item came from, or -1 if every Take failed.</returns>
      /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
      public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
      {
          item = default (T);
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              try {
                  item = collections [index].Take (token);
                  return index;
              } catch (OperationCanceledException) {
                  // Cancellation is a request from the caller, not a per-collection failure.
                  throw;
              } catch {
                  // Best effort: nothing could be taken here, try the next collection.
              }
          }
          return -1;
      }

      /// <summary>Tries a non-waiting TryTake on each collection in array order.</summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <returns>Index of the collection the item came from, or -1 if none had one.</returns>
      public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
      {
          item = default (T);
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              if (collections [index].TryTake (out item))
                  return index;
          }
          return -1;
      }

      /// <summary>Tries a timed TryTake on each collection in array order.</summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <param name="ts">Timeout applied to each collection in turn; converted to whole milliseconds.</param>
      /// <returns>Index of the collection the item came from, or -1 if none had one.</returns>
      public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
      {
          return TryTakeFromAny (collections, out item, (int)ts.TotalMilliseconds);
      }

      /// <summary>Tries a timed TryTake on each collection in array order.</summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <param name="millisecondsTimeout">Timeout applied to each collection in turn; -1 retries indefinitely.</param>
      /// <returns>Index of the collection the item came from, or -1 if none had one.</returns>
      public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
      {
          return TryTakeFromAny (collections, out item, millisecondsTimeout, CancellationToken.None);
      }

      /// <summary>Tries a timed, cancellable TryTake on each collection in array order.</summary>
      /// <param name="item">Receives the taken item, or default(T) when -1 is returned.</param>
      /// <param name="millisecondsTimeout">Timeout applied to each collection in turn; -1 retries indefinitely.</param>
      /// <param name="token">Token passed to every TryTake call.</param>
      /// <returns>Index of the collection the item came from, or -1 if none had one.</returns>
      public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
                                        CancellationToken token)
      {
          item = default (T);
          CheckArray (collections);

          for (int index = 0; index < collections.Length; index++) {
              if (collections [index].TryTake (out item, millisecondsTimeout, token))
                  return index;
          }
          return -1;
      }
      #endregion

      /// <summary>Marks the collection as not accepting any further additions.</summary>
      public void CompleteAdding ()
      {
          // No further add beside that point
          completeId = addId;
          isComplete.Value = true;
      }

      void ICollection.CopyTo (Array array, int index)
      {
          underlyingColl.CopyTo (array, index);
      }

      /// <summary>Copies the items of the underlying collection into <paramref name="array"/>.</summary>
      /// <param name="array">Destination array.</param>
      /// <param name="index">Index in <paramref name="array"/> at which copying starts.</param>
      public void CopyTo (T[] array, int index)
      {
          underlyingColl.CopyTo (array, index);
      }

      /// <summary>
      /// Returns a sequence that removes items with <see cref="Take()"/> as it is enumerated
      /// and ends once Take fails.
      /// </summary>
      public IEnumerable<T> GetConsumingEnumerable ()
      {
          return GetConsumingEnumerable (Take);
      }

      /// <summary>
      /// Returns a sequence that removes items with <see cref="Take(CancellationToken)"/> as it
      /// is enumerated and ends once Take fails.
      /// </summary>
      /// <param name="token">Token passed to every Take call.</param>
      /// <exception cref="OperationCanceledException">
      /// Thrown during enumeration when <paramref name="token"/> is cancelled.
      /// </exception>
      public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
      {
          return GetConsumingEnumerable (() => Take (token));
      }

      // Shared iterator: yields whatever getFunc returns until it throws.
      IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
      {
          while (true) {
              T item = default (T);
              try {
                  item = getFunc ();
              } catch (OperationCanceledException) {
                  // Let the consumer observe cancellation instead of a silently shortened sequence.
                  throw;
              } catch {
                  // Take failed (e.g. the collection is completed and empty): end of sequence.
                  break;
              }
              yield return item;
          }
      }

      IEnumerator IEnumerable.GetEnumerator ()
      {
          return ((IEnumerable)underlyingColl).GetEnumerator ();
      }

      IEnumerator<T> IEnumerable<T>.GetEnumerator ()
      {
          return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
      }

      /// <summary>Releases the wait handles owned by this instance.</summary>
      public void Dispose ()
      {
          Dispose (true);
          GC.SuppressFinalize (this);
      }

      /// <summary>Releases the resources held by this instance.</summary>
      /// <param name="managedRes">
      /// true when called from <see cref="Dispose()"/>; managed resources are released only in that case.
      /// </param>
      protected virtual void Dispose (bool managedRes)
      {
          if (!managedRes)
              return;

          // NOTE(review): operations that still wait on these events after disposal are
          // expected to fail with ObjectDisposedException - confirm against the runtime in use.
          mreAdd.Dispose ();
          mreRemove.Dispose ();
      }

      /// <summary>Returns the items of the underlying collection as a new array.</summary>
      public T[] ToArray ()
      {
          return underlyingColl.ToArray ();
      }

      /// <summary>The bound given at construction, or -1 when the collection is unbounded.</summary>
      public int BoundedCapacity {
          get {
              return upperBound;
          }
      }

      /// <summary>Number of items reported by the underlying collection.</summary>
      public int Count {
          get {
              return underlyingColl.Count;
          }
      }

      /// <summary>true once <see cref="CompleteAdding"/> has been called.</summary>
      public bool IsAddingCompleted {
          get {
              return isComplete.Value;
          }
      }

      /// <summary>true once adding was completed and every reserved item has been claimed.</summary>
      public bool IsCompleted {
          get {
              return isComplete.Value && addId == removeId;
          }
      }

      object ICollection.SyncRoot {
          get {
              return underlyingColl.SyncRoot;
          }
      }

      bool ICollection.IsSynchronized {
          get {
              return underlyingColl.IsSynchronized;
          }
      }
  }
  463. }
  464. #endif