SchedulerTests.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <AzCore/Casting/lossy_cast.h>
  9. #include <AzCore/IO/Streamer/Streamer.h>
  10. #include <AzCore/IO/Streamer/Scheduler.h>
  11. #include <AzCore/IO/Streamer/FileRequest.h>
  12. #include <AzCore/std/parallel/atomic.h>
  13. #include <AzCore/std/parallel/binary_semaphore.h>
  14. #include <AzCore/std/smart_ptr/make_shared.h>
  15. #include <AzCore/std/smart_ptr/unique_ptr.h>
  16. #include <AzCore/UnitTest/TestTypes.h>
  17. #include <Tests/Streamer/IStreamerTypesMock.h>
  18. #include <Tests/Streamer/StreamStackEntryMock.h>
  19. namespace AZ::IO
  20. {
  21. class Streamer_SchedulerTest
  22. : public UnitTest::LeakDetectionFixture
  23. {
  24. protected:
  25. StreamerContext* m_streamerContext{ nullptr };
  26. public:
  27. void SetUp() override
  28. {
  29. using ::testing::_;
  30. using ::testing::AnyNumber;
  31. UnitTest::LeakDetectionFixture::SetUp();
  32. m_mock = AZStd::make_shared<StreamStackEntryMock>();
  33. ON_CALL(*m_mock, PrepareRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardPrepareRequest(request); });
  34. ON_CALL(*m_mock, QueueRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardQueueRequest(request); });
  35. ON_CALL(*m_mock, UpdateStatus(_)).WillByDefault([this](StreamStackEntry::Status& status)
  36. {
  37. status.m_numAvailableSlots = 1;
  38. status.m_isIdle = m_isStackIdle;
  39. });
  40. // Expectation needs to be set before the Scheduler thread is started otherwise it may or may not hit before it's set in
  41. // a test.
  42. EXPECT_CALL(*m_mock, SetContext(_))
  43. .Times(1)
  44. .WillOnce([this](StreamerContext& context)
  45. {
  46. m_streamerContext = &context;
  47. m_mock->ForwardSetContext(context);
  48. });
  49. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  50. auto isIdle = m_isStackIdle.load();
  51. m_isStackIdle = true;
  52. m_streamer = aznew IO::Streamer(AZStd::thread_desc{}, AZStd::make_unique<Scheduler>(m_mock));
  53. m_isStackIdle = isIdle;
  54. Interface<IO::IStreamer>::Register(m_streamer);
  55. }
  56. void TearDown() override
  57. {
  58. m_isStackIdle = true;
  59. if (m_streamer)
  60. {
  61. Interface<IO::IStreamer>::Unregister(m_streamer);
  62. delete m_streamer;
  63. m_streamer = nullptr;
  64. }
  65. m_mock.reset();
  66. UnitTest::LeakDetectionFixture::TearDown();
  67. }
  68. void MockForRead()
  69. {
  70. using ::testing::_;
  71. using ::testing::AtLeast;
  72. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  73. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  74. EXPECT_CALL(*m_mock, PrepareRequest(_))
  75. .Times(1)
  76. .WillOnce([this](FileRequest* request)
  77. {
  78. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  79. auto readData = AZStd::get_if<Requests::ReadRequestData>(&request->GetCommand());
  80. AZ_Assert(readData, "Test didn't pass in the correct request.");
  81. FileRequest* read = m_streamerContext->GetNewInternalRequest();
  82. read->CreateRead(request, readData->m_output, readData->m_outputSize, readData->m_path,
  83. readData->m_offset, readData->m_size);
  84. m_streamerContext->PushPreparedRequest(read);
  85. });
  86. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  87. EXPECT_CALL(*m_mock, QueueRequest(_))
  88. .Times(1)
  89. .WillOnce([this](FileRequest* request)
  90. {
  91. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  92. auto readData = AZStd::get_if<Requests::ReadData>(&request->GetCommand());
  93. AZ_Assert(readData, "Test didn't pass in the correct request.");
  94. auto output = reinterpret_cast<uint8_t*>(readData->m_output);
  95. AZ_Assert(output != nullptr, "Output buffer has not been set.");
  96. for (size_t i = 0; i < readData->m_size; ++i)
  97. {
  98. output[i] = azlossy_cast<uint8_t>(readData->m_offset + i);
  99. }
  100. request->SetStatus(IStreamerTypes::RequestStatus::Completed);
  101. m_streamerContext->MarkRequestAsCompleted(request);
  102. });
  103. }
  104. void MockAllocatorForUnclaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock, AZStd::binary_semaphore& sync)
  105. {
  106. using ::testing::_;
  107. EXPECT_CALL(mock, LockAllocator()).Times(1);
  108. EXPECT_CALL(mock, UnlockAllocator())
  109. .Times(1)
  110. .WillOnce([&sync]()
  111. {
  112. sync.release();
  113. });
  114. EXPECT_CALL(mock, Allocate(_, _, _))
  115. .Times(1)
  116. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  117. {
  118. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  119. });
  120. EXPECT_CALL(mock, Release(_))
  121. .Times(1)
  122. .WillOnce([&mock](void* address)
  123. {
  124. mock.ForwardRelease(address);
  125. });
  126. }
  127. void MockAllocatorForClaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock)
  128. {
  129. using ::testing::_;
  130. EXPECT_CALL(mock, LockAllocator()).Times(1);
  131. EXPECT_CALL(mock, UnlockAllocator()).Times(1);
  132. EXPECT_CALL(mock, Allocate(_, _, _))
  133. .Times(1)
  134. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  135. {
  136. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  137. });
  138. EXPECT_CALL(mock, Release(_)).Times(0);
  139. }
  140. protected:
  141. // Using Streamer to interact with the Scheduler as not all functionality
  142. // is publicly exposed. Since Streamer is mostly the threaded front end for
  143. // the Scheduler, this is fine.
  144. Streamer* m_streamer{ nullptr };
  145. AZStd::shared_ptr<StreamStackEntryMock> m_mock;
  146. AZStd::atomic_bool m_isStackIdle = false;
  147. };
  148. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedFireAndForgetReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  149. {
  150. using ::testing::_;
  151. using ::testing::AtLeast;
  152. using ::testing::Return;
  153. MockForRead();
  154. AZStd::binary_semaphore allocatorSync;
  155. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  156. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  157. AZStd::binary_semaphore readSync;
  158. auto wait = [&readSync](FileRequestHandle)
  159. {
  160. readSync.release();
  161. };
  162. // Scoped to simulate a fire-and-forget request which should cause the request to be deleted after
  163. // completion and free the allocated memory.
  164. {
  165. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  166. m_streamer->SetRequestCompleteCallback(read, wait);
  167. m_streamer->QueueRequest(read);
  168. }
  169. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  170. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  171. }
  172. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  173. {
  174. using ::testing::_;
  175. using ::testing::AtLeast;
  176. using ::testing::Return;
  177. MockForRead();
  178. AZStd::binary_semaphore allocatorSync;
  179. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  180. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  181. AZStd::binary_semaphore readSync;
  182. auto wait = [&readSync](FileRequestHandle)
  183. {
  184. readSync.release();
  185. };
  186. // Scoped so the request goes out to scope before ending the test. This should trigger the
  187. // memory release on this thread.
  188. {
  189. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  190. m_streamer->SetRequestCompleteCallback(read, wait);
  191. m_streamer->QueueRequest(read);
  192. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  193. }
  194. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  195. }
  196. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueClaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  197. {
  198. using ::testing::_;
  199. using ::testing::AtLeast;
  200. using ::testing::Return;
  201. MockForRead();
  202. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  203. MockAllocatorForClaimedMemory(allocatorMock);
  204. AZStd::binary_semaphore readSync;
  205. auto wait = [&readSync](FileRequestHandle)
  206. {
  207. readSync.release();
  208. };
  209. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  210. m_streamer->SetRequestCompleteCallback(read, wait);
  211. m_streamer->QueueRequest(read);
  212. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  213. void* buffer = nullptr;
  214. u64 readSize = 0;
  215. EXPECT_TRUE(m_streamer->GetReadRequestResult(read, buffer, readSize, IStreamerTypes::ClaimMemory::Yes));
  216. ASSERT_NE(nullptr, buffer);
  217. allocatorMock.ForwardRelease(buffer);
  218. }
  219. TEST_F(Streamer_SchedulerTest, ProcessCancelRequest_CancelReadRequest_MockDoesNotReceiveReadRequest)
  220. {
  221. using ::testing::_;
  222. using ::testing::AtLeast;
  223. using ::testing::Return;
  224. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  225. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  226. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  227. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  228. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1);
  229. AZStd::atomic_int counter = 2;
  230. AZStd::binary_semaphore sync;
  231. auto wait = [&sync, &counter](FileRequestHandle)
  232. {
  233. if (--counter == 0)
  234. {
  235. sync.release();
  236. }
  237. };
  238. char fakeBuffer[8];
  239. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8);
  240. FileRequestPtr cancel = m_streamer->Cancel(read);
  241. m_streamer->SetRequestCompleteCallback(read, wait);
  242. m_streamer->SetRequestCompleteCallback(cancel, wait);
  243. m_streamer->SuspendProcessing();
  244. m_streamer->QueueRequest(read);
  245. m_streamer->QueueRequest(cancel);
  246. m_streamer->ResumeProcessing();
  247. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  248. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(cancel));
  249. EXPECT_EQ(IStreamerTypes::RequestStatus::Canceled, m_streamer->GetRequestStatus(read));
  250. }
  251. TEST_F(Streamer_SchedulerTest, Reschedule_SetNewDeadlineAndPriority_ReadRequestInMockHasUpdatedTime)
  252. {
  253. using ::testing::_;
  254. using ::testing::AtLeast;
  255. using ::testing::Invoke;
  256. using ::testing::Return;
  257. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  258. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  259. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  260. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  261. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1)
  262. .WillOnce(Invoke([this](FileRequest* request)
  263. {
  264. auto* read = request->GetCommandFromChain<Requests::ReadRequestData>();
  265. ASSERT_NE(nullptr, read);
  266. EXPECT_LT(read->m_deadline, FileRequest::s_noDeadlineTime);
  267. EXPECT_EQ(read->m_priority, IStreamerTypes::s_priorityHighest);
  268. m_mock->ForwardQueueRequest(request);
  269. }));
  270. AZStd::atomic_int counter = 2;
  271. AZStd::binary_semaphore sync;
  272. auto wait = [&sync, &counter](FileRequestHandle)
  273. {
  274. if (--counter == 0)
  275. {
  276. sync.release();
  277. }
  278. };
  279. char fakeBuffer[8];
  280. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8,
  281. IStreamerTypes::s_noDeadline, IStreamerTypes::s_priorityMedium);
  282. FileRequestPtr reschedule = m_streamer->RescheduleRequest(read, IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityHighest);
  283. m_streamer->SetRequestCompleteCallback(read, wait);
  284. m_streamer->SetRequestCompleteCallback(reschedule, wait);
  285. m_streamer->SuspendProcessing();
  286. m_streamer->QueueRequest(read);
  287. m_streamer->QueueRequest(reschedule);
  288. m_streamer->ResumeProcessing();
  289. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  290. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(reschedule));
  291. }
  292. TEST_F(Streamer_SchedulerTest, ProcessTillIdle_ShutDownIsDelayedUntilIdle_SchedulerThreadDoesNotImmediatelyShutDown)
  293. {
  294. using::testing::_;
  295. using ::testing::AnyNumber;
  296. using ::testing::Invoke;
  297. constexpr static size_t Iterations = 16;
  298. AZStd::atomic_int counter{ 0 };
  299. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  300. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AnyNumber());
  301. // Pretend to be busy [Iterations] times, then set the status to idle so the Scheduler thread can exit.
  302. EXPECT_CALL(*m_mock, ExecuteRequests())
  303. .Times(Iterations + 1)
  304. .WillRepeatedly(Invoke([this, &counter]()
  305. {
  306. if (counter++ >= Iterations)
  307. {
  308. m_isStackIdle = true;
  309. return false;
  310. }
  311. else
  312. {
  313. AZStd::this_thread::sleep_for(AZStd::chrono::milliseconds(1));
  314. return true;
  315. }
  316. }));
  317. if (m_streamer)
  318. {
  319. Interface<IO::IStreamer>::Unregister(m_streamer);
  320. delete m_streamer;
  321. m_streamer = nullptr;
  322. }
  323. EXPECT_EQ(Iterations + 1, counter);
  324. }
  325. TEST_F(Streamer_SchedulerTest, RequestSorting)
  326. {
  327. //////////////////////////////////////////////////////////////
  328. // Test equal priority requests that are past their deadlines (aka panic)
  329. //////////////////////////////////////////////////////////////
  330. IStreamerTypes::Deadline panicDeadline(IStreamerTypes::Deadline::min());
  331. auto estimatedCompleteTime = AZStd::chrono::steady_clock::now();
  332. char fakeBuffer[8];
  333. FileRequestPtr panicRequest = m_streamer->Read("PanicRequest", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  334. panicRequest->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  335. // Passed deadline, same object (same pointer)
  336. EXPECT_EQ(
  337. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest->m_request),
  338. Scheduler::Order::Equal);
  339. // Passed deadline, different object
  340. FileRequestPtr panicRequest2 = m_streamer->Read("PanicRequest2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  341. panicRequest2->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  342. EXPECT_EQ(
  343. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest2->m_request),
  344. Scheduler::Order::Equal);
  345. //////////////////////////////////////////////////////////////
  346. // Test equal priority requests that are both reading the same file
  347. //////////////////////////////////////////////////////////////
  348. FileRequestPtr readRequest = m_streamer->Read("SameFile", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  349. FileRequestPtr sameFileRequest = m_streamer->CreateRequest();
  350. sameFileRequest->m_request.CreateRead(&sameFileRequest->m_request, fakeBuffer, 8, RequestPath(), 0, 8);
  351. sameFileRequest->m_request.m_parent = &readRequest->m_request;
  352. sameFileRequest->m_request.m_dependencies = 0;
  353. // Same file read, same object (same pointer)
  354. EXPECT_EQ(
  355. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest->m_request),
  356. Scheduler::Order::Equal);
  357. FileRequestPtr readRequest2 = m_streamer->Read("SameFile2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  358. FileRequestPtr sameFileRequest2 = m_streamer->CreateRequest();
  359. sameFileRequest2->m_request.CreateRead(&sameFileRequest2->m_request, fakeBuffer, 8, RequestPath(), 0, 8);
  360. sameFileRequest2->m_request.m_parent = &readRequest2->m_request;
  361. sameFileRequest2->m_request.m_dependencies = 0;
  362. // Same file read, different objects
  363. EXPECT_EQ(
  364. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest2->m_request),
  365. Scheduler::Order::Equal);
  366. }
  367. } // namespace AZ::IO