| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- /************************************************************************************
- PublicHeader: None
- Filename : OVR_ThreadCommandQueue.cpp
- Content : Command queue for operations executed on a thread
- Created : October 29, 2012
- Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved.
- Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License");
- you may not use the Oculus VR Rift SDK except in compliance with the License,
- which is provided at the time of installation or download, or which
- otherwise accompanies this software in either electronic or hard copy form.
- You may obtain a copy of the License at
- http://www.oculusvr.com/licenses/LICENSE-3.2
- Unless required by applicable law or agreed to in writing, the Oculus VR SDK
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ************************************************************************************/
- #include "OVR_ThreadCommandQueue.h"
- namespace OVR {
- //------------------------------------------------------------------------
- // ***** CircularBuffer
- // CircularBuffer is a FIFO buffer implemented in a single block of memory,
- // which allows writing and reading variable-size data chucks. Write fails
- // if buffer is full.
- class CircularBuffer
- {
- enum {
- AlignSize = 16,
- AlignMask = AlignSize - 1
- };
- uint8_t* pBuffer;
- size_t Size;
- size_t Tail; // Byte offset of next item to be popped.
- size_t Head; // Byte offset of where next push will take place.
- size_t End; // When Head < Tail, this is used instead of Size.
- inline size_t roundUpSize(size_t size)
- { return (size + AlignMask) & ~(size_t)AlignMask; }
- public:
- CircularBuffer(size_t size)
- : Size(size), Tail(0), Head(0), End(0)
- {
- pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
- }
- ~CircularBuffer()
- {
- // For ThreadCommands, we must consume everything before shutdown.
- OVR_ASSERT(IsEmpty());
- OVR_FREE_ALIGNED(pBuffer);
- }
- bool IsEmpty() const { return (Head == Tail); }
- // Allocates a state block of specified size and advances pointers,
- // returning 0 if buffer is full.
- uint8_t* Write(size_t size);
- // Returns a pointer to next available data block; 0 if none available.
- uint8_t* ReadBegin()
- { return (Head != Tail) ? (pBuffer + Tail) : 0; }
- // Consumes data of specified size; this must match size passed to Write.
- void ReadEnd(size_t size);
- };
- // Allocates a state block of specified size and advances pointers,
- // returning 0 if buffer is full.
- uint8_t* CircularBuffer::Write(size_t size)
- {
- uint8_t* p = 0;
- size = roundUpSize(size);
- // Since this is circular buffer, always allow at least one item.
- OVR_ASSERT(size < Size/2);
- if (Head >= Tail)
- {
- OVR_ASSERT(End == 0);
-
- if (size <= (Size - Head))
- {
- p = pBuffer + Head;
- Head += size;
- }
- else if (size < Tail)
- {
- p = pBuffer;
- End = Head;
- Head = size;
- OVR_ASSERT(Head != Tail);
- }
- }
- else
- {
- OVR_ASSERT(End != 0);
- if ((Tail - Head) > size)
- {
- p = pBuffer + Head;
- Head += size;
- OVR_ASSERT(Head != Tail);
- }
- }
- return p;
- }
- void CircularBuffer::ReadEnd(size_t size)
- {
- OVR_ASSERT(Head != Tail);
- size = roundUpSize(size);
-
- Tail += size;
- if (Tail == End)
- {
- Tail = End = 0;
- }
- else if (Tail == Head)
- {
- OVR_ASSERT(End == 0);
- Tail = Head = 0;
- }
- }
- //-------------------------------------------------------------------------------------
- // ***** ThreadCommand
- ThreadCommand::PopBuffer::~PopBuffer()
- {
- if (Size) {
- Destruct<ThreadCommand>(toCommand());
- }
- }
- void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
- {
- ThreadCommand* cmd = (ThreadCommand*)data;
- if (Size) {
- Destruct<ThreadCommand>(toCommand());
- }
- Size = cmd->Size;
- if (Size > MaxSize)
- Size = MaxSize;
- memcpy(Buffer, (void*)cmd, Size);
- }
- void ThreadCommand::PopBuffer::Execute()
- {
- ThreadCommand* command = toCommand();
- OVR_ASSERT(command);
- if (command)
- {
- command->Execute();
- }
- if (NeedsWait()) {
- GetEvent()->PulseEvent();
- }
- }
- //-------------------------------------------------------------------------------------
- class ThreadCommandQueueImpl : public NewOverrideBase
- {
- typedef ThreadCommand::NotifyEvent NotifyEvent;
- friend class ThreadCommandQueue;
-
- public:
- ThreadCommandQueueImpl(ThreadCommandQueue* queue) :
- pQueue(queue),
- ExitEnqueued(false),
- ExitProcessed(false),
- CommandBuffer(2048),
- PullThreadId(0)
- {
- }
- ~ThreadCommandQueueImpl();
- bool PushCommand(const ThreadCommand& command);
- bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
- // ExitCommand is used by notify us that Thread is shutting down.
- struct ExitCommand : public ThreadCommand
- {
- ThreadCommandQueueImpl* pImpl;
-
- ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
- : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
- virtual void Execute() const
- {
- Lock::Locker lock(&pImpl->QueueLock);
- pImpl->ExitProcessed = true;
- }
- virtual ThreadCommand* CopyConstruct(void* p) const
- { return Construct<ExitCommand>(p, *this); }
- };
- NotifyEvent* AllocNotifyEvent_NTS()
- {
- NotifyEvent* p = AvailableEvents.GetFirst();
- if (!AvailableEvents.IsNull(p))
- p->RemoveNode();
- else
- p = new NotifyEvent;
- return p;
- }
- void FreeNotifyEvent_NTS(NotifyEvent* p)
- {
- AvailableEvents.PushBack(p);
- }
- void FreeNotifyEvents_NTS()
- {
- while(!AvailableEvents.IsEmpty())
- {
- NotifyEvent* p = AvailableEvents.GetFirst();
- p->RemoveNode();
- delete p;
- }
- }
- ThreadCommandQueue* pQueue;
- Lock QueueLock;
- volatile bool ExitEnqueued;
- volatile bool ExitProcessed;
- List<NotifyEvent> AvailableEvents;
- List<NotifyEvent> BlockedProducers;
- CircularBuffer CommandBuffer;
- // The pull thread id is set to the last thread that pulled commands.
- // Since this thread command queue is designed for a single thread,
- // reentrant behavior that would cause a dead-lock for messages that
- // wait for completion can be avoided by simply comparing the
- // thread id of the last pull.
- OVR::ThreadId PullThreadId;
- };
- ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
- {
- Lock::Locker lock(&QueueLock);
- OVR_ASSERT(BlockedProducers.IsEmpty());
- FreeNotifyEvents_NTS();
- }
- bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
- {
- if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId())
- {
- command.Execute();
- return true;
- }
- ThreadCommand::NotifyEvent* completeEvent = 0;
- ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
- // Repeat writing command into buffer until it is available.
- for (;;) {
- { // Lock Scope
- Lock::Locker lock(&QueueLock);
- if (queueAvailableEvent) {
- FreeNotifyEvent_NTS(queueAvailableEvent);
- queueAvailableEvent = 0;
- }
- // Don't allow any commands after PushExitCommand() is called.
- if (ExitEnqueued && !command.ExitFlag) {
- return false;
- }
- bool bufferWasEmpty = CommandBuffer.IsEmpty();
- uint8_t* buffer = CommandBuffer.Write(command.GetSize());
- if (buffer) {
- ThreadCommand* c = command.CopyConstruct(buffer);
- if (c->NeedsWait()) {
- completeEvent = c->pEvent = AllocNotifyEvent_NTS();
- }
- // Signal-waker consumer when we add data to buffer.
- if (bufferWasEmpty) {
- pQueue->OnPushNonEmpty_Locked();
- }
- break;
- }
- queueAvailableEvent = AllocNotifyEvent_NTS();
- BlockedProducers.PushBack(queueAvailableEvent);
- } // Lock Scope
- queueAvailableEvent->Wait();
- } // Intentional infinite loop
- // Command was enqueued, wait if necessary.
- if (completeEvent) {
- completeEvent->Wait();
- Lock::Locker lock(&QueueLock);
- FreeNotifyEvent_NTS(completeEvent);
- }
- return true;
- }
- // Pops the next command from the thread queue, if any is available.
- bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
- {
- // We do not write to this variable unless we are changing it.
- // This ensures it is read-only after the first call to PopCommand().
- if (PullThreadId != OVR::GetCurrentThreadId())
- {
- PullThreadId = OVR::GetCurrentThreadId();
- }
- Lock::Locker lock(&QueueLock);
- uint8_t* buffer = CommandBuffer.ReadBegin();
- if (!buffer)
- {
- // Notify thread while in lock scope, enabling initialization of wait.
- pQueue->OnPopEmpty_Locked();
- return false;
- }
- popBuffer->InitFromBuffer(buffer);
- CommandBuffer.ReadEnd(popBuffer->GetSize());
- if (!BlockedProducers.IsEmpty())
- {
- ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
- queueAvailableEvent->RemoveNode();
- queueAvailableEvent->PulseEvent();
- // Event is freed later by waiter.
- }
- return true;
- }
- //-------------------------------------------------------------------------------------
- ThreadCommandQueue::ThreadCommandQueue()
- {
- pImpl = new ThreadCommandQueueImpl(this);
- }
- ThreadCommandQueue::~ThreadCommandQueue()
- {
- delete pImpl;
- }
- bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
- {
- return pImpl->PushCommand(command);
- }
- bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
- {
- return pImpl->PopCommand(popBuffer);
- }
- void ThreadCommandQueue::PushExitCommand(bool wait)
- {
- // Exit is processed in two stages:
- // - First, ExitEnqueued flag is set to block further commands from queuing up.
- // - Second, the actual exit call is processed on the consumer thread, flushing
- // any prior commands.
- // IsExiting() only returns true after exit has flushed.
- {
- Lock::Locker lock(&pImpl->QueueLock);
- if (pImpl->ExitEnqueued)
- return;
- pImpl->ExitEnqueued = true;
- }
- PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
- }
- bool ThreadCommandQueue::IsExiting() const
- {
- return pImpl->ExitProcessed;
- }
- } // namespace OVR
|