| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107 |
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.Runtime
- {
- using System;
- using System.Collections.Generic;
- using System.Threading;
- [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
- sealed class InputQueue<T> : IDisposable where T : class
- {
- static Action<object> completeOutstandingReadersCallback; // cached delegate for CompleteOutstandingReadersCallback; created on first use in Dispatch
- static Action<object> completeWaitersFalseCallback; // cached delegate for CompleteWaitersFalseCallback; created on first use in CompleteWaitersLater
- static Action<object> completeWaitersTrueCallback; // cached delegate for CompleteWaitersTrueCallback; created on first use in CompleteWaitersLater
- static Action<object> onDispatchCallback; // cached delegate for OnDispatchCallback; created on first use in EnqueueAndDispatch(Item, bool)
- static Action<object> onInvokeDequeuedCallback; // cached delegate for OnInvokeDequeuedCallback; created on first use in InvokeDequeuedCallbackLater
- QueueState queueState; // Open, Shutdown or Closed; the constructor sets Open
- [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
- ItemQueue itemQueue; // holds the queued items; ThisLock returns this same instance, so it is also the lock target
- [Fx.Tag.SynchronizationObject]
- Queue<IQueueReader> readerQueue; // readers enqueued by Dequeue/BeginDequeue when no item was available
- [Fx.Tag.SynchronizationObject]
- List<IQueueWaiter> waiterList; // waiters registered by WaitForItem/BeginWaitForItem; drained by GetWaiters
// Creates an empty queue in the Open state: no items, no blocked readers, no waiters.
public InputQueue()
{
    itemQueue = new ItemQueue();
    readerQueue = new Queue<IQueueReader>();
    waiterList = new List<IQueueWaiter>();
    queueState = QueueState.Open;
}
// Creates an empty, open queue whose AsyncQueueReader instances obtain their
// VirtualCallback from the supplied generator. The generator must not be null;
// callers without one are expected to use the parameterless constructor.
public InputQueue(Func<Action<AsyncCallback, IAsyncResult>> asyncCallbackGenerator)
    : this()
{
    Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
    this.AsyncCallbackGenerator = asyncCallbackGenerator;
}
// Number of items currently stored in the item queue (available and pending alike),
// read under the queue lock.
public int PendingCount
{
    get
    {
        int count;
        lock (ThisLock)
        {
            count = itemQueue.ItemCount;
        }
        return count;
    }
}
- // Users like ServiceModel can hook this to abort an ICommunicationObject or handle other non-IDisposable objects
// Optional hook invoked by DisposeItem for non-null values that do not implement IDisposable.
public Action<T> DisposeItemCallback { get; set; }
- // Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc
// When non-null, AsyncQueueReader calls this generator in its constructor and installs
// the returned delegate as its VirtualCallback.
Func<Action<AsyncCallback, IAsyncResult>> AsyncCallbackGenerator { get; set; }
// The item queue instance doubles as the monitor that guards the queue state.
object ThisLock
{
    get
    {
        return itemQueue;
    }
}
// Starts an asynchronous dequeue.
//  - If an item is available it is taken immediately and a CompletedAsyncResult<T> is returned
//    (after running the item's dequeued callback outside the lock).
//  - If the queue is Open with nothing available, or Shutdown with only pending items,
//    an AsyncQueueReader is parked in the reader queue and returned.
//  - Otherwise (Closed, or Shutdown and empty) a completed result carrying an empty Item is returned.
// Item.GetValue() rethrows an exception stored in the dequeued item.
public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
{
    Item dequeued = default(Item);
    lock (ThisLock)
    {
        if (queueState != QueueState.Closed)
        {
            if (itemQueue.HasAvailableItem)
            {
                dequeued = itemQueue.DequeueAvailableItem();
            }
            else if (queueState == QueueState.Open || itemQueue.HasAnyItem)
            {
                // Open: always wait. Shutdown: wait only while pending items can still arrive.
                AsyncQueueReader parkedReader = new AsyncQueueReader(this, timeout, callback, state);
                readerQueue.Enqueue(parkedReader);
                return parkedReader;
            }
        }
    }

    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    return new CompletedAsyncResult<T>(dequeued.GetValue(), callback, state);
}
// Starts an asynchronous wait for item availability. A waiter is registered only when
// nothing is available right now and something may still become available (queue Open,
// or Shutdown with pending items). In every other case a CompletedAsyncResult<bool>(true)
// is returned immediately.
public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
{
    lock (ThisLock)
    {
        bool mustWait;
        switch (queueState)
        {
            case QueueState.Open:
                mustWait = !itemQueue.HasAvailableItem;
                break;

            case QueueState.Shutdown:
                mustWait = !itemQueue.HasAvailableItem && itemQueue.HasAnyItem;
                break;

            default:
                mustWait = false;
                break;
        }

        if (mustWait)
        {
            AsyncQueueWaiter parkedWaiter = new AsyncQueueWaiter(timeout, callback, state);
            waiterList.Add(parkedWaiter);
            return parkedWaiter;
        }
    }

    return new CompletedAsyncResult<bool>(true, callback, state);
}
// Alias for Dispose().
public void Close()
{
    this.Dispose();
}
// Blocking dequeue that throws TimeoutException when no item arrives within the timeout.
[Fx.Tag.Blocking(CancelMethod = "Close")]
public T Dequeue(TimeSpan timeout)
{
    T dequeued;
    if (this.Dequeue(timeout, out dequeued))
    {
        return dequeued;
    }

    throw Fx.Exception.AsError(new TimeoutException(InternalSR.TimeoutInputQueueDequeue(timeout)));
}
// Blocking dequeue.
// Returns true with value == default(T) when the queue is Closed, or Shutdown and empty.
// Returns true with the item's value when an item is available immediately.
// Otherwise parks a WaitQueueReader and returns whatever its Wait reports
// (false means the timeout elapsed).
// An exception stored in the dequeued item is rethrown by Item.GetValue().
[Fx.Tag.Blocking(CancelMethod = "Close")]
public bool Dequeue(TimeSpan timeout, out T value)
{
    WaitQueueReader parkedReader = null;
    Item dequeued = new Item();

    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            value = default(T);
            return true;
        }

        if (itemQueue.HasAvailableItem)
        {
            dequeued = itemQueue.DequeueAvailableItem();
        }
        else if (queueState == QueueState.Open || itemQueue.HasAnyItem)
        {
            // Open: always wait. Shutdown: wait only while pending items remain.
            parkedReader = new WaitQueueReader(this);
            readerQueue.Enqueue(parkedReader);
        }
        else
        {
            // Shutdown and completely empty: nothing will ever arrive.
            value = default(T);
            return true;
        }
    }

    if (parkedReader != null)
    {
        return parkedReader.Wait(timeout, out value);
    }

    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    value = dequeued.GetValue();
    return true;
}
// Makes one pending item available and, if a reader is parked, hands the item to it.
// Unless the queue is Closed, MakePendingItemAvailable asserts that a pending item exists,
// so this is meant to follow an enqueue that left an item pending.
// All registered waiters are collected under the lock and completed later via ActionItem.
// When the queue is Shutdown and the dispatched item was the last one, the remaining
// readers are released (with an empty Item) on a scheduled callback.
public void Dispatch()
{
    IQueueReader nextReader = null;
    Item dispatched = new Item();
    IQueueReader[] orphanedReaders = null;
    IQueueWaiter[] pendingWaiters;
    bool notifyAvailable;

    lock (ThisLock)
    {
        // Waiters are told "available" only while the queue is still Open.
        notifyAvailable = queueState == QueueState.Open;
        GetWaiters(out pendingWaiters);

        if (queueState != QueueState.Closed)
        {
            itemQueue.MakePendingItemAvailable();

            if (readerQueue.Count > 0)
            {
                dispatched = itemQueue.DequeueAvailableItem();
                nextReader = readerQueue.Dequeue();

                bool drainedAfterShutdown =
                    queueState == QueueState.Shutdown &&
                    readerQueue.Count > 0 &&
                    itemQueue.ItemCount == 0;

                if (drainedAfterShutdown)
                {
                    orphanedReaders = readerQueue.ToArray();
                    readerQueue.Clear();
                    notifyAvailable = false;
                }
            }
        }
    }

    if (orphanedReaders != null)
    {
        if (completeOutstandingReadersCallback == null)
        {
            completeOutstandingReadersCallback = new Action<object>(CompleteOutstandingReadersCallback);
        }

        ActionItem.Schedule(completeOutstandingReadersCallback, orphanedReaders);
    }

    if (pendingWaiters != null)
    {
        CompleteWaitersLater(notifyAvailable, pendingWaiters);
    }

    if (nextReader != null)
    {
        InvokeDequeuedCallback(dispatched.DequeuedCallback);
        nextReader.Set(dispatched);
    }
}
// Completes a BeginDequeue call. A CompletedAsyncResult<T> (synchronous completion) always
// yields true; otherwise the outcome comes from AsyncQueueReader.End, which reports false
// when the reader expired.
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public bool EndDequeue(IAsyncResult result, out T value)
{
    if (result is CompletedAsyncResult<T>)
    {
        value = CompletedAsyncResult<T>.End(result);
        return true;
    }

    return AsyncQueueReader.End(result, out value);
}
// Completes a BeginDequeue call, throwing TimeoutException when the dequeue timed out.
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public T EndDequeue(IAsyncResult result)
{
    T dequeued;
    if (this.EndDequeue(result, out dequeued))
    {
        return dequeued;
    }

    throw Fx.Exception.AsError(new TimeoutException());
}
// Completes a BeginWaitForItem call, unwrapping either the synchronously completed
// result or the AsyncQueueWaiter.
[Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")]
public bool EndWaitForItem(IAsyncResult result)
{
    if (result is CompletedAsyncResult<bool>)
    {
        return CompletedAsyncResult<bool>.End(result);
    }

    return AsyncQueueWaiter.End(result);
}
// Enqueues an item with no dequeued callback.
public void EnqueueAndDispatch(T item)
{
    this.EnqueueAndDispatch(item, null);
}
- // dequeuedCallback is called as an item is dequeued from the InputQueue. The
- // InputQueue lock is not held during the callback. However, the user code will
- // not be notified of the item being available until the callback returns. If you
- // are not sure if the callback will block for a long time, then first call
- // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
// Enqueues an item and allows it to be dispatched on the calling thread.
public void EnqueueAndDispatch(T item, Action dequeuedCallback)
{
    this.EnqueueAndDispatch(item, dequeuedCallback, true);
}
// Enqueues an exception; Item.GetValue() rethrows it when the entry is dequeued.
public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
{
    Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");

    Item faultEntry = new Item(exception, dequeuedCallback);
    EnqueueAndDispatch(faultEntry, canDispatchOnThisThread);
}
// Enqueues a value; canDispatchOnThisThread controls whether a parked reader may be
// completed on the calling thread or the hand-off is scheduled for later.
public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread)
{
    Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");

    Item valueEntry = new Item(item, dequeuedCallback);
    EnqueueAndDispatch(valueEntry, canDispatchOnThisThread);
}
// Enqueues a value without dispatching it. A true result means the item was stored
// as pending and Dispatch() must be called later.
public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
{
    Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");

    Item valueEntry = new Item(item, dequeuedCallback);
    return EnqueueWithoutDispatch(valueEntry);
}
// Enqueues an exception without dispatching it. A true result means the entry was stored
// as pending and Dispatch() must be called later.
public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback)
{
    Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");

    Item faultEntry = new Item(exception, dequeuedCallback);
    return EnqueueWithoutDispatch(faultEntry);
}
// Shuts the queue down without an exception generator: readers released by the
// shutdown receive an empty Item.
public void Shutdown()
{
    Shutdown(null);
}
- // Don't let any more items in. Differs from Close in that we keep around
- // existing items in our itemQueue for possible future calls to Dequeue
// Moves an Open queue to Shutdown; a queue that is already Shutdown or Closed is left alone.
// If readers are parked and the item queue is empty, every parked reader is released outside
// the lock with an Item carrying the exception produced by pendingExceptionGenerator
// (the generator is called once per reader; a null generator yields a null exception).
public void Shutdown(Func<Exception> pendingExceptionGenerator)
{
    IQueueReader[] abandonedReaders = null;

    lock (ThisLock)
    {
        if (queueState != QueueState.Open)
        {
            return;
        }

        queueState = QueueState.Shutdown;

        if (readerQueue.Count > 0 && itemQueue.ItemCount == 0)
        {
            abandonedReaders = readerQueue.ToArray();
            readerQueue.Clear();
        }
    }

    if (abandonedReaders == null)
    {
        return;
    }

    foreach (IQueueReader abandoned in abandonedReaders)
    {
        Exception pending = null;
        if (pendingExceptionGenerator != null)
        {
            pending = pendingExceptionGenerator();
        }

        abandoned.Set(new Item(pending, null));
    }
}
// Blocks until an item is available or the timeout elapses.
// Returns true immediately when the queue is Closed, when an item is already available,
// or when the queue is Shutdown and completely empty. Otherwise a WaitQueueWaiter is
// registered and its Wait result is returned.
[Fx.Tag.Blocking(CancelMethod = "Dispatch")]
public bool WaitForItem(TimeSpan timeout)
{
    WaitQueueWaiter parkedWaiter;

    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            return true;
        }

        if (itemQueue.HasAvailableItem)
        {
            return true;
        }

        if (queueState == QueueState.Shutdown && !itemQueue.HasAnyItem)
        {
            return true;
        }

        parkedWaiter = new WaitQueueWaiter();
        waiterList.Add(parkedWaiter);
    }

    return parkedWaiter.Wait(timeout);
}
// Closes the queue. Only the call that performs the transition to Closed does the cleanup:
// each parked reader is released with an empty Item, then every remaining item is disposed
// and its dequeued callback invoked. The cleanup runs outside the lock; the other members
// check for the Closed state before touching the reader queue.
public void Dispose()
{
    bool closedByThisCall = false;

    lock (ThisLock)
    {
        if (queueState != QueueState.Closed)
        {
            queueState = QueueState.Closed;
            closedByThisCall = true;
        }
    }

    if (!closedByThisCall)
    {
        return;
    }

    while (readerQueue.Count > 0)
    {
        IQueueReader parkedReader = readerQueue.Dequeue();
        parkedReader.Set(default(Item));
    }

    while (itemQueue.HasAnyItem)
    {
        Item leftover = itemQueue.DequeueAnyItem();
        DisposeItem(leftover);
        InvokeDequeuedCallback(leftover.DequeuedCallback);
    }
}
// Releases the value carried by an item: IDisposable values are disposed directly,
// any other non-null value is passed to DisposeItemCallback when one is set.
void DisposeItem(Item item)
{
    T value = item.Value;
    if (value == null)
    {
        return;
    }

    IDisposable disposable = value as IDisposable;
    if (disposable != null)
    {
        disposable.Dispose();
        return;
    }

    Action<T> callback = this.DisposeItemCallback;
    if (callback != null)
    {
        callback(value);
    }
}
// Scheduled callback: state is an IQueueReader[]; each reader is released with an empty Item.
static void CompleteOutstandingReadersCallback(object state)
{
    foreach (IQueueReader outstanding in (IQueueReader[])state)
    {
        outstanding.Set(default(Item));
    }
}
// Signals every waiter in the array with the same availability flag.
static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
{
    foreach (IQueueWaiter waiter in waiters)
    {
        waiter.Set(itemAvailable);
    }
}
// Scheduled callback: state is an IQueueWaiter[]; completes the waiters with itemAvailable == false.
static void CompleteWaitersFalseCallback(object state)
{
    IQueueWaiter[] waiters = (IQueueWaiter[])state;
    CompleteWaiters(false, waiters);
}
// Schedules completion of the waiters through ActionItem instead of completing them inline.
// The delegate matching the availability flag is created on first use and cached in a static field.
static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
{
    Action<object> completion;

    if (itemAvailable)
    {
        if (completeWaitersTrueCallback == null)
        {
            completeWaitersTrueCallback = new Action<object>(CompleteWaitersTrueCallback);
        }

        completion = completeWaitersTrueCallback;
    }
    else
    {
        if (completeWaitersFalseCallback == null)
        {
            completeWaitersFalseCallback = new Action<object>(CompleteWaitersFalseCallback);
        }

        completion = completeWaitersFalseCallback;
    }

    ActionItem.Schedule(completion, waiters);
}
// Scheduled callback: state is an IQueueWaiter[]; completes the waiters with itemAvailable == true.
static void CompleteWaitersTrueCallback(object state)
{
    IQueueWaiter[] waiters = (IQueueWaiter[])state;
    CompleteWaiters(true, waiters);
}
// Runs the dequeued callback on the calling thread; a null callback is ignored.
static void InvokeDequeuedCallback(Action dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    dequeuedCallback();
}
// Schedules the dequeued callback through ActionItem instead of running it inline;
// a null callback is ignored. The trampoline delegate is created on first use.
static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    if (onInvokeDequeuedCallback == null)
    {
        onInvokeDequeuedCallback = new Action<object>(OnInvokeDequeuedCallback);
    }

    ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
}
// Scheduled callback: state is the InputQueue<T> whose Dispatch() should run.
static void OnDispatchCallback(object state)
{
    InputQueue<T> queue = (InputQueue<T>)state;
    queue.Dispatch();
}
// Scheduled callback: state is the Action to invoke (never null, see InvokeDequeuedCallbackLater).
static void OnInvokeDequeuedCallback(object state)
{
    Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)");

    ((Action)state)();
}
// Core enqueue path shared by the public EnqueueAndDispatch overloads.
// Under the lock:
//  - all registered waiters are collected;
//  - if the queue is not Open the item is rejected (disposed after the lock is released);
//  - with no parked reader the item is stored as available;
//  - with a parked reader the item is either handed to it directly (canDispatchOnThisThread)
//    or stored as pending with a Dispatch() scheduled for later.
// Waiters, the reader and the callbacks are all completed outside the lock.
void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
{
    bool rejected = false;
    bool deferDispatch = false;
    IQueueReader waitingReader = null;
    IQueueWaiter[] pendingWaiters;
    bool notifyAvailable;

    lock (ThisLock)
    {
        // Waiters are told "available" only while the queue is still Open.
        notifyAvailable = queueState == QueueState.Open;
        GetWaiters(out pendingWaiters);

        if (queueState != QueueState.Open)
        {
            rejected = true;
        }
        else if (readerQueue.Count == 0)
        {
            itemQueue.EnqueueAvailableItem(item);
        }
        else if (canDispatchOnThisThread)
        {
            waitingReader = readerQueue.Dequeue();
        }
        else
        {
            itemQueue.EnqueuePendingItem(item);
            deferDispatch = true;
        }
    }

    if (pendingWaiters != null)
    {
        if (canDispatchOnThisThread)
        {
            CompleteWaiters(notifyAvailable, pendingWaiters);
        }
        else
        {
            CompleteWaitersLater(notifyAvailable, pendingWaiters);
        }
    }

    if (waitingReader != null)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        waitingReader.Set(item);
    }

    if (deferDispatch)
    {
        if (onDispatchCallback == null)
        {
            onDispatchCallback = new Action<object>(OnDispatchCallback);
        }

        ActionItem.Schedule(onDispatchCallback, this);
    }
    else if (rejected)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        DisposeItem(item);
    }
}
- // This will not block, however, Dispatch() must be called later if this function
- // returns true.
// Core enqueue path that never completes readers or waiters itself.
// Open queue, nobody parked: the item becomes available and false is returned.
// Open queue, a reader or waiter is parked: the item is stored as pending and true is
// returned, so the caller has to invoke Dispatch() afterwards.
// Queue not Open: the item is disposed, its dequeued callback is scheduled, false is returned.
bool EnqueueWithoutDispatch(Item item)
{
    lock (ThisLock)
    {
        if (queueState == QueueState.Open)
        {
            bool nobodyParked = readerQueue.Count == 0 && waiterList.Count == 0;
            if (nobodyParked)
            {
                itemQueue.EnqueueAvailableItem(item);
                return false;
            }

            itemQueue.EnqueuePendingItem(item);
            return true;
        }
    }

    DisposeItem(item);
    InvokeDequeuedCallbackLater(item.DequeuedCallback);
    return false;
}
// Hands back a snapshot of all registered waiters and empties the list,
// or null when no waiter is registered. The callers in this class invoke it
// while holding ThisLock.
void GetWaiters(out IQueueWaiter[] waiters)
{
    if (waiterList.Count == 0)
    {
        waiters = null;
        return;
    }

    waiters = waiterList.ToArray();
    waiterList.Clear();
}
- // Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
- // dispatching items to timed out readers.
// Removes a specific reader from the reader queue and reports whether it was found.
// The queue is rotated exactly once, so the relative order of the other readers is kept.
// A Closed queue is never touched and yields false.
bool RemoveReader(IQueueReader reader)
{
    Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)");

    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            return false;
        }

        bool found = false;
        int readerCount = readerQueue.Count;
        for (int pass = 0; pass < readerCount; pass++)
        {
            IQueueReader candidate = readerQueue.Dequeue();
            if (object.ReferenceEquals(candidate, reader))
            {
                found = true;
            }
            else
            {
                readerQueue.Enqueue(candidate);
            }
        }

        return found;
    }
}
- enum QueueState // lifecycle state of the queue; the constructor starts in Open
- {
- Open, // items are accepted: stored, or handed to a parked reader
- Shutdown, // set by Shutdown(): newly enqueued items are disposed, already queued items can still be dequeued
- Closed // set by Dispose(): parked readers are released and leftover items disposed
- }
- interface IQueueReader // a consumer parked in readerQueue until an Item is handed to it
- {
- void Set(Item item); // delivers the item (value, exception, or an empty Item) to the reader
- }
- interface IQueueWaiter // a party registered in waiterList that only wants an availability notification
- {
- void Set(bool itemAvailable); // signals the waiter with the availability flag
- }
// One queue entry: either a value or an exception, plus an optional callback that is
// run when the entry is dequeued. default(Item) carries nothing at all.
struct Item
{
    Action dequeuedCallback;
    Exception exception;
    T value;

