OVR_ThreadCommandQueue.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /************************************************************************************
  2. PublicHeader: None
  3. Filename : OVR_ThreadCommandQueue.cpp
  4. Content : Command queue for operations executed on a thread
  5. Created : October 29, 2012
  6. Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved.
  7. Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License");
  8. you may not use the Oculus VR Rift SDK except in compliance with the License,
  9. which is provided at the time of installation or download, or which
  10. otherwise accompanies this software in either electronic or hard copy form.
  11. You may obtain a copy of the License at
  12. http://www.oculusvr.com/licenses/LICENSE-3.2
  13. Unless required by applicable law or agreed to in writing, the Oculus VR SDK
  14. distributed under the License is distributed on an "AS IS" BASIS,
  15. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. See the License for the specific language governing permissions and
  17. limitations under the License.
  18. ************************************************************************************/
  19. #include "OVR_ThreadCommandQueue.h"
  20. namespace OVR {
  21. //------------------------------------------------------------------------
  22. // ***** CircularBuffer
  23. // CircularBuffer is a FIFO buffer implemented in a single block of memory,
  24. // which allows writing and reading variable-size data chucks. Write fails
  25. // if buffer is full.
  26. class CircularBuffer
  27. {
  28. enum {
  29. AlignSize = 16,
  30. AlignMask = AlignSize - 1
  31. };
  32. uint8_t* pBuffer;
  33. size_t Size;
  34. size_t Tail; // Byte offset of next item to be popped.
  35. size_t Head; // Byte offset of where next push will take place.
  36. size_t End; // When Head < Tail, this is used instead of Size.
  37. inline size_t roundUpSize(size_t size)
  38. { return (size + AlignMask) & ~(size_t)AlignMask; }
  39. public:
  40. CircularBuffer(size_t size)
  41. : Size(size), Tail(0), Head(0), End(0)
  42. {
  43. pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
  44. }
  45. ~CircularBuffer()
  46. {
  47. // For ThreadCommands, we must consume everything before shutdown.
  48. OVR_ASSERT(IsEmpty());
  49. OVR_FREE_ALIGNED(pBuffer);
  50. }
  51. bool IsEmpty() const { return (Head == Tail); }
  52. // Allocates a state block of specified size and advances pointers,
  53. // returning 0 if buffer is full.
  54. uint8_t* Write(size_t size);
  55. // Returns a pointer to next available data block; 0 if none available.
  56. uint8_t* ReadBegin()
  57. { return (Head != Tail) ? (pBuffer + Tail) : 0; }
  58. // Consumes data of specified size; this must match size passed to Write.
  59. void ReadEnd(size_t size);
  60. };
  61. // Allocates a state block of specified size and advances pointers,
  62. // returning 0 if buffer is full.
  63. uint8_t* CircularBuffer::Write(size_t size)
  64. {
  65. uint8_t* p = 0;
  66. size = roundUpSize(size);
  67. // Since this is circular buffer, always allow at least one item.
  68. OVR_ASSERT(size < Size/2);
  69. if (Head >= Tail)
  70. {
  71. OVR_ASSERT(End == 0);
  72. if (size <= (Size - Head))
  73. {
  74. p = pBuffer + Head;
  75. Head += size;
  76. }
  77. else if (size < Tail)
  78. {
  79. p = pBuffer;
  80. End = Head;
  81. Head = size;
  82. OVR_ASSERT(Head != Tail);
  83. }
  84. }
  85. else
  86. {
  87. OVR_ASSERT(End != 0);
  88. if ((Tail - Head) > size)
  89. {
  90. p = pBuffer + Head;
  91. Head += size;
  92. OVR_ASSERT(Head != Tail);
  93. }
  94. }
  95. return p;
  96. }
  97. void CircularBuffer::ReadEnd(size_t size)
  98. {
  99. OVR_ASSERT(Head != Tail);
  100. size = roundUpSize(size);
  101. Tail += size;
  102. if (Tail == End)
  103. {
  104. Tail = End = 0;
  105. }
  106. else if (Tail == Head)
  107. {
  108. OVR_ASSERT(End == 0);
  109. Tail = Head = 0;
  110. }
  111. }
  112. //-------------------------------------------------------------------------------------
  113. // ***** ThreadCommand
  114. ThreadCommand::PopBuffer::~PopBuffer()
  115. {
  116. if (Size) {
  117. Destruct<ThreadCommand>(toCommand());
  118. }
  119. }
  120. void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
  121. {
  122. ThreadCommand* cmd = (ThreadCommand*)data;
  123. if (Size) {
  124. Destruct<ThreadCommand>(toCommand());
  125. }
  126. Size = cmd->Size;
  127. if (Size > MaxSize)
  128. Size = MaxSize;
  129. memcpy(Buffer, (void*)cmd, Size);
  130. }
  131. void ThreadCommand::PopBuffer::Execute()
  132. {
  133. ThreadCommand* command = toCommand();
  134. OVR_ASSERT(command);
  135. if (command)
  136. {
  137. command->Execute();
  138. }
  139. if (NeedsWait()) {
  140. GetEvent()->PulseEvent();
  141. }
  142. }
  143. //-------------------------------------------------------------------------------------
  144. class ThreadCommandQueueImpl : public NewOverrideBase
  145. {
  146. typedef ThreadCommand::NotifyEvent NotifyEvent;
  147. friend class ThreadCommandQueue;
  148. public:
  149. ThreadCommandQueueImpl(ThreadCommandQueue* queue) :
  150. pQueue(queue),
  151. ExitEnqueued(false),
  152. ExitProcessed(false),
  153. CommandBuffer(2048),
  154. PullThreadId(0)
  155. {
  156. }
  157. ~ThreadCommandQueueImpl();
  158. bool PushCommand(const ThreadCommand& command);
  159. bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
  160. // ExitCommand is used by notify us that Thread is shutting down.
  161. struct ExitCommand : public ThreadCommand
  162. {
  163. ThreadCommandQueueImpl* pImpl;
  164. ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
  165. : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
  166. virtual void Execute() const
  167. {
  168. Lock::Locker lock(&pImpl->QueueLock);
  169. pImpl->ExitProcessed = true;
  170. }
  171. virtual ThreadCommand* CopyConstruct(void* p) const
  172. { return Construct<ExitCommand>(p, *this); }
  173. };
  174. NotifyEvent* AllocNotifyEvent_NTS()
  175. {
  176. NotifyEvent* p = AvailableEvents.GetFirst();
  177. if (!AvailableEvents.IsNull(p))
  178. p->RemoveNode();
  179. else
  180. p = new NotifyEvent;
  181. return p;
  182. }
  183. void FreeNotifyEvent_NTS(NotifyEvent* p)
  184. {
  185. AvailableEvents.PushBack(p);
  186. }
  187. void FreeNotifyEvents_NTS()
  188. {
  189. while(!AvailableEvents.IsEmpty())
  190. {
  191. NotifyEvent* p = AvailableEvents.GetFirst();
  192. p->RemoveNode();
  193. delete p;
  194. }
  195. }
  196. ThreadCommandQueue* pQueue;
  197. Lock QueueLock;
  198. volatile bool ExitEnqueued;
  199. volatile bool ExitProcessed;
  200. List<NotifyEvent> AvailableEvents;
  201. List<NotifyEvent> BlockedProducers;
  202. CircularBuffer CommandBuffer;
  203. // The pull thread id is set to the last thread that pulled commands.
  204. // Since this thread command queue is designed for a single thread,
  205. // reentrant behavior that would cause a dead-lock for messages that
  206. // wait for completion can be avoided by simply comparing the
  207. // thread id of the last pull.
  208. OVR::ThreadId PullThreadId;
  209. };
  210. ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
  211. {
  212. Lock::Locker lock(&QueueLock);
  213. OVR_ASSERT(BlockedProducers.IsEmpty());
  214. FreeNotifyEvents_NTS();
  215. }
  216. bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
  217. {
  218. if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId())
  219. {
  220. command.Execute();
  221. return true;
  222. }
  223. ThreadCommand::NotifyEvent* completeEvent = 0;
  224. ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
  225. // Repeat writing command into buffer until it is available.
  226. for (;;) {
  227. { // Lock Scope
  228. Lock::Locker lock(&QueueLock);
  229. if (queueAvailableEvent) {
  230. FreeNotifyEvent_NTS(queueAvailableEvent);
  231. queueAvailableEvent = 0;
  232. }
  233. // Don't allow any commands after PushExitCommand() is called.
  234. if (ExitEnqueued && !command.ExitFlag) {
  235. return false;
  236. }
  237. bool bufferWasEmpty = CommandBuffer.IsEmpty();
  238. uint8_t* buffer = CommandBuffer.Write(command.GetSize());
  239. if (buffer) {
  240. ThreadCommand* c = command.CopyConstruct(buffer);
  241. if (c->NeedsWait()) {
  242. completeEvent = c->pEvent = AllocNotifyEvent_NTS();
  243. }
  244. // Signal-waker consumer when we add data to buffer.
  245. if (bufferWasEmpty) {
  246. pQueue->OnPushNonEmpty_Locked();
  247. }
  248. break;
  249. }
  250. queueAvailableEvent = AllocNotifyEvent_NTS();
  251. BlockedProducers.PushBack(queueAvailableEvent);
  252. } // Lock Scope
  253. queueAvailableEvent->Wait();
  254. } // Intentional infinite loop
  255. // Command was enqueued, wait if necessary.
  256. if (completeEvent) {
  257. completeEvent->Wait();
  258. Lock::Locker lock(&QueueLock);
  259. FreeNotifyEvent_NTS(completeEvent);
  260. }
  261. return true;
  262. }
  263. // Pops the next command from the thread queue, if any is available.
  264. bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
  265. {
  266. // We do not write to this variable unless we are changing it.
  267. // This ensures it is read-only after the first call to PopCommand().
  268. if (PullThreadId != OVR::GetCurrentThreadId())
  269. {
  270. PullThreadId = OVR::GetCurrentThreadId();
  271. }
  272. Lock::Locker lock(&QueueLock);
  273. uint8_t* buffer = CommandBuffer.ReadBegin();
  274. if (!buffer)
  275. {
  276. // Notify thread while in lock scope, enabling initialization of wait.
  277. pQueue->OnPopEmpty_Locked();
  278. return false;
  279. }
  280. popBuffer->InitFromBuffer(buffer);
  281. CommandBuffer.ReadEnd(popBuffer->GetSize());
  282. if (!BlockedProducers.IsEmpty())
  283. {
  284. ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
  285. queueAvailableEvent->RemoveNode();
  286. queueAvailableEvent->PulseEvent();
  287. // Event is freed later by waiter.
  288. }
  289. return true;
  290. }
  291. //-------------------------------------------------------------------------------------
  292. ThreadCommandQueue::ThreadCommandQueue()
  293. {
  294. pImpl = new ThreadCommandQueueImpl(this);
  295. }
  296. ThreadCommandQueue::~ThreadCommandQueue()
  297. {
  298. delete pImpl;
  299. }
  300. bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
  301. {
  302. return pImpl->PushCommand(command);
  303. }
  304. bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
  305. {
  306. return pImpl->PopCommand(popBuffer);
  307. }
  308. void ThreadCommandQueue::PushExitCommand(bool wait)
  309. {
  310. // Exit is processed in two stages:
  311. // - First, ExitEnqueued flag is set to block further commands from queuing up.
  312. // - Second, the actual exit call is processed on the consumer thread, flushing
  313. // any prior commands.
  314. // IsExiting() only returns true after exit has flushed.
  315. {
  316. Lock::Locker lock(&pImpl->QueueLock);
  317. if (pImpl->ExitEnqueued)
  318. return;
  319. pImpl->ExitEnqueued = true;
  320. }
  321. PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
  322. }
  323. bool ThreadCommandQueue::IsExiting() const
  324. {
  325. return pImpl->ExitProcessed;
  326. }
  327. } // namespace OVR