BatchedJoinBlock`3.cs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  // BatchedJoinBlock.cs
  //
  // Copyright (c) 2012 Petr Onderka
  //
  // Permission is hereby granted, free of charge, to any person obtaining a copy
  // of this software and associated documentation files (the "Software"), to deal
  // in the Software without restriction, including without limitation the rights
  // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  // copies of the Software, and to permit persons to whom the Software is
  // furnished to do so, subject to the following conditions:
  //
  // The above copyright notice and this permission notice shall be included in
  // all copies or substantial portions of the Software.
  //
  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  // THE SOFTWARE.
  //
  //
  24. using System.Collections.Generic;
  namespace System.Threading.Tasks.Dataflow {
      /// <summary>
      /// Source block with three typed targets. Items received by the targets are
      /// counted together; every time <see cref="BatchSize"/> items have been signalled,
      /// they are taken from the targets' buffers and queued for output as a tuple
      /// of three lists.
      /// </summary>
      /// <typeparam name="T1">Type of the items accepted by <see cref="Target1"/>.</typeparam>
      /// <typeparam name="T2">Type of the items accepted by <see cref="Target2"/>.</typeparam>
      /// <typeparam name="T3">Type of the items accepted by <see cref="Target3"/>.</typeparam>
      public sealed class BatchedJoinBlock<T1, T2, T3> :
          IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
          readonly GroupingDataflowBlockOptions options;

          // Deliberately left non-readonly, as in the original.
          // NOTE(review): if CompletionHelper is a mutable value type, marking it
          // readonly would make calls operate on a copy - confirm before changing.
          CompletionHelper completionHelper;

          readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;

          // SpinLock is a mutable struct and must not be stored in a readonly field.
          SpinLock batchLock;

          readonly JoinTarget<T1> target1;
          readonly JoinTarget<T2> target2;
          readonly JoinTarget<T3> target3;

          // Number of signals received from the targets, kept congruent to the
          // number of not yet batched items modulo BatchSize (see SignalTarget).
          int batchCount;

          /// <summary>
          /// Creates the block with the given batch size and
          /// <see cref="GroupingDataflowBlockOptions.Default"/>.
          /// </summary>
          /// <param name="batchSize">Total number of items that form one batch.</param>
          /// <exception cref="ArgumentOutOfRangeException">
          /// <paramref name="batchSize"/> is not positive.
          /// </exception>
          public BatchedJoinBlock (int batchSize)
              : this (batchSize, GroupingDataflowBlockOptions.Default)
          {
          }

          /// <summary>
          /// Creates the block with the given batch size and options.
          /// </summary>
          /// <param name="batchSize">Total number of items that form one batch.</param>
          /// <param name="dataflowBlockOptions">Options used to configure the block.</param>
          /// <exception cref="ArgumentOutOfRangeException">
          /// <paramref name="batchSize"/> is not positive.
          /// </exception>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="dataflowBlockOptions"/> is null.
          /// </exception>
          public BatchedJoinBlock (int batchSize,
                                   GroupingDataflowBlockOptions dataflowBlockOptions)
          {
              if (batchSize <= 0)
                  throw new ArgumentOutOfRangeException (
                      "batchSize", batchSize, "The batchSize must be positive.");
              if (dataflowBlockOptions == null)
                  throw new ArgumentNullException ("dataflowBlockOptions");

              BatchSize = batchSize;
              options = dataflowBlockOptions;
              completionHelper = CompletionHelper.GetNew (options);

              // The lambdas read the 'outgoing' field lazily, so it is fine that the
              // queue itself is only created after the targets.
              target1 = new JoinTarget<T1> (
                  this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                  dataflowBlockOptions);
              target2 = new JoinTarget<T2> (
                  this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                  dataflowBlockOptions);
              target3 = new JoinTarget<T3> (
                  this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
                  dataflowBlockOptions);

              // NOTE(review): the two delegates presumably act as an "inputs are
              // finished" test and an "item left the queue" callback - confirm
              // against MessageOutgoingQueue.
              outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
                  this, completionHelper,
                  () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
                        || target3.Buffer.IsCompleted,
                  () =>
                  {
                      target1.DecreaseCount ();
                      target2.DecreaseCount ();
                      target3.DecreaseCount ();
                  }, options);
          }

          /// <summary>Total number of items, across all three targets, in one batch.</summary>
          public int BatchSize { get; private set; }

          /// <summary>Target that accepts items of type <typeparamref name="T1"/>.</summary>
          public ITargetBlock<T1> Target1 {
              get { return target1; }
          }

          /// <summary>Target that accepts items of type <typeparamref name="T2"/>.</summary>
          public ITargetBlock<T2> Target2 {
              get { return target2; }
          }

          /// <summary>Target that accepts items of type <typeparamref name="T3"/>.</summary>
          public ITargetBlock<T3> Target3 {
              get { return target3; }
          }

          /// <summary>
          /// Callback handed to every target; counts one signal and builds a batch
          /// whenever the running count reaches a multiple of <see cref="BatchSize"/>.
          /// </summary>
          void SignalTarget ()
          {
              int current = Interlocked.Increment (ref batchCount);

              if (current % BatchSize != 0)
                  return;

              // 'current' is a multiple of BatchSize, so subtracting it keeps the
              // counter correct modulo BatchSize even when other threads have
              // incremented it in the meantime.
              Interlocked.Add (ref batchCount, -current);

              MakeBatch (BatchSize);
          }

          /// <summary>
          /// Takes <paramref name="batchSize"/> items in total from the targets'
          /// buffers (first target first, then the second, then the third) and adds
          /// them to the outgoing queue as one tuple.
          /// </summary>
          /// <param name="batchSize">Number of items to put into the batch.</param>
          /// <exception cref="InvalidOperationException">
          /// The buffers together held fewer than <paramref name="batchSize"/> items.
          /// </exception>
          void MakeBatch (int batchSize)
          {
              var list1 = new List<T1> ();
              var list2 = new List<T2> ();
              var list3 = new List<T3> ();

              // lock is necessary here to make sure items are in the correct order
              bool taken = false;
              try {
                  batchLock.Enter (ref taken);

                  // 'i' counts items over all three lists, not per list.
                  int i = 0;

                  T1 item1;
                  while (i < batchSize && target1.Buffer.TryTake (out item1)) {
                      list1.Add (item1);
                      i++;
                  }

                  T2 item2;
                  while (i < batchSize && target2.Buffer.TryTake (out item2)) {
                      list2.Add (item2);
                      i++;
                  }

                  T3 item3;
                  while (i < batchSize && target3.Buffer.TryTake (out item3)) {
                      list3.Add (item3);
                      i++;
                  }

                  if (i < batchSize)
                      throw new InvalidOperationException ("Unexpected count of items.");
              } finally {
                  if (taken)
                      batchLock.Exit ();
              }

              var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
                                                                         list3);

              outgoing.AddData (batch);
          }

          /// <summary>Task exposed by the block's completion helper.</summary>
          public Task Completion
          {
              get { return completionHelper.Completion; }
          }

          /// <summary>
          /// Completes the outgoing queue.
          /// </summary>
          /// <remarks>
          /// NOTE(review): only the outgoing queue is completed here; no batch is
          /// built from items that have not yet filled a whole batch - confirm
          /// whether a final, smaller batch is expected.
          /// </remarks>
          public void Complete ()
          {
              outgoing.Complete ();
          }

          /// <summary>
          /// Requests that the block ends in a faulted state.
          /// </summary>
          /// <param name="exception">Exception that caused the fault.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="exception"/> is null.
          /// </exception>
          void IDataflowBlock.Fault (Exception exception)
          {
              if (exception == null)
                  throw new ArgumentNullException ("exception");

              completionHelper.RequestFault (exception);
          }

          /// <summary>Forwards the consume request to the outgoing queue.</summary>
          Tuple<IList<T1>, IList<T2>, IList<T3>>
              ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
                  DataflowMessageHeader messageHeader,
                  ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
                  out bool messageConsumed)
          {
              return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
          }

          /// <summary>
          /// Links this block to <paramref name="target"/> by registering it with the
          /// outgoing queue.
          /// </summary>
          /// <param name="target">Block that should receive the batches.</param>
          /// <param name="linkOptions">Options passed on to the outgoing queue.</param>
          /// <returns>The object returned by the outgoing queue for this link.</returns>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="target"/> is null.
          /// </exception>
          public IDisposable LinkTo (
              ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
              DataflowLinkOptions linkOptions)
          {
              if (target == null)
                  throw new ArgumentNullException ("target");

              return outgoing.AddTarget (target, linkOptions);
          }

          /// <summary>Forwards the release request to the outgoing queue.</summary>
          void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
              DataflowMessageHeader messageHeader,
              ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
          {
              outgoing.ReleaseReservation (messageHeader, target);
          }

          /// <summary>Forwards the reservation request to the outgoing queue.</summary>
          bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
              DataflowMessageHeader messageHeader,
              ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
          {
              return outgoing.ReserveMessage (messageHeader, target);
          }

          /// <summary>
          /// Tries to receive one batch from the outgoing queue, passing
          /// <paramref name="filter"/> on to it.
          /// </summary>
          /// <returns>The result reported by the outgoing queue.</returns>
          public bool TryReceive (
              Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
              out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
          {
              return outgoing.TryReceive (filter, out item);
          }

          /// <summary>Tries to receive all batches from the outgoing queue.</summary>
          /// <returns>The result reported by the outgoing queue.</returns>
          public bool TryReceiveAll (
              out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
          {
              return outgoing.TryReceiveAll (out items);
          }

          /// <summary>Returns the name produced by NameHelper for this block and its options.</summary>
          public override string ToString ()
          {
              return NameHelper.GetName (this, options);
          }
      }
  }