MessageBox.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. // MessageBox.cs
  2. //
  3. // Copyright (c) 2011 Jérémie "garuma" Laval
  4. // Copyright (c) 2012 Petr Onderka
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. using System.Collections.Concurrent;
  24. using System.Linq;
  25. namespace System.Threading.Tasks.Dataflow {
  /// <summary>
  /// Stores the messages that have been offered to a target block so that they can
  /// be processed later.
  /// </summary>
  /// <remarks>
  /// Accepted messages are added to <see cref="MessageQueue"/>. Messages that cannot
  /// be accepted right away (non-greedy mode, or the bounded capacity has been reached)
  /// are remembered per source block and are consumed or reserved later.
  /// </remarks>
  /// <typeparam name="TInput">Type of the messages stored in the box.</typeparam>
  internal class MessageBox<TInput> {
    /// <summary>
    /// Block on whose behalf messages are reserved and consumed from sources.
    /// </summary>
    protected ITargetBlock<TInput> Target { get; set; }

    /// <summary>
    /// Completion helper; its <c>CanRun</c> is checked before a message is accepted
    /// and its <c>Complete</c> is called by <see cref="VerifyCompleteness"/>.
    /// </summary>
    protected CompletionHelper CompHelper { get; private set; }

    // Additional condition that has to hold before CompHelper.Complete () is called.
    readonly Func<bool> externalCompleteTester;
    // Supplies BoundedCapacity, CancellationToken and TaskScheduler.
    readonly DataflowBlockOptions options;
    // When false, every offered message is postponed instead of being accepted directly.
    readonly bool greedy;
    // Optional predicate; when it returns false the offered message is declined permanently.
    readonly Func<bool> canAccept;
    // Latest postponed message header for each source block.
    readonly ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader>
      postponedMessages =
        new ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader> ();
    // Number of items counted against options.BoundedCapacity.
    int itemCount;
    // Set while a RetrievePostponed task is scheduled or running.
    readonly AtomicBoolean postponedProcessing = new AtomicBoolean ();

    /// <summary>
    /// Queue that receives the accepted messages.
    /// </summary>
    protected BlockingCollection<TInput> MessageQueue { get; private set; }

    /// <summary>
    /// Creates a message box working on behalf of <paramref name="target"/>.
    /// </summary>
    /// <param name="target">Target block passed to the sources when reserving or consuming.</param>
    /// <param name="messageQueue">Queue that accepted messages are added to.</param>
    /// <param name="compHelper">Completion helper of the owning block.</param>
    /// <param name="externalCompleteTester">
    /// Extra condition checked by <see cref="VerifyCompleteness"/> before completing.
    /// </param>
    /// <param name="options">Options supplying the bounded capacity, cancellation token and scheduler.</param>
    /// <param name="greedy">When false, all offered messages are postponed.</param>
    /// <param name="canAccept">Optional predicate deciding whether a message may be accepted.</param>
    public MessageBox (
      ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
      CompletionHelper compHelper, Func<bool> externalCompleteTester,
      DataflowBlockOptions options, bool greedy = true, Func<bool> canAccept = null)
    {
      this.Target = target;
      this.CompHelper = compHelper;
      this.MessageQueue = messageQueue;
      this.externalCompleteTester = externalCompleteTester;
      this.options = options;
      this.greedy = greedy;
      this.canAccept = canAccept;
    }

    /// <summary>
    /// Offers a message to the box, which accepts, postpones or declines it.
    /// </summary>
    /// <param name="messageHeader">Header of the offered message; has to be valid.</param>
    /// <param name="messageValue">Value of the offered message.</param>
    /// <param name="source">Source block offering the message; may be null.</param>
    /// <param name="consumeToAccept">
    /// When true, the value is obtained by consuming the message from <paramref name="source"/>.
    /// </param>
    /// <returns>
    /// <c>Accepted</c> when the value was added to the queue;
    /// <c>Postponed</c> when the box is non-greedy or full and a source was given;
    /// <c>Declined</c> when the message would be postponed but there is no source;
    /// <c>NotAvailable</c> when reserving or consuming from the source failed;
    /// <c>DecliningPermanently</c> when the queue is complete, the block can't run,
    /// <c>canAccept</c> returned false, or adding to the queue failed.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The header is invalid, or <paramref name="consumeToAccept"/> is true
    /// while <paramref name="source"/> is null.
    /// </exception>
    public DataflowMessageStatus OfferMessage (
      DataflowMessageHeader messageHeader, TInput messageValue,
      ISourceBlock<TInput> source, bool consumeToAccept)
    {
      if (!messageHeader.IsValid)
        throw new ArgumentException ("The messageHeader is not valid.",
          "messageHeader");
      if (consumeToAccept && source == null)
        throw new ArgumentException (
          "consumeToAccept may only be true if provided with a non-null source.",
          "consumeToAccept");

      if (MessageQueue.IsAddingCompleted || !CompHelper.CanRun)
        return DataflowMessageStatus.DecliningPermanently;

      var full = options.BoundedCapacity != -1
        && Thread.VolatileRead (ref itemCount) >= options.BoundedCapacity;
      if (!greedy || full) {
        if (source == null)
          return DataflowMessageStatus.Declined;

        postponedMessages [source] = messageHeader;

        // necessary to avoid race condition: with a count of 0 this only re-runs
        // the capacity check, so that room freed since the check above is noticed
        DecreaseCount (0);

        if (!greedy && !full)
          EnsureProcessing (true);

        return DataflowMessageStatus.Postponed;
      }

      if (consumeToAccept) {
        if (!source.ReserveMessage (messageHeader, Target))
          return DataflowMessageStatus.NotAvailable;
        bool consumed;
        messageValue = source.ConsumeMessage (messageHeader, Target, out consumed);
        if (!consumed)
          return DataflowMessageStatus.NotAvailable;
      }

      // NOTE(review): when consumeToAccept is true the message has already been
      // consumed at this point, so it appears to be dropped if canAccept returns
      // false - confirm whether canAccept can be checked before consuming.
      if (canAccept != null && !canAccept ())
        return DataflowMessageStatus.DecliningPermanently;

      try {
        MessageQueue.Add (messageValue);
      } catch (InvalidOperationException) {
        // This is triggered either if the underlying collection didn't accept the item
        // or if the messageQueue has been marked complete, either way it corresponds to a false
        return DataflowMessageStatus.DecliningPermanently;
      }

      IncreaseCount ();
      EnsureProcessing (true);
      VerifyCompleteness ();
      return DataflowMessageStatus.Accepted;
    }

    /// <summary>
    /// Atomically increments the number of items counted against the bounded capacity.
    /// </summary>
    public void IncreaseCount ()
    {
      Interlocked.Increment (ref itemCount);
    }

    /// <summary>
    /// Atomically decreases the item count and, if there is room below the bounded
    /// capacity and some messages are postponed, triggers their processing.
    /// </summary>
    /// <param name="count">
    /// Number of items to subtract; 0 only performs the capacity check.
    /// </param>
    public void DecreaseCount (int count = 1)
    {
      int decreased = Interlocked.Add (ref itemCount, -count);

      // if BoundedCapacity is -1, there is no need to do this
      if (decreased < options.BoundedCapacity && !postponedMessages.IsEmpty) {
        if (greedy)
          EnsurePostponedProcessing ();
        else
          EnsureProcessing (false);
      }
    }

    /// <summary>
    /// Number of sources that currently have a postponed message recorded.
    /// </summary>
    public int PostponedMessagesCount {
      get { return postponedMessages.Count; }
    }

    /// <summary>
    /// Removes postponed messages one by one until one of them can be reserved
    /// at its source.
    /// </summary>
    /// <returns>
    /// The source and header of the reserved message, or null when no postponed
    /// message could be reserved. Messages whose reservation failed stay removed.
    /// </returns>
    public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
    {
      while (!postponedMessages.IsEmpty) {
        ISourceBlock<TInput> block;
        DataflowMessageHeader header;

        // the collection got empty or another thread was faster, check again
        if (!TryTakePostponed (out block, out header))
          continue;

        if (block.ReserveMessage (header, Target))
          return Tuple.Create (block, header);
      }

      return null;
    }

    /// <summary>
    /// Releases a reservation previously returned by <see cref="ReserveMessage"/>.
    /// </summary>
    /// <param name="reservation">Source block and header of the reserved message.</param>
    public void RelaseReservation(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
    {
      reservation.Item1.ReleaseReservation (reservation.Item2, Target);
    }

    /// <summary>
    /// Consumes a message previously reserved by <see cref="ReserveMessage"/>.
    /// </summary>
    /// <param name="reservation">Source block and header of the reserved message.</param>
    /// <returns>
    /// Whatever the source returns from <c>ConsumeMessage</c>; the flag telling
    /// whether the message was actually consumed is not inspected.
    /// </returns>
    public TInput ConsumeReserved(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
    {
      bool consumed;
      return reservation.Item1.ConsumeMessage (
        reservation.Item2, Target, out consumed);
    }

    /// <summary>
    /// Takes one entry out of the postponed messages.
    /// </summary>
    /// <param name="block">Source block of the removed entry.</param>
    /// <param name="header">Header of the removed entry.</param>
    /// <returns>
    /// False when the collection was empty or when another thread removed the
    /// chosen entry first; the out values must not be used in that case.
    /// </returns>
    bool TryTakePostponed (out ISourceBlock<TInput> block, out DataflowMessageHeader header)
    {
      // FirstOrDefault yields a pair with a null key when the collection is empty
      block = postponedMessages.FirstOrDefault ().Key;
      if (block == null) {
        header = default (DataflowMessageHeader);
        return false;
      }

      return postponedMessages.TryRemove (block, out header);
    }

    /// <summary>
    /// Starts a task running <see cref="RetrievePostponed"/>, unless the
    /// postponed-processing flag is already set.
    /// </summary>
    void EnsurePostponedProcessing ()
    {
      if (postponedProcessing.TrySet())
        Task.Factory.StartNew (RetrievePostponed, options.CancellationToken,
          TaskCreationOptions.PreferFairness, options.TaskScheduler);
    }

    /// <summary>
    /// Consumes postponed messages into the queue while there is room for them and,
    /// once the queue is complete for adding, releases the remaining ones.
    /// </summary>
    void RetrievePostponed ()
    {
      try {
        // BoundedCapacity can't be -1 here, because in that case there would be no postponing
        while (Thread.VolatileRead (ref itemCount) < options.BoundedCapacity
               && !postponedMessages.IsEmpty && !MessageQueue.IsAddingCompleted) {
          ISourceBlock<TInput> block;
          DataflowMessageHeader header;

          // ReserveMessage () may remove entries concurrently; never use
          // a header that wasn't actually taken out of the collection
          if (!TryTakePostponed (out block, out header))
            continue;

          bool consumed;
          var item = block.ConsumeMessage (header, Target, out consumed);
          if (!consumed)
            continue;

          try {
            MessageQueue.Add (item);
            IncreaseCount ();
            EnsureProcessing (false);
          } catch (InvalidOperationException) {
            break;
          }
        }

        // release all postponed messages
        if (MessageQueue.IsAddingCompleted) {
          while (!postponedMessages.IsEmpty) {
            ISourceBlock<TInput> block;
            DataflowMessageHeader header;

            if (!TryTakePostponed (out block, out header))
              continue;

            if (block.ReserveMessage (header, Target))
              block.ReleaseReservation (header, Target);
          }
        }
      } finally {
        // reset even when a source throws, otherwise no other
        // postponed processing task could ever be started
        postponedProcessing.Value = false;
      }

      // because of race
      if ((Thread.VolatileRead (ref itemCount) < options.BoundedCapacity
           || MessageQueue.IsAddingCompleted)
          && !postponedMessages.IsEmpty)
        EnsurePostponedProcessing ();
    }

    /// <summary>
    /// Hook called when there may be something to process; does nothing here.
    /// </summary>
    /// <param name="newItem">
    /// True when called from <see cref="OfferMessage"/>, false when called from
    /// <see cref="DecreaseCount"/> or from the processing of postponed messages.
    /// </param>
    protected virtual void EnsureProcessing (bool newItem)
    {
    }

    /// <summary>
    /// Marks the message queue as complete for adding, checks whether the block
    /// is finished and schedules the release of any postponed messages.
    /// </summary>
    public void Complete ()
    {
      // Make message queue complete
      MessageQueue.CompleteAdding ();
      OutgoingQueueComplete ();
      VerifyCompleteness ();

      if (!postponedMessages.IsEmpty)
        EnsurePostponedProcessing ();
    }

    /// <summary>
    /// Hook called by <see cref="Complete"/> right after the message queue has been
    /// marked as complete for adding; does nothing here.
    /// </summary>
    protected virtual void OutgoingQueueComplete ()
    {
    }

    /// <summary>
    /// Completes the block through <see cref="CompHelper"/> when the message queue
    /// is completed and the external tester agrees.
    /// </summary>
    protected virtual void VerifyCompleteness ()
    {
      if (MessageQueue.IsCompleted && externalCompleteTester ())
        CompHelper.Complete ();
    }
  }
  213. }