||
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.Runtime
- {
- using System;
- using System.Collections.Generic;
- using System.Threading;
- [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
- sealed class InputQueue<T> : IDisposable where T : class
- {
// Lifecycle state of the queue (Open, Shutdown or Closed).
QueueState queueState;

// Buffered items. This same object is what ThisLock returns, so it is also the lock target.
[Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
ItemQueue itemQueue;

// Readers that are waiting to be handed an item.
[Fx.Tag.SynchronizationObject]
Queue<IQueueReader> readerQueue;

// Waiters that only want to be told whether an item is available.
[Fx.Tag.SynchronizationObject]
List<IQueueWaiter> waiterList;

// Lazily created delegates, shared by every instance, wrapping the static
// methods of the same names so they can be passed to ActionItem.Schedule.
static Action<object> completeOutstandingReadersCallback;
static Action<object> completeWaitersFalseCallback;
static Action<object> completeWaitersTrueCallback;
static Action<object> onDispatchCallback;
static Action<object> onInvokeDequeuedCallback;
// Creates an empty queue in the Open state with no readers or waiters.
public InputQueue()
{
    this.queueState = QueueState.Open;
    this.waiterList = new List<IQueueWaiter>();
    this.readerQueue = new Queue<IQueueReader>();
    this.itemQueue = new ItemQueue();
}
// Creates an empty, open queue and remembers asyncCallbackGenerator, which
// AsyncQueueReader consults when it is constructed. The generator is asserted non-null.
public InputQueue(Func<Action<AsyncCallback, IAsyncResult>> asyncCallbackGenerator)
    : this()
{
    Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
    this.AsyncCallbackGenerator = asyncCallbackGenerator;
}
// Total number of items held by the item queue, sampled under the queue lock.
public int PendingCount
{
    get
    {
        int count;
        lock (ThisLock)
        {
            count = this.itemQueue.ItemCount;
        }

        return count;
    }
}
// Called by DisposeItem for non-null item values that do not implement IDisposable.
// Users like ServiceModel can hook this to abort an ICommunicationObject or to
// handle other non-IDisposable objects.
public Action<T> DisposeItemCallback { get; set; }
// Optional factory read by AsyncQueueReader's constructor; when non-null, its result
// becomes the reader's VirtualCallback. Users like ServiceModel can hook this to wrap
// the AsyncQueueReader callback functionality for tracing, etc.
Func<Action<AsyncCallback, IAsyncResult>> AsyncCallbackGenerator { get; set; }
// The lock object used throughout this class; it is the item queue instance itself.
object ThisLock
{
    get
    {
        return this.itemQueue;
    }
}
// Starts an asynchronous dequeue.
// - If an item is available (queue Open or Shutdown) it is taken immediately and a
//   completed result is returned.
// - Otherwise, if the queue is Open, or Shutdown but still holding items, an
//   AsyncQueueReader is queued and returned.
// - In every other case a completed result carrying a default (empty) Item is returned.
// GetValue() rethrows an exception stored in the dequeued item.
public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
{
    Item dequeued = default(Item);
    lock (ThisLock)
    {
        bool isOpen = this.queueState == QueueState.Open;
        if (isOpen || this.queueState == QueueState.Shutdown)
        {
            if (this.itemQueue.HasAvailableItem)
            {
                dequeued = this.itemQueue.DequeueAvailableItem();
            }
            else if (isOpen || this.itemQueue.HasAnyItem)
            {
                AsyncQueueReader pendingReader = new AsyncQueueReader(this, timeout, callback, state);
                this.readerQueue.Enqueue(pendingReader);
                return pendingReader;
            }
        }
    }

    // Run the item's callback outside the lock before surfacing the value.
    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    T value = dequeued.GetValue();
    return new CompletedAsyncResult<T>(value, callback, state);
}
// Starts an asynchronous wait for an item. An AsyncQueueWaiter is registered and returned
// when the queue is Open with no available item, or Shutdown with items that are all still
// pending. In every other case a result already completed with 'true' is returned.
public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
{
    lock (ThisLock)
    {
        bool mustWait;
        switch (this.queueState)
        {
            case QueueState.Open:
                mustWait = !this.itemQueue.HasAvailableItem;
                break;
            case QueueState.Shutdown:
                mustWait = !this.itemQueue.HasAvailableItem && this.itemQueue.HasAnyItem;
                break;
            default:
                mustWait = false;
                break;
        }

        if (mustWait)
        {
            AsyncQueueWaiter pendingWaiter = new AsyncQueueWaiter(timeout, callback, state);
            this.waiterList.Add(pendingWaiter);
            return pendingWaiter;
        }
    }

    return new CompletedAsyncResult<bool>(true, callback, state);
}
// Closing the queue is the same operation as disposing it.
public void Close()
{
    this.Dispose();
}
// Blocking dequeue that throws a TimeoutException when the two-argument overload
// reports that no item arrived within 'timeout'.
[Fx.Tag.Blocking(CancelMethod = "Close")]
public T Dequeue(TimeSpan timeout)
{
    T dequeued;
    bool completedInTime = this.Dequeue(timeout, out dequeued);
    if (completedInTime)
    {
        return dequeued;
    }

    throw Fx.Exception.AsError(new TimeoutException(InternalSR.TimeoutInputQueueDequeue(timeout)));
}
// Blocking dequeue.
// Returns true with default(T) when the queue is Closed, or Shutdown and empty.
// Returns true with the item's value when an item is available right away.
// Otherwise queues a WaitQueueReader and returns whatever its Wait returns.
// GetValue() rethrows an exception stored in the dequeued item.
[Fx.Tag.Blocking(CancelMethod = "Close")]
public bool Dequeue(TimeSpan timeout, out T value)
{
    WaitQueueReader blockedReader = null;
    Item dequeued = new Item();
    lock (ThisLock)
    {
        if (this.queueState != QueueState.Open && this.queueState != QueueState.Shutdown)
        {
            // queueState == QueueState.Closed
            value = default(T);
            return true;
        }

        if (this.itemQueue.HasAvailableItem)
        {
            dequeued = this.itemQueue.DequeueAvailableItem();
        }
        else if (this.queueState == QueueState.Open || this.itemQueue.HasAnyItem)
        {
            blockedReader = new WaitQueueReader(this);
            this.readerQueue.Enqueue(blockedReader);
        }
        else
        {
            // Shutdown and completely drained.
            value = default(T);
            return true;
        }
    }

    if (blockedReader != null)
    {
        return blockedReader.Wait(timeout, out value);
    }

    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    value = dequeued.GetValue();
    return true;
}
// Turns one pending item into an available one and, when a reader is queued, hands the
// next available item to that reader. All registered waiters are removed under the lock
// and completed later through ActionItem.Schedule. If the queue is Shutdown and this
// dispatch consumed the last item, the remaining readers are completed (scheduled) with
// an empty Item and waiters are told that no item is available.
public void Dispatch()
{
    IQueueReader nextReader = null;
    Item dispatched = new Item();
    IQueueReader[] orphanedReaders = null;
    IQueueWaiter[] pendingWaiters;
    bool itemAvailable;
    lock (ThisLock)
    {
        // Only an Open queue reports an item as available to waiters.
        itemAvailable = this.queueState == QueueState.Open;
        this.GetWaiters(out pendingWaiters);
        if (this.queueState != QueueState.Closed)
        {
            this.itemQueue.MakePendingItemAvailable();
            if (this.readerQueue.Count > 0)
            {
                dispatched = this.itemQueue.DequeueAvailableItem();
                nextReader = this.readerQueue.Dequeue();
                bool drainedAfterShutdown = this.queueState == QueueState.Shutdown
                    && this.readerQueue.Count > 0
                    && this.itemQueue.ItemCount == 0;
                if (drainedAfterShutdown)
                {
                    orphanedReaders = this.readerQueue.ToArray();
                    this.readerQueue.Clear();
                    itemAvailable = false;
                }
            }
        }
    }

    if (orphanedReaders != null)
    {
        if (completeOutstandingReadersCallback == null)
        {
            completeOutstandingReadersCallback = new Action<object>(CompleteOutstandingReadersCallback);
        }

        ActionItem.Schedule(completeOutstandingReadersCallback, orphanedReaders);
    }

    if (pendingWaiters != null)
    {
        CompleteWaitersLater(itemAvailable, pendingWaiters);
    }

    if (nextReader != null)
    {
        InvokeDequeuedCallback(dispatched.DequeuedCallback);
        nextReader.Set(dispatched);
    }
}
// Completes a BeginDequeue call. A CompletedAsyncResult<T> always yields true with its
// value; any other result is delegated to AsyncQueueReader.End, which returns false when
// the reader expired.
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public bool EndDequeue(IAsyncResult result, out T value)
{
    if (result is CompletedAsyncResult<T>)
    {
        value = CompletedAsyncResult<T>.End(result);
        return true;
    }

    return AsyncQueueReader.End(result, out value);
}
// Completes a BeginDequeue call, throwing a TimeoutException when the two-argument
// overload reports failure.
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public T EndDequeue(IAsyncResult result)
{
    T dequeued;
    bool completedInTime = this.EndDequeue(result, out dequeued);
    if (completedInTime)
    {
        return dequeued;
    }

    throw Fx.Exception.AsError(new TimeoutException());
}
// Completes a BeginWaitForItem call, dispatching on the concrete result type that
// BeginWaitForItem produced.
[Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")]
public bool EndWaitForItem(IAsyncResult result)
{
    if (result is CompletedAsyncResult<bool>)
    {
        return CompletedAsyncResult<bool>.End(result);
    }

    return AsyncQueueWaiter.End(result);
}
// Enqueues 'item' without a dequeued-callback.
public void EnqueueAndDispatch(T item)
{
    this.EnqueueAndDispatch(item, null);
}
// Enqueues 'item' and allows it to be dispatched on the calling thread.
// dequeuedCallback runs as the item is dequeued from the InputQueue, without the
// InputQueue lock held. User code is not told that the item is available until the
// callback returns, so if the callback might block for a long time, first call
// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
public void EnqueueAndDispatch(T item, Action dequeuedCallback)
{
    this.EnqueueAndDispatch(item, dequeuedCallback, true);
}
// Enqueues an exception-carrying item; the exception is asserted non-null.
public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
{
    Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
    Item faultedItem = new Item(exception, dequeuedCallback);
    this.EnqueueAndDispatch(faultedItem, canDispatchOnThisThread);
}
// Enqueues a value-carrying item; the value is asserted non-null.
public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread)
{
    Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
    Item wrappedItem = new Item(item, dequeuedCallback);
    this.EnqueueAndDispatch(wrappedItem, canDispatchOnThisThread);
}
// Enqueues a value-carrying item without dispatching it. The value is asserted non-null.
// The result comes from the Item overload: true means Dispatch() must be called later.
public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
{
    Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
    Item wrappedItem = new Item(item, dequeuedCallback);
    return this.EnqueueWithoutDispatch(wrappedItem);
}
// Enqueues an exception-carrying item without dispatching it. The exception is asserted
// non-null. The result comes from the Item overload: true means Dispatch() must be
// called later.
public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback)
{
    Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
    Item faultedItem = new Item(exception, dequeuedCallback);
    return this.EnqueueWithoutDispatch(faultedItem);
}
// Shuts the queue down without supplying an exception generator for pending readers.
public void Shutdown()
{
    Func<Exception> noGenerator = null;
    this.Shutdown(noGenerator);
}
// Stops the queue from accepting more items. Unlike Close, the items already in the
// itemQueue are kept for possible future calls to Dequeue. Does nothing when the queue
// is already Shutdown or Closed. If readers are queued but no item remains, every
// reader is completed, outside the lock, with an Item holding the exception produced by
// pendingExceptionGenerator (or null when no generator was supplied).
public void Shutdown(Func<Exception> pendingExceptionGenerator)
{
    IQueueReader[] orphanedReaders = null;
    lock (ThisLock)
    {
        bool alreadyStopped = this.queueState == QueueState.Shutdown
            || this.queueState == QueueState.Closed;
        if (alreadyStopped)
        {
            return;
        }

        this.queueState = QueueState.Shutdown;
        if (this.readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
        {
            orphanedReaders = this.readerQueue.ToArray();
            this.readerQueue.Clear();
        }
    }

    if (orphanedReaders == null)
    {
        return;
    }

    foreach (IQueueReader orphan in orphanedReaders)
    {
        // The generator is invoked once per reader.
        Exception pendingException = null;
        if (pendingExceptionGenerator != null)
        {
            pendingException = pendingExceptionGenerator();
        }

        orphan.Set(new Item(pendingException, null));
    }
}
// Blocking wait for an item. A WaitQueueWaiter is registered, and its Wait result
// returned, only when the queue is Open with no available item or Shutdown with items
// that are all still pending. Every other combination returns true immediately.
[Fx.Tag.Blocking(CancelMethod = "Dispatch")]
public bool WaitForItem(TimeSpan timeout)
{
    WaitQueueWaiter blockedWaiter = null;
    lock (ThisLock)
    {
        bool isOpen = this.queueState == QueueState.Open;
        bool acceptsWaiters = isOpen || this.queueState == QueueState.Shutdown;
        if (acceptsWaiters
            && !this.itemQueue.HasAvailableItem
            && (isOpen || this.itemQueue.HasAnyItem))
        {
            blockedWaiter = new WaitQueueWaiter();
            this.waiterList.Add(blockedWaiter);
        }
    }

    if (blockedWaiter == null)
    {
        return true;
    }

    return blockedWaiter.Wait(timeout);
}
// Moves the queue to Closed; only the call that performs the transition does the cleanup.
// Cleanup runs after the lock is released: every queued reader is completed with an empty
// Item, then every remaining item is disposed and its dequeued-callback invoked.
public void Dispose()
{
    lock (ThisLock)
    {
        if (this.queueState == QueueState.Closed)
        {
            return;
        }

        this.queueState = QueueState.Closed;
    }

    while (this.readerQueue.Count > 0)
    {
        this.readerQueue.Dequeue().Set(default(Item));
    }

    while (this.itemQueue.HasAnyItem)
    {
        Item leftover = this.itemQueue.DequeueAnyItem();
        this.DisposeItem(leftover);
        InvokeDequeuedCallback(leftover.DequeuedCallback);
    }
}
// Releases the value carried by 'item': IDisposable values are disposed; any other
// non-null value is passed to DisposeItemCallback when one is set.
void DisposeItem(Item item)
{
    T value = item.Value;
    if (value == null)
    {
        return;
    }

    IDisposable disposable = value as IDisposable;
    if (disposable != null)
    {
        disposable.Dispose();
        return;
    }

    Action<T> callback = this.DisposeItemCallback;
    if (callback != null)
    {
        callback(value);
    }
}
// Scheduled work item: 'state' is an IQueueReader[]; each reader is completed with an
// empty Item.
static void CompleteOutstandingReadersCallback(object state)
{
    foreach (IQueueReader reader in (IQueueReader[])state)
    {
        reader.Set(default(Item));
    }
}
// Completes every waiter in order with the same itemAvailable answer.
static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
{
    foreach (IQueueWaiter waiter in waiters)
    {
        waiter.Set(itemAvailable);
    }
}
// Scheduled work item: completes the IQueueWaiter[] in 'state' with itemAvailable == false.
static void CompleteWaitersFalseCallback(object state)
{
    IQueueWaiter[] waiters = (IQueueWaiter[])state;
    CompleteWaiters(false, waiters);
}
// Schedules completion of 'waiters' through ActionItem, choosing the cached delegate
// that matches itemAvailable and creating that delegate on first use.
static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
{
    Action<object> completion;
    if (itemAvailable)
    {
        if (completeWaitersTrueCallback == null)
        {
            completeWaitersTrueCallback = new Action<object>(CompleteWaitersTrueCallback);
        }

        completion = completeWaitersTrueCallback;
    }
    else
    {
        if (completeWaitersFalseCallback == null)
        {
            completeWaitersFalseCallback = new Action<object>(CompleteWaitersFalseCallback);
        }

        completion = completeWaitersFalseCallback;
    }

    ActionItem.Schedule(completion, waiters);
}
// Scheduled work item: completes the IQueueWaiter[] in 'state' with itemAvailable == true.
static void CompleteWaitersTrueCallback(object state)
{
    IQueueWaiter[] waiters = (IQueueWaiter[])state;
    CompleteWaiters(true, waiters);
}
// Runs the callback synchronously on the calling thread; a null callback is ignored.
static void InvokeDequeuedCallback(Action dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    dequeuedCallback();
}
// Schedules the callback through ActionItem instead of running it inline; a null
// callback is ignored. The trampoline delegate is created on first use.
static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    if (onInvokeDequeuedCallback == null)
    {
        onInvokeDequeuedCallback = new Action<object>(OnInvokeDequeuedCallback);
    }

    ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
}
// Scheduled work item: 'state' is the InputQueue<T> whose Dispatch() should run.
static void OnDispatchCallback(object state)
{
    InputQueue<T> queue = (InputQueue<T>)state;
    queue.Dispatch();
}
// Scheduled work item: 'state' is the Action to invoke; it is asserted non-null.
static void OnInvokeDequeuedCallback(object state)
{
    Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)");
    ((Action)state)();
}
// Core enqueue path.
// Open queue: with no reader queued the item is stored as available. With a reader queued
// it is either handed straight to that reader (canDispatchOnThisThread) or stored as
// pending with a Dispatch() scheduled through ActionItem.
// Shutdown or Closed queue: the item is rejected; its dequeued-callback is invoked and it
// is disposed.
// Waiters registered at the time of the call are always completed: inline when
// canDispatchOnThisThread, otherwise scheduled. They are told an item is available only
// when the queue is Open.
void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
{
    bool rejectItem = false;
    bool scheduleDispatch = false;
    IQueueReader directReader = null;
    IQueueWaiter[] pendingWaiters;
    bool itemAvailable;
    lock (ThisLock)
    {
        itemAvailable = this.queueState == QueueState.Open;
        this.GetWaiters(out pendingWaiters);
        if (this.queueState != QueueState.Open)
        {
            // queueState == QueueState.Closed || queueState == QueueState.Shutdown
            rejectItem = true;
        }
        else if (this.readerQueue.Count == 0)
        {
            this.itemQueue.EnqueueAvailableItem(item);
        }
        else if (canDispatchOnThisThread)
        {
            directReader = this.readerQueue.Dequeue();
        }
        else
        {
            this.itemQueue.EnqueuePendingItem(item);
            scheduleDispatch = true;
        }
    }

    if (pendingWaiters != null)
    {
        if (canDispatchOnThisThread)
        {
            CompleteWaiters(itemAvailable, pendingWaiters);
        }
        else
        {
            CompleteWaitersLater(itemAvailable, pendingWaiters);
        }
    }

    if (directReader != null)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        directReader.Set(item);
    }

    if (scheduleDispatch)
    {
        if (onDispatchCallback == null)
        {
            onDispatchCallback = new Action<object>(OnDispatchCallback);
        }

        ActionItem.Schedule(onDispatchCallback, this);
    }
    else if (rejectItem)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        this.DisposeItem(item);
    }
}
// Stores 'item' without dispatching it. This does not block, but Dispatch() must be
// called later whenever this method returns true.
// Open queue: the item is stored as available (returns false) when no reader or waiter is
// registered, otherwise as pending (returns true).
// Shutdown or Closed queue: the item is disposed, its dequeued-callback is scheduled, and
// false is returned.
bool EnqueueWithoutDispatch(Item item)
{
    lock (ThisLock)
    {
        bool isOpen = this.queueState != QueueState.Closed && this.queueState != QueueState.Shutdown;
        if (isOpen)
        {
            bool somebodyWaiting = this.readerQueue.Count != 0 || this.waiterList.Count != 0;
            if (somebodyWaiting)
            {
                this.itemQueue.EnqueuePendingItem(item);
            }
            else
            {
                this.itemQueue.EnqueueAvailableItem(item);
            }

            return somebodyWaiting;
        }
    }

    this.DisposeItem(item);
    InvokeDequeuedCallbackLater(item.DequeuedCallback);
    return false;
}
// Moves all registered waiters out of waiterList into 'waiters', leaving the list empty.
// 'waiters' is null when nothing was registered.
void GetWaiters(out IQueueWaiter[] waiters)
{
    if (this.waiterList.Count == 0)
    {
        waiters = null;
        return;
    }

    waiters = this.waiterList.ToArray();
    this.waiterList.Clear();
}
// Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
// dispatching items to timed out readers. Returns true when 'reader' was found and
// removed; always false once the queue is Closed.
bool RemoveReader(IQueueReader reader)
{
    Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)");
    lock (ThisLock)
    {
        if (this.queueState != QueueState.Open && this.queueState != QueueState.Shutdown)
        {
            return false;
        }

        // Rotate through the whole queue once, re-enqueueing everything except 'reader',
        // so the relative order of the remaining readers is preserved.
        bool found = false;
        int remaining = this.readerQueue.Count;
        while (remaining > 0)
        {
            IQueueReader candidate = this.readerQueue.Dequeue();
            if (object.ReferenceEquals(candidate, reader))
            {
                found = true;
            }
            else
            {
                this.readerQueue.Enqueue(candidate);
            }

            remaining--;
        }

        return found;
    }
}
// Lifecycle of the queue.
enum QueueState
{
    // Items are accepted and can be dequeued.
    Open,