    // Entry that carries a value.
    public Item(T value, Action dequeuedCallback)
    {
        this.value = value;
        this.exception = null;
        this.dequeuedCallback = dequeuedCallback;
    }

    // Entry that carries an exception (the value stays null).
    public Item(Exception exception, Action dequeuedCallback)
    {
        this.value = null;
        this.exception = exception;
        this.dequeuedCallback = dequeuedCallback;
    }

    public Action DequeuedCallback
    {
        get
        {
            return dequeuedCallback;
        }
    }

    public Exception Exception
    {
        get
        {
            return exception;
        }
    }

    // Raw value; unlike GetValue() this never throws.
    public T Value
    {
        get
        {
            return value;
        }
    }

    // Returns the value, or throws the stored exception when there is one.
    public T GetValue()
    {
        if (exception == null)
        {
            return value;
        }

        throw Fx.Exception.AsError(exception);
    }
}
// Asynchronous reader returned by BeginDequeue when no item is available yet.
// It completes either when an item is handed to it through Set, or when its timer fires
// and it can still be removed from the owning queue's reader list (then it is "expired").
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueReader : AsyncResult, IQueueReader
{
    static Action<object> timerCallback = new Action<object>(AsyncQueueReader.TimerCallback);
    bool expired;
    InputQueue<T> inputQueue;
    T item;
    IOThreadTimer timer;

    // A timeout of TimeSpan.MaxValue means "no timer".
    public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        Func<Action<AsyncCallback, IAsyncResult>> generator = inputQueue.AsyncCallbackGenerator;
        if (generator != null)
        {
            base.VirtualCallback = generator();
        }

        this.inputQueue = inputQueue;

        if (timeout != TimeSpan.MaxValue)
        {
            this.timer = new IOThreadTimer(timerCallback, this, false);
            this.timer.Set(timeout);
        }
    }

    // Returns false (value == default(T)) when the reader expired, true with the item otherwise.
    [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
    public static bool End(IAsyncResult result, out T value)
    {
        AsyncQueueReader completed = AsyncResult.End<AsyncQueueReader>(result);
        bool timely = !completed.expired;
        value = timely ? completed.item : default(T);
        return timely;
    }

    // Stores the item's value, cancels the timer if there is one and completes the result,
    // passing the item's exception (possibly null) to Complete.
    public void Set(Item item)
    {
        this.item = item.Value;

        IOThreadTimer pendingTimer = this.timer;
        if (pendingTimer != null)
        {
            pendingTimer.Cancel();
        }

        Complete(false, item.Exception);
    }

    // Timer expiry: the reader only expires if it could still be removed from the queue;
    // otherwise nothing happens here.
    static void TimerCallback(object state)
    {
        AsyncQueueReader reader = (AsyncQueueReader)state;
        if (!reader.inputQueue.RemoveReader(reader))
        {
            return;
        }

        reader.expired = true;
        reader.Complete(false);
    }
}
// Asynchronous waiter returned by BeginWaitForItem. It completes when Set is called in time,
// or when its timer fires (in which case itemAvailable keeps whatever value it has).
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueWaiter : AsyncResult, IQueueWaiter
{
    static Action<object> timerCallback = new Action<object>(AsyncQueueWaiter.TimerCallback);
    bool itemAvailable;
    [Fx.Tag.SynchronizationObject(Blocking = false)]
    object thisLock = new object();
    IOThreadTimer timer;

    // A timeout of TimeSpan.MaxValue means "no timer".
    public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        if (timeout == TimeSpan.MaxValue)
        {
            return;
        }

        this.timer = new IOThreadTimer(timerCallback, this, false);
        this.timer.Set(timeout);
    }

    object ThisLock
    {
        get { return thisLock; }
    }

    [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
    public static bool End(IAsyncResult result)
    {
        return AsyncResult.End<AsyncQueueWaiter>(result).itemAvailable;
    }

    // Records the availability flag and completes the result, unless a timer exists
    // and its Cancel() returns false.
    public void Set(bool itemAvailable)
    {
        bool timely;
        lock (ThisLock)
        {
            if (this.timer == null)
            {
                timely = true;
            }
            else
            {
                timely = this.timer.Cancel();
            }

            this.itemAvailable = itemAvailable;
        }

        if (timely)
        {
            Complete(false);
        }
    }

    // Timer expiry: complete the result without touching itemAvailable.
    static void TimerCallback(object state)
    {
        AsyncQueueWaiter waiter = (AsyncQueueWaiter)state;
        waiter.Complete(false);
    }
}
// Growable circular FIFO of Item entries.
// totalCount is the number of stored entries and pendingCount how many of them are still
// "pending"; an entry counts as available while totalCount > pendingCount.
// The class does no locking of its own.
class ItemQueue
{
    int head;          // index of the oldest entry in items
    Item[] items;      // circular buffer; capacity doubles when full
    int pendingCount;  // entries enqueued as pending and not yet made available
    int totalCount;    // entries currently stored

