CompletionTest.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. //
  2. // CompletionTest.cs
  3. //
  4. // Author:
  5. // Jérémie "garuma" Laval <[email protected]>
  6. // Petr Onderka <[email protected]>
  7. //
  8. // Copyright (c) 2011 Jérémie "garuma" Laval
  9. // Copyright (c) 2012 Petr Onderka
  10. //
  11. // Permission is hereby granted, free of charge, to any person obtaining a copy
  12. // of this software and associated documentation files (the "Software"), to deal
  13. // in the Software without restriction, including without limitation the rights
  14. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15. // copies of the Software, and to permit persons to whom the Software is
  16. // furnished to do so, subject to the following conditions:
  17. //
  18. // The above copyright notice and this permission notice shall be included in
  19. // all copies or substantial portions of the Software.
  20. //
  21. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  24. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27. // THE SOFTWARE.
  28. using System;
  29. using System.Collections.Generic;
  30. using System.Threading;
  31. using System.Threading.Tasks;
  32. using System.Threading.Tasks.Dataflow;
  33. using NUnit.Framework;
  34. using System.Linq;
  35. namespace MonoTests.System.Threading.Tasks.Dataflow {
  36. [TestFixture]
  37. public class CompletionTest {
  /// <summary>
  /// Completing a buffer that still holds an item must leave its completion
  /// task pending until that item has been received.
  /// </summary>
  [Test]
  public void WithElementsStillLingering ()
  {
      const int item = 42;
      var buffer = new BufferBlock<int> ();
      Task completion = buffer.Completion;

      bool accepted = buffer.Post (item);
      Assert.IsTrue (accepted);
      buffer.Complete ();

      // The posted item is still buffered, so completion has to stay pending.
      bool finishedEarly = completion.Wait (100);
      Assert.IsFalse (finishedEarly);
      Assert.IsFalse (completion.IsCompleted);
      Assert.AreEqual (TaskStatus.WaitingForActivation, completion.Status);

      // Taking the only item out lets the block finish successfully.
      int received = buffer.Receive ();
      Assert.AreEqual (item, received);
      Assert.IsTrue (completion.Wait (1000));
      Assert.IsTrue (completion.IsCompleted);
      Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status);
  }
  /// <summary>
  /// Faulting a buffer that still holds an item completes it as faulted
  /// without the item being received, and later posts are refused.
  /// </summary>
  [Test]
  public void WithElementsStillLingeringButFaulted ()
  {
      var buffer = new BufferBlock<int> ();
      bool accepted = buffer.Post (42);
      Assert.IsTrue (accepted);

      // Fault is reached through the IDataflowBlock interface.
      IDataflowBlock faultable = buffer;
      faultable.Fault (new Exception ());

      // Waiting on a faulted completion surfaces an AggregateException.
      AssertEx.Throws<AggregateException> (() => buffer.Completion.Wait (1000));

      Task completion = buffer.Completion;
      Assert.IsTrue (completion.IsCompleted);
      Assert.AreEqual (TaskStatus.Faulted, completion.Status);

      bool acceptedAfterFault = buffer.Post (43);
      Assert.IsFalse (acceptedAfterFault);
  }
  /// <summary>
  /// Cancelling the token of a buffer that still holds an item completes it
  /// as canceled (a single TaskCanceledException), and later posts are refused.
  /// </summary>
  [Test]
  public void WithElementsStillLingeringButCancelled ()
  {
      var cts = new CancellationTokenSource ();
      var options = new DataflowBlockOptions { CancellationToken = cts.Token };
      var buffer = new BufferBlock<int> (options);

      bool accepted = buffer.Post (42);
      Assert.IsTrue (accepted);

      cts.Cancel ();

      var aggregate = AssertEx.Throws<AggregateException> (
          () => buffer.Completion.Wait (1000));
      int innerCount = aggregate.InnerExceptions.Count;
      Assert.AreEqual (1, innerCount);
      Type innerType = aggregate.InnerException.GetType ();
      Assert.AreEqual (typeof (TaskCanceledException), innerType);

      Task completion = buffer.Completion;
      Assert.IsTrue (completion.IsCompleted);
      Assert.AreEqual (TaskStatus.Canceled, completion.Status);

      bool acceptedAfterCancel = buffer.Post (43);
      Assert.IsFalse (acceptedAfterCancel);
  }
  /// <summary>
  /// Lazily produces one pair per join-style block: the block itself and its
  /// first target. Yields, in order, a two-way and three-way JoinBlock and a
  /// two-way and three-way BatchedJoinBlock (batch size 2).
  /// </summary>
  /// <typeparam name="T">Item type used for every target of each block.</typeparam>
  /// <returns>Pairs of (owning block, its Target1).</returns>
  static IEnumerable<Tuple<IDataflowBlock, ITargetBlock<T>>>
  GetJoinBlocksWithTargets<T> ()
  {
      // Each block is constructed only when the caller advances to its pair.
      // The tuple type is spelled out so the concrete block is stored through
      // its IDataflowBlock / ITargetBlock<T> interfaces.
      var join2 = new JoinBlock<T, T> ();
      yield return new Tuple<IDataflowBlock, ITargetBlock<T>> (join2, join2.Target1);

      var join3 = new JoinBlock<T, T, T> ();
      yield return new Tuple<IDataflowBlock, ITargetBlock<T>> (join3, join3.Target1);

      var batched2 = new BatchedJoinBlock<T, T> (2);
      yield return new Tuple<IDataflowBlock, ITargetBlock<T>> (batched2, batched2.Target1);

      var batched3 = new BatchedJoinBlock<T, T, T> (2);
      yield return new Tuple<IDataflowBlock, ITargetBlock<T>> (batched3, batched3.Target1);
  }
  /// <summary>
  /// For every join-style block: reading a target's Completion is not
  /// supported, and the target stops accepting posts once either the target
  /// itself or its owning block has been completed.
  /// </summary>
  [Test]
  public void JoinTargetCompletitionTest ()
  {
      // Pass 1: complete the individual target.
      foreach (var pair in GetJoinBlocksWithTargets<int> ()) {
          ITargetBlock<int> target = pair.Item2;

          // The getter itself is expected to throw; the local only exists
          // so that the property is actually read.
          AssertEx.Throws<NotSupportedException> (
              () => { var unused = target.Completion; });

          bool acceptedBefore = target.Post (1);
          Assert.IsTrue (acceptedBefore);

          target.Complete ();

          bool acceptedAfter = target.Post (2);
          Assert.IsFalse (acceptedAfter);
      }

      // Pass 2: complete the owning block instead, using fresh blocks.
      foreach (var pair in GetJoinBlocksWithTargets<int> ()) {
          IDataflowBlock owner = pair.Item1;
          ITargetBlock<int> target = pair.Item2;

          bool acceptedBefore = target.Post (1);
          Assert.IsTrue (acceptedBefore);

          owner.Complete ();

          bool acceptedAfter = target.Post (2);
          Assert.IsFalse (acceptedAfter);
      }
  }
  /// <summary>
  /// Faulting an idle block twice must fault its completion task with only
  /// the first exception.
  /// </summary>
  [Test]
  public void MultipleFaultsTest ()
  {
      IDataflowBlock block = new BufferBlock<int> ();

      block.Fault (new Exception ("1"));
      // second exception should be ignored
      block.Fault (new Exception ("2"));

      // Wait for completion with a bound instead of sleeping a fixed 100 ms:
      // Task.WaitAny returns as soon as the task finishes and, unlike
      // Task.Wait, does not rethrow the fault that is inspected below.
      int finished = Task.WaitAny (new[] { block.Completion }, 1000);
      Assert.AreEqual (0, finished, "completion did not finish in time");

      Assert.IsTrue (block.Completion.IsFaulted);
      var exception = block.Completion.Exception;
      Assert.IsNotNull (exception);
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("1", exception.InnerException.Message);
  }
  /// <summary>
  /// Faulting a block twice while its action is running must keep completion
  /// pending until the action returns, and then fault it with only the
  /// first exception.
  /// </summary>
  [Test]
  public void MultipleFaultsWhileExecutingTest ()
  {
      var started = new ManualResetEventSlim ();
      var evt = new ManualResetEventSlim ();
      var actionBlock = new ActionBlock<int> (
          _ =>
          {
              started.Set ();
              evt.Wait ();
          });
      IDataflowBlock dataflowBlock = actionBlock;

      actionBlock.Post (1);
      // Wait until the action is really running rather than guessing with a
      // fixed sleep; otherwise the faults below could arrive before it starts.
      Assert.IsTrue (started.Wait (1000), "action did not start in time");

      dataflowBlock.Fault (new Exception ("1"));
      // second exception should still be ignored
      dataflowBlock.Fault (new Exception ("2"));

      // While the action is blocked, completion must stay pending.
      // Task.WaitAny returns -1 when the timeout elapses first.
      int pending = Task.WaitAny (new[] { actionBlock.Completion }, 100);
      Assert.AreEqual (-1, pending, "block completed while its action was still running");

      evt.Set ();

      // Bounded wait that does not rethrow the fault inspected below.
      int finished = Task.WaitAny (new[] { actionBlock.Completion }, 1000);
      Assert.AreEqual (0, finished, "completion did not finish in time");

      Assert.IsTrue (actionBlock.Completion.IsFaulted);
      var exception = actionBlock.Completion.Exception;
      Assert.IsNotNull (exception);
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("1", exception.InnerException.Message);
  }
  /// <summary>
  /// Two actions that throw concurrently must both be reported through the
  /// block's completion task.
  /// </summary>
  [Test]
  public void MultipleExceptionsTest ()
  {
      // The two-party barrier holds each action until the other one has
      // started as well, so both are in flight before either throws.
      var rendezvous = new Barrier (2);

      // Strictly speaking nothing guarantees that the actions run in
      // parallel, but there is no other way to exercise this.
      var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };

      var block = new ActionBlock<int> (
          _ =>
          {
              rendezvous.SignalAndWait ();
              throw new Exception ();
          },
          options);

      block.Post (1);
      block.Post (2);

      var aggregate = AssertEx.Throws<AggregateException> (
          () => block.Completion.Wait (1000));
      int reported = aggregate.InnerExceptions.Count;
      Assert.AreEqual (2, reported);
  }
  /// <summary>
  /// When the action has already thrown, a later external fault must be
  /// ignored: completion reports only the action's exception.
  /// </summary>
  [Test]
  public void ExceptionAndFaultTest ()
  {
      var block = new ActionBlock<int> (
          _ => { throw new Exception ("action"); },
          new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

      block.Post (1);

      // Make sure the action's exception has been recorded before faulting
      // from outside. A fixed sleep only guesses at that; a bounded wait on
      // completion is deterministic, and Task.WaitAny does not rethrow.
      int finished = Task.WaitAny (new[] { block.Completion }, 1000);
      Assert.AreEqual (0, finished, "block did not fault from the action in time");

      ((IDataflowBlock)block).Fault (new Exception ("fault"));

      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("action", exception.InnerException.Message);
  }
  /// <summary>
  /// When a running block is faulted twice from outside and its action then
  /// throws, completion must report the first external fault followed by
  /// the action's exception.
  /// </summary>
  [Test]
  public void FaultAndExceptionTest ()
  {
      var started = new ManualResetEventSlim ();
      var evt = new ManualResetEventSlim ();
      var block = new ActionBlock<int> (
          _ =>
          {
              started.Set ();
              evt.Wait ();
              throw new Exception ("action");
          },
          new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

      block.Post (1);
      // Wait until the action is really running instead of sleeping a fixed
      // 100 ms, so the faults below are guaranteed to arrive mid-execution.
      Assert.IsTrue (started.Wait (1000), "action did not start in time");

      ((IDataflowBlock)block).Fault (new Exception ("fault1"));
      ((IDataflowBlock)block).Fault (new Exception ("fault2"));
      evt.Set ();

      // Same 1000 ms budget as the other tests in this fixture. If Wait
      // timed out it would return false instead of throwing, and the
      // assertion would fail spuriously.
      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (2, exception.InnerExceptions.Count);
      CollectionAssert.AreEqual (new[] { "fault1", "action" },
          exception.InnerExceptions.Select (e => e.Message).ToArray ());
  }
  /// <summary>
  /// When the action has already thrown, a later cancellation must not
  /// change the outcome: completion reports only the action's exception.
  /// </summary>
  [Test]
  public void ExceptionAndCancelTest ()
  {
      var tokenSource = new CancellationTokenSource ();
      var block = new ActionBlock<int> (
          _ => { throw new Exception ("action"); },
          new ExecutionDataflowBlockOptions
              { MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

      block.Post (1);

      // Make sure the action's exception has been recorded before
      // cancelling. A fixed sleep only guesses at that; a bounded wait on
      // completion is deterministic, and Task.WaitAny does not rethrow.
      int finished = Task.WaitAny (new[] { block.Completion }, 1000);
      Assert.AreEqual (0, finished, "block did not fault from the action in time");

      tokenSource.Cancel ();

      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("action", exception.InnerException.Message);
  }
  /// <summary>
  /// When a block is cancelled while its action is running and the action
  /// then throws, completion must report the action's exception rather than
  /// a cancellation.
  /// </summary>
  [Test]
  public void CancelAndExceptionTest ()
  {
      var tokenSource = new CancellationTokenSource ();
      var started = new ManualResetEventSlim ();
      var evt = new ManualResetEventSlim ();
      var block = new ActionBlock<int> (
          _ =>
          {
              started.Set ();
              evt.Wait ();
              throw new Exception ("action");
          },
          new ExecutionDataflowBlockOptions
              { MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

      block.Post (1);
      // Wait until the action is really running instead of sleeping a fixed
      // 100 ms; cancelling before it starts would be a different scenario.
      Assert.IsTrue (started.Wait (1000), "action did not start in time");

      tokenSource.Cancel ();
      evt.Set ();

      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("action", exception.InnerException.Message);
  }
  /// <summary>
  /// A fault raised after the block's token was cancelled must be ignored:
  /// completion reports a single TaskCanceledException.
  /// </summary>
  [Test]
  public void CancelAndFaultTest ()
  {
      var cts = new CancellationTokenSource ();
      var options = new DataflowBlockOptions { CancellationToken = cts.Token };
      var buffer = new BufferBlock<int> (options);
      IDataflowBlock faultable = buffer;

      // Cancel first, pause, then fault from outside.
      cts.Cancel ();
      Thread.Sleep (100);
      faultable.Fault (new Exception ("fault"));

      var aggregate = AssertEx.Throws<AggregateException> (
          () => buffer.Completion.Wait (1000));
      int innerCount = aggregate.InnerExceptions.Count;
      Assert.AreEqual (1, innerCount);
      Type innerType = aggregate.InnerException.GetType ();
      Assert.AreEqual (typeof (TaskCanceledException), innerType);
  }
  /// <summary>
  /// When a block is cancelled and then faulted while its action is running,
  /// the cancellation wins: completion reports a single
  /// TaskCanceledException.
  /// </summary>
  [Test]
  public void CancelAndFaultWhileExecutingTest ()
  {
      var tokenSource = new CancellationTokenSource ();
      var started = new ManualResetEventSlim ();
      var evt = new ManualResetEventSlim ();
      var block = new ActionBlock<int> (
          _ =>
          {
              started.Set ();
              evt.Wait ();
          },
          new ExecutionDataflowBlockOptions
              { MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

      block.Post (1);
      // Wait until the action is really running instead of sleeping a fixed
      // 100 ms, so cancel and fault are guaranteed to happen mid-execution.
      Assert.IsTrue (started.Wait (1000), "action did not start in time");

      tokenSource.Cancel ();
      // Pause kept from the original between the two operations; presumably
      // it lets the cancellation take effect before the fault arrives.
      Thread.Sleep (100);
      ((IDataflowBlock)block).Fault (new Exception ("fault"));
      evt.Set ();

      // No extra sleep is needed here: Wait already blocks for up to one second.
      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual (typeof(TaskCanceledException),
          exception.InnerException.GetType ());
  }
  /// <summary>
  /// When a block is faulted and then cancelled while its action is running,
  /// the fault wins: completion reports only the "fault" exception.
  /// </summary>
  [Test]
  public void FaultAndCancelWhileExecutingTest ()
  {
      var tokenSource = new CancellationTokenSource ();
      var started = new ManualResetEventSlim ();
      var evt = new ManualResetEventSlim ();
      var block = new ActionBlock<int> (
          _ =>
          {
              started.Set ();
              evt.Wait ();
          },
          new ExecutionDataflowBlockOptions
              { MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

      block.Post (1);
      // Wait until the action is really running instead of sleeping a fixed
      // 100 ms, so fault and cancel are guaranteed to happen mid-execution.
      Assert.IsTrue (started.Wait (1000), "action did not start in time");

      ((IDataflowBlock)block).Fault (new Exception ("fault"));
      // Pause kept from the original between the two operations; presumably
      // it lets the fault take effect before the cancellation arrives.
      Thread.Sleep (100);
      tokenSource.Cancel ();
      evt.Set ();

      // No extra sleep is needed here: Wait already blocks for up to one second.
      var exception =
          AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
      Assert.AreEqual (1, exception.InnerExceptions.Count);
      Assert.AreEqual ("fault", exception.InnerException.Message);
  }
  296. }
  297. }