    // No new items are accepted; items already stored can still be dequeued.
    Shutdown,

    // Terminal state set by Dispose/Close.
    Closed
}
// A consumer waiting to be handed an Item.
interface IQueueReader
{
    // Delivers the item (possibly an empty or exception-carrying one) to the reader.
    void Set(Item item);
}
// A consumer waiting only to learn whether an item is available.
interface IQueueWaiter
{
    // Delivers the availability answer to the waiter.
    void Set(bool itemAvailable);
}
// One queue entry: either a value or an exception, plus an optional callback to invoke
// when the entry is dequeued. default(Item) has all three members null.
struct Item
{
    T value;
    Exception exception;
    Action dequeuedCallback;

    // Value-carrying entry.
    public Item(T value, Action dequeuedCallback)
    {
        this.value = value;
        this.exception = null;
        this.dequeuedCallback = dequeuedCallback;
    }

    // Exception-carrying entry; the value is left null.
    public Item(Exception exception, Action dequeuedCallback)
    {
        this.value = null;
        this.exception = exception;
        this.dequeuedCallback = dequeuedCallback;
    }

    public Action DequeuedCallback
    {
        get
        {
            return this.dequeuedCallback;
        }
    }

    public Exception Exception
    {
        get
        {
            return this.exception;
        }
    }