    public ItemQueue()
    {
        this.items = new Item[1];
    }

    public bool HasAnyItem
    {
        get { return this.totalCount > 0; }
    }

    public bool HasAvailableItem
    {
        get { return this.totalCount > this.pendingCount; }
    }

    public int ItemCount
    {
        get { return this.totalCount; }
    }

    // Removes the oldest entry regardless of whether it is available or pending.
    public Item DequeueAnyItem()
    {
        // Validate before adjusting pendingCount: on an empty queue the decrement below
        // would otherwise run first and leave pendingCount at -1 (making HasAvailableItem
        // report true) before the emptiness check in DequeueItemCore throws.
        Fx.AssertAndThrow(this.totalCount != 0, "ItemQueue does not contain any items");

        if (this.pendingCount == this.totalCount)
        {
            // Every remaining entry is pending, so the one being removed is pending too.
            this.pendingCount--;
        }

        return DequeueItemCore();
    }

    // Removes the oldest entry; requires at least one available entry.
    public Item DequeueAvailableItem()
    {
        Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
        return DequeueItemCore();
    }

    public void EnqueueAvailableItem(Item item)
    {
        EnqueueItemCore(item);
    }

    public void EnqueuePendingItem(Item item)
    {
        EnqueueItemCore(item);
        this.pendingCount++;
    }

