OptionsTest.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // OptionsTest.cs
  2. //
  3. // Copyright (c) 2012 Petr Onderka
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. using System;
  23. using System.Collections.Concurrent;
  24. using System.Collections.Generic;
  25. using System.Linq;
  26. using System.Threading;
  27. using System.Threading.Tasks;
  28. using System.Threading.Tasks.Dataflow;
  29. using NUnit.Framework;
  30. namespace MonoTests.System.Threading.Tasks.Dataflow {
  31. [TestFixture]
  32. public class OptionsTest {
  // Checks how the name format option is reflected in a block's ToString ():
  // a format without placeholders is returned verbatim, "{0}" is expected to
  // produce the block's type name and "{1}" the id of its Completion task.
  [Test]
  public void NameFormatTest ()
  {
      const string literalFormat = "constant";

      foreach (var block in Blocks.CreateBlocksWithNameFormat (literalFormat)) {
          Assert.AreEqual (literalFormat, block.ToString ());
      }

      foreach (var block in Blocks.CreateBlocksWithNameFormat ("{0}")) {
          var typeName = block.GetType ().Name;
          Assert.AreEqual (typeName, block.ToString ());
      }

      foreach (var block in Blocks.CreateBlocksWithNameFormat ("{1}")) {
          var completionId = block.Completion.Id.ToString ();
          Assert.AreEqual (completionId, block.ToString ());
      }
  }
  // Checks that blocks created with a cancellation token stay incomplete until
  // the token is cancelled, and that afterwards their Completion task is
  // reported as canceled.
  [Test]
  public void CancellationTest ()
  {
      // CancellationTokenSource is IDisposable; release it when the test ends,
      // whether the assertions pass or throw.
      using (var source = new CancellationTokenSource ()) {
          var blocks = Blocks.CreateBlocksWithCancellationToken (source.Token).ToArray ();

          // before cancellation no block is expected to complete within 100 ms
          foreach (var block in blocks)
              Assert.IsFalse (block.Completion.Wait (100));

          source.Cancel ();

          foreach (var block in blocks) {
              // waiting on the completion is expected to throw an
              // AggregateException wrapping exactly one TaskCanceledException
              var ae =
                  AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
              Assert.AreEqual (1, ae.InnerExceptions.Count);
              Assert.IsInstanceOfType (typeof(TaskCanceledException), ae.InnerExceptions [0]);
              Assert.IsTrue (block.Completion.IsCanceled);
          }
      }
  }
  // Builds an ActionBlock, a TransformBlock and a TransformManyBlock with the
  // given options, posts the integers 0..99 to each and, per block, yields the
  // ids of the tasks that processed the items (in the order they were recorded).
  // Asserts along the way that every posted item was processed.
  static IEnumerable<int[]> GetTaskIdsForExecutionsOptions (
      ExecutionDataflowBlockOptions options)
  {
      const int itemCount = 100;

      // Notes which task handled which item; always invoked from inside a
      // block's processing delegate, so Task.CurrentId is read there.
      Action<ConcurrentQueue<Tuple<int, int>>, int> record =
          (log, item) => log.Enqueue (Tuple.Create (item, Task.CurrentId.Value));

      Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>> createAction =
          log => new ActionBlock<int> (item => record (log, item), options);

      Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>> createTransform =
          log => new TransformBlock<int, int> (item =>
          {
              record (log, item);
              return item;
          }, options);

      Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>> createTransformMany =
          log => new TransformManyBlock<int, int> (item =>
          {
              record (log, item);
              return new[] { item };
          }, options);

      var creators = new[] { createAction, createTransform, createTransformMany };

      foreach (var create in creators) {
          var log = new ConcurrentQueue<Tuple<int, int>> ();
          var block = create (log);

          Assert.IsEmpty (log);

          for (int item = 0; item < itemCount; item++)
              block.Post (item);

          block.Complete ();

          var producer = block as ISourceBlock<int>;
          if (producer != null) {
              // a block that also produces output is expected to stay
              // incomplete until something is linked to take that output
              Assert.IsFalse (block.Completion.Wait (100));
              producer.LinkTo (new BufferBlock<int> ());
          }

          Assert.IsTrue (block.Completion.Wait (1000));

          var processedItems = log.Select (entry => entry.Item1);
          CollectionAssert.AreEquivalent (Enumerable.Range (0, itemCount), processedItems);

          yield return log.Select (entry => entry.Item2).ToArray ();
      }
  }
  // Estimates the degree of parallelism from a sequence of task ids.
  // Each position in the sequence is treated as a point in time; a task is
  // considered running from the first position where its id appears to the
  // last one (inclusive). Returns the largest number of tasks running at the
  // same position, or 0 for an empty sequence.
  static int CalculateDegreeOfParallelism(IEnumerable<int> taskIds)
  {
      var sequence = new List<int> ();
      var firstSeen = new Dictionary<int, int> ();
      var lastSeen = new Dictionary<int, int> ();

      // single pass over the input: remember it and note where each id
      // first and last occurs
      foreach (var id in taskIds) {
          int position = sequence.Count;
          if (!firstSeen.ContainsKey (id))
              firstSeen [id] = position;
          lastSeen [id] = position;
          sequence.Add (id);
      }

      int running = 0;
      int peak = 0;

      // only the id stored at a position can start or end there, so it is
      // enough to compare that id's first/last positions with the current one
      for (int position = 0; position < sequence.Count; position++) {
          int id = sequence [position];

          if (firstSeen [id] == position)
              running++;

          // the peak is sampled after a start and before an end, so a task
          // occupying a single position still counts as running there
          peak = Math.Max (peak, running);

          if (lastSeen [id] == position)
              running--;
      }

      return peak;
  }
  // Checks that the parallelism observed while processing items respects
  // MaxDegreeOfParallelism: exactly 1 with default options, at most the
  // configured value for 2 and 4, and at most the number of items for -1.
  [Test]
  public void MaxDegreeOfParallelismTest ()
  {
      // repeated several times: the race conditions this test has exposed
      // were quite rare
      for (int round = 0; round < 10; round++) {
          var defaults = new ExecutionDataflowBlockOptions ();
          foreach (var ids in GetTaskIdsForExecutionsOptions (defaults))
              Assert.AreEqual (1, CalculateDegreeOfParallelism (ids));

          foreach (int limit in new[] { 2, 4 }) {
              var limited =
                  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit };
              foreach (var ids in GetTaskIdsForExecutionsOptions (limited))
                  Assert.LessOrEqual (CalculateDegreeOfParallelism (ids), limit);
          }

          var noLimit = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };
          foreach (var ids in GetTaskIdsForExecutionsOptions (noLimit))
              Assert.LessOrEqual (CalculateDegreeOfParallelism (ids), ids.Length);
      }
  }
  // Checks the number of distinct tasks used to process 100 items for various
  // MaxMessagesPerTask settings: at least one task by default, exactly 100
  // tasks for a limit of 1, and at least (item count / limit) tasks for
  // limits of 2 and 4.
  [Test]
  public void MaxMessagesPerTaskTest ()
  {
      Func<int[], int> distinctTasks = ids => ids.Distinct ().Count ();

      var defaults = new ExecutionDataflowBlockOptions ();
      foreach (var ids in GetTaskIdsForExecutionsOptions (defaults))
          Assert.GreaterOrEqual (distinctTasks (ids), 1);

      var onePerTask = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 };
      foreach (var ids in GetTaskIdsForExecutionsOptions (onePerTask))
          Assert.AreEqual (100, distinctTasks (ids));

      foreach (int perTask in new[] { 2, 4 }) {
          var capped = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = perTask };
          foreach (var ids in GetTaskIdsForExecutionsOptions (capped))
              Assert.GreaterOrEqual (distinctTasks (ids), ids.Length / perTask);
      }
  }
  // Checks that an ActionBlock configured with a custom TaskScheduler does its
  // work on that scheduler: the posted item is not handled until the test
  // scheduler is asked to execute its queued work.
  [Test]
  public void TaskSchedulerTest ()
  {
      var scheduler = new TestScheduler ();
      var options = new ExecutionDataflowBlockOptions { TaskScheduler = scheduler };

      int handled = 0;
      var block = new ActionBlock<int> (item => Interlocked.Increment (ref handled), options);

      Assert.IsTrue (block.Post (1));

      // nothing has run yet, the work is still queued on the scheduler
      Assert.AreEqual (0, Volatile.Read (ref handled));

      int executed = scheduler.ExecuteAll ();
      Assert.AreEqual (1, executed);
      Assert.AreEqual (1, Volatile.Read (ref handled));
  }
  // Checks that a block created without explicit options does not pick up the
  // ambient scheduler: the block is created from a task running on the test
  // scheduler, yet its delegate is expected to run on a different one.
  [Test]
  public void DefaultSchedulerIsDefaultTest ()
  {
      var scheduler = new TestScheduler ();
      ActionBlock<int> action = null;

      Action createAndFeedBlock = () =>
      {
          // this code itself runs on the test scheduler...
          Assert.AreEqual (scheduler, TaskScheduler.Current);

          // ...but the block's delegate is expected not to
          action = new ActionBlock<int> (
              i => Assert.AreNotEqual (scheduler, TaskScheduler.Current));

          Assert.IsTrue (action.Post (1));
          action.Complete ();
      };

      var task = new TaskFactory (scheduler).StartNew (createAndFeedBlock);

      // only the outer task is expected to be queued on the test scheduler
      int executed = scheduler.ExecuteAll ();
      Assert.AreEqual (1, executed);

      Assert.IsNotNull (action);
      Assert.IsTrue (action.Completion.Wait (1000));
      Assert.IsTrue (task.Wait (0));
  }
  // Checks a link with MaxMessages = 1 between two buffer blocks: the first
  // posted item is expected to reach the target, the second one is expected
  // to remain in the source.
  [Test]
  public void MaxMessagesDirectTest ()
  {
      var scheduler = new TestScheduler ();
      var source = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler });
      var target = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler });

      var link = source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 });
      Assert.IsNotNull (link);

      int received;

      // first item: passes through the link
      Assert.IsTrue (source.Post (42));
      scheduler.ExecuteAll ();
      Assert.IsTrue (target.TryReceive (null, out received));
      Assert.AreEqual (42, received);

      // second item: not propagated, still available from the source
      Assert.IsTrue (source.Post (43));
      scheduler.ExecuteAll ();
      Assert.IsFalse (target.TryReceive (null, out received));
      Assert.IsTrue (source.TryReceive (null, out received));
      Assert.AreEqual (43, received);
  }
  // Checks a link with MaxMessages = 2 into a target with BoundedCapacity = 1:
  // of three posted items, the first two are expected to arrive at the target
  // one at a time and the third is expected to remain in the source.
  [Test]
  public void MaxMessagesPostponedTest ()
  {
      var scheduler = new TestScheduler ();
      var source = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler });
      var target = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });

      // asserts that the given block hands out the expected item next
      Action<BufferBlock<int>, int> expectNext = (block, expected) =>
      {
          int received;
          Assert.IsTrue (block.TryReceive (null, out received));
          Assert.AreEqual (expected, received);
      };

      // asserts that the given block has nothing to hand out right now
      Action<BufferBlock<int>> expectNothing = block =>
      {
          int ignored;
          Assert.IsFalse (block.TryReceive (null, out ignored));
      };

      var link = source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 });
      Assert.IsNotNull (link);

      foreach (int value in new[] { 42, 43, 44 })
          Assert.IsTrue (source.Post (value));

      scheduler.ExecuteAll ();

      expectNext (target, 42);
      expectNothing (target);

      scheduler.ExecuteAll ();

      expectNext (target, 43);

      scheduler.ExecuteAll ();

      expectNothing (target);
      expectNext (source, 44);
  }
  // Same setup as the postponed test (MaxMessages = 2, target with
  // BoundedCapacity = 1), but one item is taken directly from the source
  // before the target has room for it. The assertions expect the target to
  // receive 42 and later 44, and the source to hand out 43 and finally 45.
  [Test]
  public void MaxMessagesPostponedUnconsumedTest ()
  {
      var scheduler = new TestScheduler ();
      var source = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler });
      var target = new BufferBlock<int> (
          new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });

      // asserts that the given block hands out the expected item next
      Action<BufferBlock<int>, int> expectNext = (block, expected) =>
      {
          int received;
          Assert.IsTrue (block.TryReceive (null, out received));
          Assert.AreEqual (expected, received);
      };

      // asserts that the given block has nothing to hand out right now
      Action<BufferBlock<int>> expectNothing = block =>
      {
          int ignored;
          Assert.IsFalse (block.TryReceive (null, out ignored));
      };

      var link = source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 });
      Assert.IsNotNull (link);

      foreach (int value in new[] { 42, 43, 44, 45 })
          Assert.IsTrue (source.Post (value));

      scheduler.ExecuteAll ();

      // take 43 straight from the source before the target gets to it
      expectNext (source, 43);
      expectNext (target, 42);
      expectNothing (target);

      scheduler.ExecuteAll ();

      expectNext (target, 44);

      scheduler.ExecuteAll ();

      expectNothing (target);
      expectNext (source, 45);
  }
  // Checks a link with MaxMessages = 2 from a BroadcastBlock into a target
  // with BoundedCapacity = 1. Three items are posted one by one; the
  // assertions expect the target to yield 42 first and 44 afterwards.
  [Test]
  public void MaxMessagesBroadcastTest ()
  {
      var scheduler = new TestScheduler ();
      var sourceOptions = new DataflowBlockOptions { TaskScheduler = scheduler };
      var targetOptions =
          new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 };

      var source = new BroadcastBlock<int> (null, sourceOptions);
      var target = new BufferBlock<int> (targetOptions);

      var link = source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 });
      Assert.IsNotNull (link);

      // 42 should be accepted;
      // 43 should be postponed, but counted into the limit;
      // 44 shouldn't be even offered for now
      foreach (int value in new[] { 42, 43, 44 }) {
          Assert.IsTrue (source.Post (value));
          scheduler.ExecuteAll ();
      }

      int received;

      Assert.IsTrue (target.TryReceive (out received));
      Assert.AreEqual (42, received);

      scheduler.ExecuteAll ();

      Assert.IsTrue (target.TryReceive (out received));
      Assert.AreEqual (44, received);
  }
  // Offers two messages concurrently (with consumeToAccept = true) to a
  // BatchBlock limited to a single group. Both offers are expected to stay
  // pending until the event is set; afterwards exactly one of them should be
  // accepted and consumed, and the other declined permanently and not consumed.
  [Test]
  public void MaxNumberOfGroupsWithConsumeToAcceptTest ()
  {
      var groupingOptions = new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 };
      ITargetBlock<int> block = new BatchBlock<int> (1, groupingOptions);

      var evt = new ManualResetEventSlim ();

      // Offers one message from a fresh test source and reports the resulting
      // status together with whether the message was consumed.
      // NOTE(review): ConsumeWaiter presumably makes the source wait on the
      // event while being consumed - confirm against TestSourceBlock.
      Func<Tuple<DataflowMessageStatus, bool>> offerOnce = () =>
      {
          var sourceBlock = new TestSourceBlock<int> { ConsumeWaiter = evt.Wait };
          var header = new DataflowMessageHeader (1);
          sourceBlock.AddMessage (header, 1);
          var status = block.OfferMessage (header, 1, sourceBlock, true);
          return Tuple.Create (status, sourceBlock.WasConsumed (header));
      };

      var firstOffer = Task.Factory.StartNew (offerOnce);
      var secondOffer = Task.Factory.StartNew (offerOnce);

      // give both offers time to start; neither may finish before the event is set
      Thread.Sleep (100);
      Assert.IsFalse (firstOffer.IsCompleted);
      Assert.IsFalse (secondOffer.IsCompleted);

      evt.Set ();

      Assert.IsTrue (Task.WaitAll (new Task[] { firstOffer, secondOffer }, 1000));

      // which of the two offers wins is not fixed, so compare as a set
      var expected = new[]
      {
          Tuple.Create (DataflowMessageStatus.Accepted, true),
          Tuple.Create (DataflowMessageStatus.DecliningPermanently, false)
      };
      var actual = new[] { firstOffer.Result, secondOffer.Result };
      CollectionAssert.AreEquivalent (expected, actual);
  }
  332. }
  333. }