    // Raw value; never throws, even for an exception-carrying entry.
    public T Value
    {
        get
        {
            return this.value;
        }
    }

    // Returns the value, or throws the stored exception when there is one.
    public T GetValue()
    {
        if (this.exception == null)
        {
            return this.value;
        }

        throw Fx.Exception.AsError(this.exception);
    }
}
// Asynchronous reader returned by BeginDequeue. It is completed either by Set (an item,
// possibly empty or carrying an exception, was delivered) or by its timer, which first
// tries to remove the reader from the owning queue and marks it expired only if that
// removal succeeded.
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueReader : AsyncResult, IQueueReader
{
    static Action<object> timerCallback = new Action<object>(AsyncQueueReader.TimerCallback);
    bool expired;
    InputQueue<T> inputQueue;
    T item;
    IOThreadTimer timer;

    // No timer is created for TimeSpan.MaxValue.
    public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        Func<Action<AsyncCallback, IAsyncResult>> generator = inputQueue.AsyncCallbackGenerator;
        if (generator != null)
        {
            base.VirtualCallback = generator();
        }

        this.inputQueue = inputQueue;
        if (timeout == TimeSpan.MaxValue)
        {
            return;
        }

        this.timer = new IOThreadTimer(timerCallback, this, false);
        this.timer.Set(timeout);
    }

    // Returns false with default(T) when the reader expired; otherwise true with the
    // delivered value.
    [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
    public static bool End(IAsyncResult result, out T value)
    {
        AsyncQueueReader completed = AsyncResult.End<AsyncQueueReader>(result);
        bool timedOut = completed.expired;
        value = timedOut ? default(T) : completed.item;
        return !timedOut;
    }

    // Stores the item's value, cancels the timer if there is one, and completes the
    // result with the item's exception (null when there is none).
    public void Set(Item item)
    {
        this.item = item.Value;
        IOThreadTimer pendingTimer = this.timer;
        if (pendingTimer != null)
        {
            pendingTimer.Cancel();
        }

        Complete(false, item.Exception);
    }

    static void TimerCallback(object state)
    {
        AsyncQueueReader reader = (AsyncQueueReader)state;
        if (!reader.inputQueue.RemoveReader(reader))
        {
            // The reader is no longer in the queue, so the timer does not complete it.
            return;
        }

        reader.expired = true;
        reader.Complete(false);
    }
}
// Asynchronous waiter returned by BeginWaitForItem. Set completes it only when there is no
// timer or the timer could still be cancelled; otherwise completion is left to the timer
// callback.
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueWaiter : AsyncResult, IQueueWaiter
{
    static Action<object> timerCallback = new Action<object>(AsyncQueueWaiter.TimerCallback);
    bool itemAvailable;
    [Fx.Tag.SynchronizationObject(Blocking = false)]
    object thisLock = new object();
    IOThreadTimer timer;

    // No timer is created for TimeSpan.MaxValue.
    public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        if (timeout == TimeSpan.MaxValue)
        {
            return;
        }

        this.timer = new IOThreadTimer(timerCallback, this, false);
        this.timer.Set(timeout);
    }

    object ThisLock
    {
        get { return this.thisLock; }
    }

    // Returns the availability answer recorded by Set (false if Set never ran).
    [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
    public static bool End(IAsyncResult result)
    {
        return AsyncResult.End<AsyncQueueWaiter>(result).itemAvailable;
    }

    public void Set(bool itemAvailable)
    {
        bool completeNow;
        lock (ThisLock)
        {
            if (this.timer == null)
            {
                completeNow = true;
            }
            else
            {
                completeNow = this.timer.Cancel();
            }

            this.itemAvailable = itemAvailable;
        }

        if (completeNow)
        {
            Complete(false);
        }
    }

    static void TimerCallback(object state)
    {
        AsyncQueueWaiter waiter = (AsyncQueueWaiter)state;
        waiter.Complete(false);
    }
}
// Growable circular buffer of Items. Items are stored in FIFO order starting at 'head'.
// 'pendingCount' is a counter only: an item is "available" while totalCount exceeds
// pendingCount. The backing array starts with one slot and doubles when full.
class ItemQueue
{
    int head;
    Item[] items;
    int pendingCount;
    int totalCount;

