Browse Source

Release postponed messages after Complete()

Petr Onderka 13 years ago
parent
commit
37cdae3d3e

+ 18 - 2
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

@@ -186,7 +186,7 @@ namespace System.Threading.Tasks.Dataflow {
 		{
 			// BoundedCapacity can't be -1 here, because in that case there would be no postponing
 			while (Thread.VolatileRead (ref itemCount) < options.BoundedCapacity
-			       && !postponedMessages.IsEmpty) {
+			       && !postponedMessages.IsEmpty && !MessageQueue.IsAddingCompleted) {
 				var block = postponedMessages.First ().Key;
 				DataflowMessageHeader header;
 				postponedMessages.TryRemove (block, out header);
@@ -204,10 +204,23 @@ namespace System.Threading.Tasks.Dataflow {
 				}
 			}
 
+			// release all postponed messages
+			if (MessageQueue.IsAddingCompleted) {
+				while (!postponedMessages.IsEmpty) {
+					var block = postponedMessages.First ().Key;
+					DataflowMessageHeader header;
+					postponedMessages.TryRemove (block, out header);
+
+					if (block.ReserveMessage (header, Target))
+						block.ReleaseReservation (header, Target);
+				}
+			}
+
 			postponedProcessing.Value = false;
 
 			// because of race
-			if (Thread.VolatileRead (ref itemCount) < options.BoundedCapacity
+			if ((Thread.VolatileRead (ref itemCount) < options.BoundedCapacity
+			     || MessageQueue.IsAddingCompleted)
 			    && !postponedMessages.IsEmpty)
 				EnsurePostponedProcessing ();
 		}
@@ -222,6 +235,9 @@ namespace System.Threading.Tasks.Dataflow {
 			MessageQueue.CompleteAdding ();
 			OutgoingQueueComplete ();
 			VerifyCompleteness ();
+
+			if (!postponedMessages.IsEmpty)
+				EnsurePostponedProcessing ();
 		}
 
 		protected virtual void OutgoingQueueComplete ()

+ 25 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BoundedCapacityTest.cs

@@ -218,6 +218,31 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 			Assert.IsTrue (source.WasConsumed (header3));
 			Assert.IsFalse (source.WasConsumed (header2));
 		}
+
+		[Test]
+		public void DontConsumePostponedAfterCompleteTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new BufferBlock<int> (
+				new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
+			var target = (ITargetBlock<int>)block;
+			var source = new TestSourceBlock<int> ();
+
+			Assert.IsTrue (block.Post (11));
+
+			var header = new DataflowMessageHeader (1);
+			source.AddMessage (header, 12);
+			Assert.AreEqual (DataflowMessageStatus.Postponed,
+				target.OfferMessage (header, 12, source, false));
+
+			block.Complete ();
+
+			Assert.AreEqual (11, block.Receive ());
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (source.WasConsumed (header));
+		}
 	}
 
 	class TestSourceBlock<T> : ISourceBlock<T> {