| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- // OptionsTest.cs
- //
- // Copyright (c) 2012 Petr Onderka
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Threading.Tasks.Dataflow;
- using NUnit.Framework;
- namespace MonoTests.System.Threading.Tasks.Dataflow {
- [TestFixture]
- public class OptionsTest {
// Verifies what ToString () returns for the blocks produced by
// Blocks.CreateBlocksWithNameFormat for three format strings:
//   a literal string -> that literal, unchanged
//   "{0}"            -> the name of the block's runtime type
//   "{1}"            -> the Id of the block's Completion task
[Test]
public void NameFormatTest ()
{
    const string literalName = "constant";
    foreach (var namedBlock in Blocks.CreateBlocksWithNameFormat (literalName))
        Assert.AreEqual (literalName, namedBlock.ToString ());

    foreach (var namedBlock in Blocks.CreateBlocksWithNameFormat ("{0}")) {
        var expectedTypeName = namedBlock.GetType ().Name;
        Assert.AreEqual (expectedTypeName, namedBlock.ToString ());
    }

    foreach (var namedBlock in Blocks.CreateBlocksWithNameFormat ("{1}")) {
        var expectedTaskId = namedBlock.Completion.Id.ToString ();
        Assert.AreEqual (expectedTaskId, namedBlock.ToString ());
    }
}
// Verifies that blocks created with a cancellation token stay incomplete
// until the token is cancelled, and afterwards complete as canceled:
// waiting on Completion throws an AggregateException wrapping exactly one
// TaskCanceledException and Completion.IsCanceled is true.
[Test]
public void CancellationTest ()
{
    // CancellationTokenSource is IDisposable; the using statement releases it
    // even when one of the assertions below fails.
    using (var source = new CancellationTokenSource ()) {
        // Materialize once so the same block instances are checked
        // before and after cancellation.
        var blocks = Blocks.CreateBlocksWithCancellationToken (source.Token).ToArray ();

        // Before cancellation no block may complete on its own.
        foreach (var block in blocks)
            Assert.IsFalse (block.Completion.Wait (100));

        source.Cancel ();

        foreach (var block in blocks) {
            var ae =
                AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));
            Assert.AreEqual (1, ae.InnerExceptions.Count);
            Assert.IsInstanceOfType (typeof(TaskCanceledException), ae.InnerExceptions [0]);
            Assert.IsTrue (block.Completion.IsCanceled);
        }
    }
}
// For each of ActionBlock, TransformBlock and TransformManyBlock created with
// the given options: posts the numbers 0..99, completes the block, waits for
// completion and yields the IDs of the tasks that processed the items, in the
// order the items were recorded.
//
// This is an iterator, so nothing runs until the result is enumerated, and
// the NUnit assertions inside execute during enumeration.
static IEnumerable<int[]> GetTaskIdsForExecutionsOptions (
    ExecutionDataflowBlockOptions options)
{
    // Records (item, ID of the task currently executing the delegate).
    Action<ConcurrentQueue<Tuple<int, int>>, int> record =
        (q, value) => q.Enqueue (Tuple.Create (value, Task.CurrentId.Value));

    var creators = new Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>>[]
    {
        sink => new ActionBlock<int> (input => record (sink, input), options),
        sink => new TransformBlock<int, int> (input =>
        {
            record (sink, input);
            return input;
        }, options),
        sink => new TransformManyBlock<int, int> (input =>
        {
            record (sink, input);
            return new[] { input };
        }, options)
    };

    foreach (var createBlock in creators) {
        var log = new ConcurrentQueue<Tuple<int, int>> ();
        var target = createBlock (log);
        Assert.IsEmpty (log);

        for (int number = 0; number < 100; number++)
            target.Post (number);
        target.Complete ();

        // Blocks that are also sources are asserted to still be incomplete
        // after 100 ms; only then is their output linked to a BufferBlock.
        var asSource = target as ISourceBlock<int>;
        if (asSource != null) {
            Assert.IsFalse (target.Completion.Wait (100));
            asSource.LinkTo (new BufferBlock<int> ());
        }

        Assert.IsTrue (target.Completion.Wait (1000));

        // Every posted item must have been processed exactly once (any order).
        CollectionAssert.AreEquivalent (
            Enumerable.Range (0, 100), log.Select (entry => entry.Item1));

        yield return log.Select (entry => entry.Item2).ToArray ();
    }
}
// Estimates the degree of parallelism from an ordered sequence of task IDs.
//
// A task is considered running from the position of its first occurrence in
// the sequence up to and including the position of its last occurrence. The
// result is the largest number of tasks running at any single position;
// an empty sequence gives 0.
static int CalculateDegreeOfParallelism(IEnumerable<int> taskIds)
{
    // First pass: remember the sequence and where each ID occurs last.
    var sequence = new List<int> ();
    var finalPosition = new Dictionary<int, int> ();
    foreach (int id in taskIds) {
        finalPosition [id] = sequence.Count;
        sequence.Add (id);
    }

    // Second pass: sweep over the positions tracking the running tasks.
    var running = new HashSet<int> ();
    int peak = 0;
    for (int position = 0; position < sequence.Count; position++) {
        int id = sequence [position];

        // Adding is a no-op unless this is the first occurrence of the ID.
        running.Add (id);
        peak = Math.Max (peak, running.Count);

        // A task still counts at its last position, so remove it only
        // after the peak has been updated.
        if (finalPosition [id] == position)
            running.Remove (id);
    }

    return peak;
}
// Checks the observed degree of parallelism against MaxDegreeOfParallelism:
// exactly 1 for default options, at most the limit for 2 and 4, and (for -1)
// at most the number of processed items.
[Test]
public void MaxDegreeOfParallelismTest()
{
    // loop to better test for race conditions
    // some that showed in this test were quite rare
    for (int round = 0; round < 10; round++) {
        var defaults = new ExecutionDataflowBlockOptions ();
        foreach (var ids in GetTaskIdsForExecutionsOptions (defaults))
            Assert.AreEqual (1, CalculateDegreeOfParallelism (ids));

        foreach (int limit in new[] { 2, 4 }) {
            var limited =
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit };
            foreach (var ids in GetTaskIdsForExecutionsOptions (limited))
                Assert.LessOrEqual (CalculateDegreeOfParallelism (ids), limit);
        }

        // With -1 the only bound asserted is the number of items processed.
        var unlimited = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };
        foreach (var ids in GetTaskIdsForExecutionsOptions (unlimited))
            Assert.LessOrEqual (CalculateDegreeOfParallelism (ids), ids.Length);
    }
}
// Checks how many distinct tasks process the posted items for different
// MaxMessagesPerTask values: at least one task for default options, exactly
// 100 tasks for a limit of 1, and at least (item count / limit) tasks for
// limits of 2 and 4.
[Test]
public void MaxMessagesPerTaskTest()
{
    var defaults = new ExecutionDataflowBlockOptions ();
    foreach (var ids in GetTaskIdsForExecutionsOptions (defaults)) {
        int distinctTasks = ids.Distinct ().Count ();
        Assert.GreaterOrEqual (distinctTasks, 1);
    }

    // One message per task: each of the 100 items needs its own task.
    var onePerTask = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 };
    foreach (var ids in GetTaskIdsForExecutionsOptions (onePerTask)) {
        int distinctTasks = ids.Distinct ().Count ();
        Assert.AreEqual (100, distinctTasks);
    }

    foreach (int perTask in new[] { 2, 4 }) {
        var limited = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = perTask };
        foreach (var ids in GetTaskIdsForExecutionsOptions (limited)) {
            int distinctTasks = ids.Distinct ().Count ();
            Assert.GreaterOrEqual (distinctTasks, ids.Length / perTask);
        }
    }
}
// Verifies that an ActionBlock given a TaskScheduler option does its work
// through that scheduler: the posted item is not processed until the
// TestScheduler is told to execute, and executing runs exactly one task.
[Test]
public void TaskSchedulerTest ()
{
    var testScheduler = new TestScheduler ();
    int processedCount = 0;
    var blockOptions = new ExecutionDataflowBlockOptions { TaskScheduler = testScheduler };
    var countingBlock = new ActionBlock<int> (
        item => Interlocked.Increment (ref processedCount), blockOptions);

    Assert.IsTrue (countingBlock.Post (1));

    // Nothing has run yet.
    Assert.AreEqual (0, Volatile.Read (ref processedCount));

    Assert.AreEqual (1, testScheduler.ExecuteAll ());
    Assert.AreEqual (1, Volatile.Read (ref processedCount));
}
// Verifies that a block created without an explicit scheduler does not pick
// up the ambient one: the ActionBlock is created inside a task running on a
// TestScheduler, yet its delegate asserts that TaskScheduler.Current is not
// that scheduler.
[Test]
public void DefaultSchedulerIsDefaultTest ()
{
    var testScheduler = new TestScheduler ();
    var schedulerFactory = new TaskFactory (testScheduler);
    ActionBlock<int> createdBlock = null;

    var creatingTask = schedulerFactory.StartNew (() =>
    {
        // The creating code itself does run on the test scheduler.
        Assert.AreEqual (testScheduler, TaskScheduler.Current);

        createdBlock = new ActionBlock<int> (
            item => Assert.AreNotEqual (testScheduler, TaskScheduler.Current));
        Assert.IsTrue (createdBlock.Post (1));
        createdBlock.Complete ();
    });

    // Only the creating task is expected on the test scheduler.
    Assert.AreEqual (1, testScheduler.ExecuteAll ());

    Assert.IsNotNull (createdBlock);
    Assert.IsTrue (createdBlock.Completion.Wait (1000));
    Assert.IsTrue (creatingTask.Wait (0));
}
// Verifies a link with MaxMessages = 1 between two BufferBlocks: the first
// posted item (42) reaches the target, while the second (43) does not and
// can still be received from the source.
[Test]
public void MaxMessagesDirectTest ()
{
    var testScheduler = new TestScheduler ();
    var upstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler });
    var downstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler });
    var singleMessageLink = new DataflowLinkOptions { MaxMessages = 1 };
    Assert.IsNotNull (upstream.LinkTo (downstream, singleMessageLink));

    int received;

    // First message goes through the link.
    Assert.IsTrue (upstream.Post (42));
    testScheduler.ExecuteAll ();
    Assert.IsTrue (downstream.TryReceive (null, out received));
    Assert.AreEqual (42, received);

    // Second message stays in the source.
    Assert.IsTrue (upstream.Post (43));
    testScheduler.ExecuteAll ();
    Assert.IsFalse (downstream.TryReceive (null, out received));
    Assert.IsTrue (upstream.TryReceive (null, out received));
    Assert.AreEqual (43, received);
}
// Verifies a link with MaxMessages = 2 into a target with BoundedCapacity = 1.
// Of the three posted items, 42 and 43 reach the target one at a time (the
// second only after the first has been received and the scheduler has run
// again), and 44 remains in the source.
[Test]
public void MaxMessagesPostponedTest ()
{
    var testScheduler = new TestScheduler ();
    var upstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler });
    var downstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler, BoundedCapacity = 1 });
    var twoMessageLink = new DataflowLinkOptions { MaxMessages = 2 };
    Assert.IsNotNull (upstream.LinkTo (downstream, twoMessageLink));

    Assert.IsTrue (upstream.Post (42));
    Assert.IsTrue (upstream.Post (43));
    Assert.IsTrue (upstream.Post (44));
    testScheduler.ExecuteAll ();

    int received;

    // The target holds only one item at a time.
    Assert.IsTrue (downstream.TryReceive (null, out received));
    Assert.AreEqual (42, received);
    Assert.IsFalse (downstream.TryReceive (null, out received));

    // After another scheduler run the second item arrives.
    testScheduler.ExecuteAll ();
    Assert.IsTrue (downstream.TryReceive (null, out received));
    Assert.AreEqual (43, received);

    // The link limit is reached: the third item never leaves the source.
    testScheduler.ExecuteAll ();
    Assert.IsFalse (downstream.TryReceive (null, out received));
    Assert.IsTrue (upstream.TryReceive (null, out received));
    Assert.AreEqual (44, received);
}
// Same setup as the postponed case (MaxMessages = 2, target BoundedCapacity
// = 1), but item 43 is received directly from the source before the target
// takes it. The assertions then expect 42 and 44 to reach the target and 45
// to remain in the source; presumably a postponed message that the target
// never consumed does not count towards the link's limit.
[Test]
public void MaxMessagesPostponedUnconsumedTest ()
{
    var testScheduler = new TestScheduler ();
    var upstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler });
    var downstream = new BufferBlock<int> (
        new DataflowBlockOptions { TaskScheduler = testScheduler, BoundedCapacity = 1 });
    var twoMessageLink = new DataflowLinkOptions { MaxMessages = 2 };
    Assert.IsNotNull (upstream.LinkTo (downstream, twoMessageLink));

    Assert.IsTrue (upstream.Post (42));
    Assert.IsTrue (upstream.Post (43));
    Assert.IsTrue (upstream.Post (44));
    Assert.IsTrue (upstream.Post (45));
    testScheduler.ExecuteAll ();

    int received;

    // Take 43 straight from the source before the target gets it.
    Assert.IsTrue (upstream.TryReceive (null, out received));
    Assert.AreEqual (43, received);

    Assert.IsTrue (downstream.TryReceive (null, out received));
    Assert.AreEqual (42, received);
    Assert.IsFalse (downstream.TryReceive (null, out received));

    // The next item delivered through the link is 44.
    testScheduler.ExecuteAll ();
    Assert.IsTrue (downstream.TryReceive (null, out received));
    Assert.AreEqual (44, received);

    // Nothing more goes through the link; 45 is still in the source.
    testScheduler.ExecuteAll ();
    Assert.IsFalse (downstream.TryReceive (null, out received));
    Assert.IsTrue (upstream.TryReceive (null, out received));
    Assert.AreEqual (45, received);
}
// Verifies a link with MaxMessages = 2 from a BroadcastBlock (created with a
// null cloning function) into a BufferBlock with BoundedCapacity = 1.
// Three items are posted with a scheduler run after each; the target is
// then expected to yield 42 and, after one more scheduler run, 44.
[Test]
public void MaxMessagesBroadcastTest ()
{
    var testScheduler = new TestScheduler ();
    var broadcaster = new BroadcastBlock<int> (
        null, new DataflowBlockOptions { TaskScheduler = testScheduler });
    var downstream = new BufferBlock<int>(
        new DataflowBlockOptions { TaskScheduler = testScheduler, BoundedCapacity = 1 });
    var twoMessageLink = new DataflowLinkOptions { MaxMessages = 2 };
    Assert.IsNotNull (broadcaster.LinkTo (downstream, twoMessageLink));

    // should be accepted
    Assert.IsTrue (broadcaster.Post (42));
    testScheduler.ExecuteAll ();

    // should be postponed, but counted into the limit
    Assert.IsTrue (broadcaster.Post (43));
    testScheduler.ExecuteAll ();

    // shouldn't be even offered for now
    Assert.IsTrue (broadcaster.Post (44));
    testScheduler.ExecuteAll ();

    int received;
    Assert.IsTrue (downstream.TryReceive (out received));
    Assert.AreEqual (42, received);

    testScheduler.ExecuteAll ();

    Assert.IsTrue (downstream.TryReceive (out received));
    Assert.AreEqual (44, received);
}
// Verifies MaxNumberOfGroups = 1 on a BatchBlock of size 1 when two sources
// offer a message concurrently with consumeToAccept = true. Both offers are
// held up by the sources' ConsumeWaiter until the event is set; afterwards
// exactly one offer must be Accepted (and its message consumed) while the
// other must be DecliningPermanently (and its message not consumed).
[Test]
public void MaxNumberOfGroupsWithConsumeToAcceptTest ()
{
    var groupingOptions = new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 };
    ITargetBlock<int> batch = new BatchBlock<int> (1, groupingOptions);
    var consumeGate = new ManualResetEventSlim ();

    // Offers a single message from a fresh test source and reports the
    // returned status together with whether the message was consumed.
    Func<Tuple<DataflowMessageStatus, bool>> offerOnce = () =>
    {
        var fakeSource = new TestSourceBlock<int> { ConsumeWaiter = consumeGate.Wait };
        var header = new DataflowMessageHeader (1);
        fakeSource.AddMessage (header, 1);
        var status = batch.OfferMessage (header, 1, fakeSource, true);
        return Tuple.Create (status, fakeSource.WasConsumed (header));
    };

    var firstOffer = Task.Factory.StartNew (offerOnce);
    var secondOffer = Task.Factory.StartNew (offerOnce);

    // Give both offers time to start; neither may finish while the gate is closed.
    Thread.Sleep (100);
    Assert.IsFalse (firstOffer.IsCompleted);
    Assert.IsFalse (secondOffer.IsCompleted);

    consumeGate.Set ();
    Assert.IsTrue (Task.WaitAll (new Task[] { firstOffer, secondOffer }, 1000));

    // Which offer wins is not fixed, so compare as unordered collections.
    var expectedOutcomes = new[]
    {
        Tuple.Create (DataflowMessageStatus.Accepted, true),
        Tuple.Create (DataflowMessageStatus.DecliningPermanently, false)
    };
    CollectionAssert.AreEquivalent (
        expectedOutcomes, new[] { firstOffer.Result, secondOffer.Result });
}
- }
- }
|