    public ItemQueue()
    {
        this.items = new Item[1];
    }

    // True when at least one item (pending or available) is stored.
    public bool HasAnyItem
    {
        get { return this.totalCount > 0; }
    }

    // True when at least one stored item is not pending.
    public bool HasAvailableItem
    {
        get { return this.totalCount > this.pendingCount; }
    }

    // Total number of stored items, pending ones included.
    public int ItemCount
    {
        get { return this.totalCount; }
    }

    // Removes the item at the head regardless of whether it is pending. When every stored
    // item is pending, the pending counter shrinks along with the total.
    public Item DequeueAnyItem()
    {
        // Only touch pendingCount when there really is an item to remove. Otherwise a call
        // on an empty queue would leave pendingCount at -1 (and HasAvailableItem reporting
        // true) before DequeueItemCore rejects the call.
        if (this.totalCount != 0 && this.pendingCount == this.totalCount)
        {
            this.pendingCount--;
        }

        return DequeueItemCore();
    }

    // Removes the item at the head; asserts that an available item exists.
    public Item DequeueAvailableItem()
    {
        Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
        return DequeueItemCore();
    }

    public void EnqueueAvailableItem(Item item)
    {
        EnqueueItemCore(item);
    }

    public void EnqueuePendingItem(Item item)
    {
        EnqueueItemCore(item);
        this.pendingCount++;
    }