    // Converts one pending entry into an available one; requires a pending entry.
    public void MakePendingItemAvailable()
    {
        Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items");
        this.pendingCount--;
    }

    Item DequeueItemCore()
    {
        Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items");

        Item item = this.items[this.head];
        this.items[this.head] = new Item(); // clear the slot so the buffer drops its references
        this.totalCount--;
        this.head = (this.head + 1) % this.items.Length;
        return item;
    }

    void EnqueueItemCore(Item item)
    {
        if (this.totalCount == this.items.Length)
        {
            // Full: double the capacity and unroll the ring so the oldest entry sits at index 0.
            Item[] newItems = new Item[this.items.Length * 2];
            for (int i = 0; i < this.totalCount; i++)
            {
                newItems[i] = this.items[(head + i) % this.items.Length];
            }

            this.head = 0;
            this.items = newItems;
        }

        int tail = (this.head + this.totalCount) % this.items.Length;
        this.items[tail] = item;
        this.totalCount++;
    }
}
// Blocking reader used by Dequeue(TimeSpan, out T): the caller sleeps on a ManualResetEvent
// until an item is handed over through Set or the timeout elapses.
[Fx.Tag.SynchronizationObject(Blocking = false)]
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueReader : IQueueReader
{
    Exception exception;
    InputQueue<T> inputQueue;
    T item;
    [Fx.Tag.SynchronizationObject]
    ManualResetEvent waitEvent;

    public WaitQueueReader(InputQueue<T> inputQueue)
    {
        this.inputQueue = inputQueue;
        this.waitEvent = new ManualResetEvent(false);
    }

    // Stores the item's exception and value, then wakes the blocked caller.
    public void Set(Item item)
    {
        lock (this)
        {
            Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
            Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");

            this.exception = item.Exception;
            this.item = item.Value;
            this.waitEvent.Set();
        }
    }

    // Returns false only when the wait timed out and this reader could still be removed from
    // the owning queue. If the removal fails, the method waits without a timeout for the
    // event to be signaled. The event is closed only on paths where the flag below was set,
    // i.e. not when the wait itself throws.
    [Fx.Tag.Blocking(CancelMethod = "Set")]
    public bool Wait(TimeSpan timeout, out T value)
    {
        bool canCloseEvent = false;
        try
        {
            bool signaled = TimeoutHelper.WaitOne(this.waitEvent, timeout);
            if (!signaled)
            {
                bool removed = this.inputQueue.RemoveReader(this);
                if (removed)
                {
                    value = default(T);
                    canCloseEvent = true;
                    return false;
                }

                this.waitEvent.WaitOne();
            }

            canCloseEvent = true;
        }
        finally
        {
            if (canCloseEvent)
            {
                this.waitEvent.Close();
            }
        }

        if (this.exception != null)
        {
            throw Fx.Exception.AsError(this.exception);
        }

        value = this.item;
        return true;
    }
}
// Blocking waiter used by WaitForItem: the caller sleeps on a ManualResetEvent until
// Set is called with the availability flag or the timeout elapses.
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueWaiter : IQueueWaiter
{
    bool itemAvailable;
    [Fx.Tag.SynchronizationObject]
    ManualResetEvent waitEvent;

    public WaitQueueWaiter()
    {
        waitEvent = new ManualResetEvent(false);
    }

    // Records the availability flag and wakes the blocked caller.
    public void Set(bool itemAvailable)
    {
        lock (this)
        {
            this.itemAvailable = itemAvailable;
            waitEvent.Set();
        }
    }

    // Returns false on timeout, otherwise the flag passed to Set.
    [Fx.Tag.Blocking(CancelMethod = "Set")]
    public bool Wait(TimeSpan timeout)
    {
        if (!TimeoutHelper.WaitOne(waitEvent, timeout))
        {
            // Timed out: WaitForItem leaves this waiter in the queue's waiter list, so Set
            // can still be called later. The event therefore has to stay open here.
            return false;
        }

        // Signaled: the waiter has already been taken out of the waiter list (GetWaiters
        // clears it before the waiters are completed), so no further Set is expected and
        // the event handle can be released, as WaitQueueReader does with its own event.
        // Taking the lock guarantees the Set call that woke us has left its critical section.
        lock (this)
        {
            waitEvent.Close();
        }

        return this.itemAvailable;
    }
}
- }
- }
|