|
|
@@ -82,11 +82,32 @@ namespace System.Linq.Parallel.QueryNodes
|
|
|
(e, c) => resultSelector (e, c)))
|
|
|
.ToList ();
|
|
|
}
|
|
|
-
|
|
|
- // This one is gonna be tricky
|
|
|
+
|
|
|
internal override IList<IEnumerable<KeyValuePair<long, TResult>>> GetOrderedEnumerables (QueryOptions options)
|
|
|
{
|
|
|
- throw new NotImplementedException ();
|
|
|
+ var source = Parent.GetOrderedEnumerables (options);
|
|
|
+ var sizeRequests = new Tuple<long, int, IEnumerable<TCollection>> [options.PartitionCount];
|
|
|
+ if (collectionSelectorIndexed == null)
|
|
|
+ collectionSelectorIndexed = (e, i) => collectionSelector (e);
|
|
|
+ long deviation = 0;
|
|
|
+
|
|
|
+ Barrier barrier = new Barrier (options.PartitionCount, delegate {
|
|
|
+ Array.Sort (sizeRequests, (e1, e2) => e1.Item1.CompareTo (e2.Item1));
|
|
|
+ long newDeviation = deviation;
|
|
|
+ for (int i = sizeRequests.Length - 1; i >= 0; i--) {
|
|
|
+ var reqi = sizeRequests[i];
|
|
|
+ long newIndex = reqi.Item1 + deviation;
|
|
|
+ newDeviation += reqi.Item2 - 1;
|
|
|
+ for (int j = i - 1; j >= 0; j--)
|
|
|
+ newIndex += sizeRequests[j].Item2 - 1;
|
|
|
+ sizeRequests[i] = Tuple.Create (newIndex, reqi.Item2, reqi.Item3);
|
|
|
+ }
|
|
|
+ deviation = newDeviation;
|
|
|
+ });
|
|
|
+
|
|
|
+ return source
|
|
|
+ .Select ((i, ind) => GetOrderedEnumerableInternal (i, sizeRequests, ind, barrier))
|
|
|
+ .ToList ();
|
|
|
}
|
|
|
|
|
|
IEnumerable<TResult> GetEnumerableInternal<T> (IEnumerable<T> source,
|
|
|
@@ -99,15 +120,43 @@ namespace System.Linq.Parallel.QueryNodes
|
|
|
}
|
|
|
|
|
|
IEnumerable<KeyValuePair<long, TResult>> GetOrderedEnumerableInternal (IEnumerable<KeyValuePair<long, TSource>> source,
|
|
|
+ Tuple<long, int, IEnumerable<TCollection>>[] sizeRequests,
|
|
|
+ int index,
|
|
|
Barrier barrier)
|
|
|
{
|
|
|
- foreach (KeyValuePair<long, TSource> element in source) {
|
|
|
- IEnumerable<TCollection> collection = collectionSelectorIndexed (element.Value, (int)element.Key);
|
|
|
-
|
|
|
- foreach (TCollection item in collection)
|
|
|
- yield return new KeyValuePair<long, TResult> (-1, resultSelector (element.Value, item));
|
|
|
+ try {
|
|
|
+ foreach (KeyValuePair<long, TSource> element in source) {
|
|
|
+ IEnumerable<TCollection> collection = collectionSelectorIndexed (element.Value, (int)element.Key);
|
|
|
+ sizeRequests[index] = Tuple.Create (element.Key, GetCount (ref collection), collection);
|
|
|
+
|
|
|
+ barrier.SignalAndWait ();
|
|
|
+
|
|
|
+ long i = sizeRequests[index].Item1;
|
|
|
+ collection = sizeRequests[index].Item3;
|
|
|
+
|
|
|
+ foreach (TCollection item in collection)
|
|
|
+ yield return new KeyValuePair<long, TResult> (i++, resultSelector (element.Value, item));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ barrier.RemoveParticipant ();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /* If getting Count is a O(1) operation (i.e. actual is a ICollection) then return it immediatly
|
|
|
+ * if not process the IEnumerable into a List and return the Count from that (i.e. enumerable
|
|
|
+ * processing will only happen once in case of e.g. a Linq query)
|
|
|
+ */
|
|
|
+ static int GetCount<T> (ref IEnumerable<T> actual)
|
|
|
+ {
|
|
|
+ ICollection coll = actual as ICollection;
|
|
|
+ if (coll != null)
|
|
|
+ return coll.Count;
|
|
|
+
|
|
|
+ var foo = actual.ToList ();
|
|
|
+ actual = foo;
|
|
|
+
|
|
|
+ return foo.Count;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|