    // Converts one pending item into an available one; asserts that a pending item exists.
    public void MakePendingItemAvailable()
    {
        Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items");
        this.pendingCount--;
    }

    Item DequeueItemCore()
    {
        Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items");
        Item item = this.items[this.head];

        // Clear the slot so the buffer does not keep the dequeued item's references alive.
        this.items[this.head] = new Item();
        this.totalCount--;
        this.head = (this.head + 1) % this.items.Length;
        return item;
    }

    void EnqueueItemCore(Item item)
    {
        if (this.totalCount == this.items.Length)
        {
            // Full: double the capacity and unwrap the ring so the oldest item sits at index 0.
            Item[] newItems = new Item[this.items.Length * 2];
            for (int i = 0; i < this.totalCount; i++)
            {
                newItems[i] = this.items[(head + i) % this.items.Length];
            }

            this.head = 0;
            this.items = newItems;
        }

        int tail = (this.head + this.totalCount) % this.items.Length;
        this.items[tail] = item;
        this.totalCount++;
    }
}
// Blocking reader used by the synchronous Dequeue. Set records the delivered item and
// signals the event; Wait blocks on that event.
[Fx.Tag.SynchronizationObject(Blocking = false)]
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueReader : IQueueReader
{
    Exception exception;
    InputQueue<T> inputQueue;
    T item;
    [Fx.Tag.SynchronizationObject]
    ManualResetEvent waitEvent;

    public WaitQueueReader(InputQueue<T> inputQueue)
    {
        this.waitEvent = new ManualResetEvent(false);
        this.inputQueue = inputQueue;
    }

    // Records the item's value and exception, then releases the waiting thread.
    public void Set(Item item)
    {
        lock (this)
        {
            Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
            Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
            this.exception = item.Exception;
            this.item = item.Value;
            this.waitEvent.Set();
        }
    }

    // Waits up to 'timeout' for Set. On timeout the reader tries to remove itself from the
    // queue: if that succeeds the call returns false; if not, the method waits without a
    // timeout for the event to be signalled. The event is closed only on paths that
    // complete normally. Throws the delivered exception when the item carried one.
    [Fx.Tag.Blocking(CancelMethod = "Set")]
    public bool Wait(TimeSpan timeout, out T value)
    {
        bool canCloseEvent = false;
        try
        {
            bool signaled = TimeoutHelper.WaitOne(this.waitEvent, timeout);
            if (!signaled)
            {
                if (this.inputQueue.RemoveReader(this))
                {
                    value = default(T);
                    canCloseEvent = true;
                    return false;
                }

                this.waitEvent.WaitOne();
            }

            canCloseEvent = true;
        }
        finally
        {
            if (canCloseEvent)
            {
                this.waitEvent.Close();
            }
        }

        if (this.exception == null)
        {
            value = this.item;
            return true;
        }

        throw Fx.Exception.AsError(this.exception);
    }
}
// Blocking waiter used by the synchronous WaitForItem. Set records the availability
// answer and signals the event; Wait blocks on that event and then releases it.
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueWaiter : IQueueWaiter
{
    bool itemAvailable;

    // Set once Wait has closed waitEvent; guarded by thisLock.
    bool closed;

    // Private lock object, so that no code outside this class can contend on it.
    [Fx.Tag.SynchronizationObject(Blocking = false)]
    object thisLock = new object();
    [Fx.Tag.SynchronizationObject]
    ManualResetEvent waitEvent;

    public WaitQueueWaiter()
    {
        waitEvent = new ManualResetEvent(false);
    }

    // Records the answer and wakes the waiting thread. If Wait has already finished
    // (for example after a timeout) the event is closed and is left untouched.
    public void Set(bool itemAvailable)
    {
        lock (this.thisLock)
        {
            this.itemAvailable = itemAvailable;
            if (!this.closed)
            {
                waitEvent.Set();
            }
        }
    }

    // Waits up to 'timeout' for Set. Returns false on timeout, otherwise the answer that
    // Set recorded. The event is always closed before returning, so its handle is not
    // left for the finalizer; Set checks 'closed' under the same lock, so a late Set
    // cannot touch the closed handle. Intended to be called once per instance.
    [Fx.Tag.Blocking(CancelMethod = "Set")]
    public bool Wait(TimeSpan timeout)
    {
        bool signaled = false;
        try
        {
            signaled = TimeoutHelper.WaitOne(waitEvent, timeout);
        }
        finally
        {
            lock (this.thisLock)
            {
                this.closed = true;
                waitEvent.Close();
            }
        }

        // When signaled, Set wrote itemAvailable before signalling the event.
        return signaled && this.itemAvailable;
    }
}
- }
- }
|