BatchBlock.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. // BatchBlock.cs
  2. //
  3. // Copyright (c) 2011 Jérémie "garuma" Laval
  4. // Copyright (c) 2012 Petr Onderka
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. using System.Collections.Generic;
  24. using System.Collections.Concurrent;
  namespace System.Threading.Tasks.Dataflow {
      /// <summary>
      /// Propagator block that groups incoming items into arrays of
      /// <see cref="BatchSize"/> elements. Smaller batches can be produced early by
      /// <see cref="TriggerBatch"/>, which <see cref="Complete"/> also calls.
      /// </summary>
      /// <remarks>
      /// Greedy mode: accepted items go to <c>messageQueue</c> and are counted in
      /// <c>batchCount</c>; a batch is cut whenever the count reaches the batch size.
      /// Non-greedy mode: a batch is built from messages postponed by the message box,
      /// and only once a whole batch could be reserved are the messages consumed.
      /// </remarks>
      public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, IReceivableSourceBlock<T[]> {
          readonly CompletionHelper compHelper;
          // Queue handed to the message box; MakeBatch takes greedy batches from it.
          readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
          readonly MessageBox<T> messageBox;
          readonly GroupingDataflowBlockOptions dataflowBlockOptions;
          readonly int batchSize;
          // Items accepted but not yet assigned to a batch (also bumped by TryAdd
          // when MaxNumberOfGroups is bounded). Guarded by batchCountLock.
          int batchCount;
          // Number of batches created so far. Guarded by batchCountLock: it is a
          // 64-bit value read from several threads, so every access takes the lock.
          long numberOfGroups;
          // SpinLock is a mutable struct: these fields must NOT be readonly, otherwise
          // Enter/Exit would operate on defensive copies and never actually lock.
          SpinLock batchCountLock;
          readonly OutgoingQueue<T[]> outgoing;
          // Serializes removal from messageQueue so each batch gets consecutive items.
          SpinLock batchLock;
          // Set while a non-greedy processing task is scheduled or running.
          readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();

          /// <summary>
          /// Creates a batch block using <see cref="GroupingDataflowBlockOptions.Default"/>.
          /// </summary>
          /// <param name="batchSize">Number of items per batch; must be positive.</param>
          public BatchBlock (int batchSize) : this (batchSize, GroupingDataflowBlockOptions.Default)
          {
          }

          /// <summary>
          /// Creates a batch block with the given options.
          /// </summary>
          /// <param name="batchSize">Number of items per batch; must be positive.</param>
          /// <param name="dataflowBlockOptions">Options configuring the block.</param>
          /// <exception cref="ArgumentOutOfRangeException">
          /// <paramref name="batchSize"/> is not positive, or it exceeds a bounded
          /// <see cref="DataflowBlockOptions.BoundedCapacity"/> (equal is allowed).
          /// </exception>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="dataflowBlockOptions"/> is null.
          /// </exception>
          public BatchBlock (int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
          {
              if (batchSize <= 0)
                  throw new ArgumentOutOfRangeException ("batchSize", batchSize,
                      "The batchSize must be positive.");
              if (dataflowBlockOptions == null)
                  throw new ArgumentNullException ("dataflowBlockOptions");
              if (dataflowBlockOptions.BoundedCapacity != -1
                  && batchSize > dataflowBlockOptions.BoundedCapacity)
                  throw new ArgumentOutOfRangeException ("batchSize",
                      "The batchSize must be smaller than the value of BoundedCapacity.");

              this.batchSize = batchSize;
              this.dataflowBlockOptions = dataflowBlockOptions;
              this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);

              Action<bool> processQueue;
              Func<bool> canAccept;
              if (dataflowBlockOptions.MaxNumberOfGroups == -1) {
                  // Unbounded groups: each newly queued item bumps batchCount here.
                  processQueue = newItem => BatchProcess (newItem ? 1 : 0);
                  canAccept = null;
              } else {
                  // Bounded groups: TryAdd already counted the item when accepting it.
                  processQueue = _ => BatchProcess ();
                  canAccept = TryAdd;
              }

              // The lambdas capture 'this', so referencing 'outgoing' before it is
              // assigned is fine: they are only invoked after construction.
              this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
                  () => outgoing.IsCompleted, processQueue, dataflowBlockOptions,
                  dataflowBlockOptions.Greedy, canAccept);
              this.outgoing = new OutgoingQueue<T[]> (this, compHelper,
                  () => messageQueue.IsCompleted, messageBox.DecreaseCount,
                  dataflowBlockOptions, batch => batch.Length);
          }

          DataflowMessageStatus ITargetBlock<T>.OfferMessage (
              DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
              bool consumeToAccept)
          {
              return messageBox.OfferMessage (
                  messageHeader, messageValue, source, consumeToAccept);
          }

          public IDisposable LinkTo (ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
          {
              return outgoing.AddTarget (target, linkOptions);
          }

          T[] ISourceBlock<T[]>.ConsumeMessage (
              DataflowMessageHeader messageHeader, ITargetBlock<T[]> target,
              out bool messageConsumed)
          {
              return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
          }

          void ISourceBlock<T[]>.ReleaseReservation (
              DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
          {
              outgoing.ReleaseReservation (messageHeader, target);
          }

          bool ISourceBlock<T[]>.ReserveMessage (
              DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
          {
              return outgoing.ReserveMessage (messageHeader, target);
          }

          public bool TryReceive (Predicate<T[]> filter, out T[] item)
          {
              return outgoing.TryReceive (filter, out item);
          }

          public bool TryReceiveAll (out IList<T[]> items)
          {
              return outgoing.TryReceiveAll (out items);
          }

          /// <summary>
          /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
          /// has been reached. If it has, <see cref="Complete"/>s the block.
          /// </summary>
          void VerifyMaxNumberOfGroups ()
          {
              if (dataflowBlockOptions.MaxNumberOfGroups == -1)
                  return;

              bool shouldComplete;
              bool lockTaken = false;
              try {
                  batchCountLock.Enter (ref lockTaken);
                  shouldComplete = numberOfGroups >= dataflowBlockOptions.MaxNumberOfGroups;
              } finally {
                  if (lockTaken)
                      batchCountLock.Exit ();
              }

              // Complete outside the lock: it re-enters TriggerBatch, which takes it.
              if (shouldComplete)
                  Complete ();
          }

          /// <summary>
          /// Returns whether a new item can be accepted, and increments a counter if it can.
          /// Only makes sense when <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
          /// is not unbounded (it is installed as the message box's accept check only then).
          /// </summary>
          bool TryAdd ()
          {
              bool lockTaken = false;
              try {
                  batchCountLock.Enter (ref lockTaken);
                  // Completed groups plus full batches still pending must stay below the limit.
                  if (numberOfGroups + batchCount / batchSize
                      >= dataflowBlockOptions.MaxNumberOfGroups)
                      return false;
                  batchCount++;
                  return true;
              } finally {
                  if (lockTaken)
                      batchCountLock.Exit ();
              }
          }

          /// <summary>
          /// Produces a batch right away from whatever is available, even if it is
          /// smaller than <see cref="BatchSize"/>.
          /// </summary>
          public void TriggerBatch ()
          {
              if (dataflowBlockOptions.Greedy) {
                  int earlyBatchSize;
                  bool lockTaken = false;
                  try {
                      batchCountLock.Enter (ref lockTaken);
                      if (batchCount == 0)
                          return;
                      earlyBatchSize = batchCount;
                      batchCount = 0;
                      numberOfGroups++;
                  } finally {
                      if (lockTaken)
                          batchCountLock.Exit ();
                  }
                  MakeBatch (earlyBatchSize);
              } else {
                  if (dataflowBlockOptions.BoundedCapacity == -1
                      || outgoing.Count <= dataflowBlockOptions.BoundedCapacity)
                      EnsureNonGreedyProcessing (true);
              }
          }

          /// <summary>
          /// Decides whether to create a new batch or not.
          /// </summary>
          /// <param name="addedItems">
          /// Number of newly added items. Used only with greedy processing.
          /// </param>
          void BatchProcess (int addedItems = 0)
          {
              if (dataflowBlockOptions.Greedy) {
                  bool makeBatch = false;
                  bool lockTaken = false;
                  try {
                      batchCountLock.Enter (ref lockTaken);
                      batchCount += addedItems;
                      if (batchCount >= batchSize) {
                          batchCount -= batchSize;
                          numberOfGroups++;
                          makeBatch = true;
                      }
                  } finally {
                      if (lockTaken)
                          batchCountLock.Exit ();
                  }
                  if (makeBatch)
                      MakeBatch (batchSize);
              } else {
                  if (ShouldProcessNonGreedy ())
                      EnsureNonGreedyProcessing (false);
              }
          }

          /// <summary>
          /// Returns whether non-greedy creation of a batch should be started.
          /// </summary>
          bool ShouldProcessNonGreedy ()
          {
              // do we have enough items waiting and would the new batch fit?
              return messageBox.PostponedMessagesCount >= batchSize
                  && (dataflowBlockOptions.BoundedCapacity == -1
                      || outgoing.Count + batchSize <= dataflowBlockOptions.BoundedCapacity);
          }

          /// <summary>
          /// Creates a batch of the given size and adds the result to the output queue.
          /// </summary>
          /// <param name="size">Number of items to take from <c>messageQueue</c>.</param>
          void MakeBatch (int size)
          {
              T[] batch = new T[size];

              // lock is necessary here to make sure items are in the correct order
              bool taken = false;
              try {
                  batchLock.Enter (ref taken);
                  for (int i = 0; i < size; ++i)
                      messageQueue.TryTake (out batch [i]);
              } finally {
                  if (taken)
                      batchLock.Exit ();
              }

              outgoing.AddData (batch);
              VerifyMaxNumberOfGroups ();
          }

          /// <summary>
          /// Starts non-greedy creation of batches, if one doesn't already run.
          /// </summary>
          /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
          void EnsureNonGreedyProcessing (bool manuallyTriggered)
          {
              if (nonGreedyProcessing.TrySet ())
                  Task.Factory.StartNew (() => NonGreedyProcess (manuallyTriggered),
                      dataflowBlockOptions.CancellationToken,
                      TaskCreationOptions.PreferFairness,
                      dataflowBlockOptions.TaskScheduler);
          }

          /// <summary>
          /// Creates batches in non-greedy mode,
          /// making sure the whole batch is available by using reservations.
          /// </summary>
          /// <param name="manuallyTriggered">Whether the batch was triggered by <see cref="TriggerBatch"/>.</param>
          void NonGreedyProcess (bool manuallyTriggered)
          {
              bool first = true;
              do {
                  var reservations =
                      new List<Tuple<ISourceBlock<T>, DataflowMessageHeader>> ();

                  int expectedReservationsCount = messageBox.PostponedMessagesCount;
                  if (expectedReservationsCount == 0)
                      break;

                  // Reserve up to one batch worth of postponed messages.
                  bool gotReservation;
                  do {
                      var reservation = messageBox.ReserveMessage ();
                      gotReservation = reservation != null;
                      if (gotReservation)
                          reservations.Add (reservation);
                  } while (gotReservation && reservations.Count < batchSize);

                  // Only the first pass of a manual trigger may produce a short batch.
                  int expectedSize = manuallyTriggered && first
                      ? Math.Min (expectedReservationsCount, batchSize)
                      : batchSize;

                  if (reservations.Count < expectedSize) {
                      foreach (var reservation in reservations)
                          messageBox.RelaseReservation (reservation);

                      // some reservations failed, which most likely means the message
                      // was consumed by someone else and a new one will be offered soon;
                      // so postpone the batch, so that the other block has time to do that
                      // (MS .Net does something like this too)
                      if (manuallyTriggered && first) {
                          // nonGreedyProcessing stays set: the rescheduled task owns it now.
                          Task.Factory.StartNew (() => NonGreedyProcess (true),
                              dataflowBlockOptions.CancellationToken,
                              TaskCreationOptions.PreferFairness,
                              dataflowBlockOptions.TaskScheduler);
                          return;
                      }
                  } else {
                      T[] batch = new T[reservations.Count];
                      for (int i = 0; i < reservations.Count; i++)
                          batch [i] = messageBox.ConsumeReserved (reservations [i]);

                      outgoing.AddData (batch);

                      // Only one non-greedy task runs at a time, but TryAdd and
                      // VerifyMaxNumberOfGroups read this 64-bit counter under
                      // batchCountLock from other threads, so the write must take it too
                      // (long writes are not atomic on 32-bit runtimes).
                      bool lockTaken = false;
                      try {
                          batchCountLock.Enter (ref lockTaken);
                          numberOfGroups++;
                      } finally {
                          if (lockTaken)
                              batchCountLock.Exit ();
                      }

                      VerifyMaxNumberOfGroups ();
                  }

                  first = false;
              } while (ShouldProcessNonGreedy ());

              nonGreedyProcessing.Value = false;

              // Re-check after clearing the flag so work arriving in between isn't lost.
              if (ShouldProcessNonGreedy ())
                  EnsureNonGreedyProcessing (false);
          }

          /// <summary>
          /// Stops accepting messages, flushes any partial batch via
          /// <see cref="TriggerBatch"/>, and completes the output side.
          /// </summary>
          public void Complete ()
          {
              messageBox.Complete ();
              TriggerBatch ();
              outgoing.Complete ();
          }

          void IDataflowBlock.Fault (Exception exception)
          {
              compHelper.RequestFault (exception);
          }

          public Task Completion {
              get { return compHelper.Completion; }
          }

          public int OutputCount {
              get { return outgoing.Count; }
          }

          public int BatchSize {
              get { return batchSize; }
          }

          public override string ToString ()
          {
              return NameHelper.GetName (this, dataflowBlockOptions);
          }
      }
  }