SchedulerTests.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <AzCore/Casting/lossy_cast.h>
  9. #include <AzCore/IO/Streamer/Streamer.h>
  10. #include <AzCore/IO/Streamer/Scheduler.h>
  11. #include <AzCore/IO/Streamer/FileRequest.h>
  12. #include <AzCore/std/parallel/atomic.h>
  13. #include <AzCore/std/parallel/binary_semaphore.h>
  14. #include <AzCore/std/smart_ptr/make_shared.h>
  15. #include <AzCore/std/smart_ptr/unique_ptr.h>
  16. #include <AzCore/UnitTest/TestTypes.h>
  17. #include <Tests/Streamer/IStreamerTypesMock.h>
  18. #include <Tests/Streamer/StreamStackEntryMock.h>
  19. namespace AZ::IO
  20. {
  21. class Streamer_SchedulerTest
  22. : public UnitTest::LeakDetectionFixture
  23. {
  24. protected:
  25. StreamerContext* m_streamerContext{ nullptr };
  26. public:
  27. void SetUp() override
  28. {
  29. using ::testing::_;
  30. using ::testing::AnyNumber;
  31. UnitTest::LeakDetectionFixture::SetUp();
  32. // a regular mock warns every time functions are called without being expected
  33. // a NiceMock only pays attention to calls you tell it to pay attention to.
  34. m_mock = AZStd::make_shared<testing::NiceMock<StreamStackEntryMock>>();
  35. ON_CALL(*m_mock, PrepareRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardPrepareRequest(request); });
  36. ON_CALL(*m_mock, QueueRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardQueueRequest(request); });
  37. ON_CALL(*m_mock, UpdateStatus(_)).WillByDefault([this](StreamStackEntry::Status& status)
  38. {
  39. status.m_numAvailableSlots = 1;
  40. status.m_isIdle = m_isStackIdle;
  41. });
  42. // Expectation needs to be set before the Scheduler thread is started otherwise it may or may not hit before it's set in
  43. // a test.
  44. EXPECT_CALL(*m_mock, SetContext(_))
  45. .Times(1)
  46. .WillOnce([this](StreamerContext& context)
  47. {
  48. m_streamerContext = &context;
  49. m_mock->ForwardSetContext(context);
  50. });
  51. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  52. auto isIdle = m_isStackIdle.load();
  53. m_isStackIdle = true;
  54. m_streamer = aznew IO::Streamer(AZStd::thread_desc{}, AZStd::make_unique<Scheduler>(m_mock));
  55. m_isStackIdle = isIdle;
  56. Interface<IO::IStreamer>::Register(m_streamer);
  57. }
  58. void TearDown() override
  59. {
  60. m_isStackIdle = true;
  61. if (m_streamer)
  62. {
  63. Interface<IO::IStreamer>::Unregister(m_streamer);
  64. delete m_streamer;
  65. m_streamer = nullptr;
  66. }
  67. m_mock.reset();
  68. UnitTest::LeakDetectionFixture::TearDown();
  69. }
  70. void MockForRead()
  71. {
  72. using ::testing::_;
  73. using ::testing::AtLeast;
  74. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  75. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  76. EXPECT_CALL(*m_mock, PrepareRequest(_))
  77. .Times(1)
  78. .WillOnce([this](FileRequest* request)
  79. {
  80. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  81. auto readData = AZStd::get_if<Requests::ReadRequestData>(&request->GetCommand());
  82. AZ_Assert(readData, "Test didn't pass in the correct request.");
  83. FileRequest* read = m_streamerContext->GetNewInternalRequest();
  84. read->CreateRead(request, readData->m_output, readData->m_outputSize, readData->m_path,
  85. readData->m_offset, readData->m_size);
  86. m_streamerContext->PushPreparedRequest(read);
  87. });
  88. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  89. EXPECT_CALL(*m_mock, QueueRequest(_))
  90. .Times(1)
  91. .WillOnce([this](FileRequest* request)
  92. {
  93. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  94. auto readData = AZStd::get_if<Requests::ReadData>(&request->GetCommand());
  95. AZ_Assert(readData, "Test didn't pass in the correct request.");
  96. auto output = reinterpret_cast<uint8_t*>(readData->m_output);
  97. AZ_Assert(output != nullptr, "Output buffer has not been set.");
  98. for (size_t i = 0; i < readData->m_size; ++i)
  99. {
  100. output[i] = azlossy_cast<uint8_t>(readData->m_offset + i);
  101. }
  102. request->SetStatus(IStreamerTypes::RequestStatus::Completed);
  103. m_streamerContext->MarkRequestAsCompleted(request);
  104. });
  105. }
  106. void MockAllocatorForUnclaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock, AZStd::binary_semaphore& sync)
  107. {
  108. using ::testing::_;
  109. EXPECT_CALL(mock, LockAllocator()).Times(1);
  110. EXPECT_CALL(mock, UnlockAllocator())
  111. .Times(1)
  112. .WillOnce([&sync]()
  113. {
  114. sync.release();
  115. });
  116. EXPECT_CALL(mock, Allocate(_, _, _))
  117. .Times(1)
  118. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  119. {
  120. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  121. });
  122. EXPECT_CALL(mock, Release(_))
  123. .Times(1)
  124. .WillOnce([&mock](void* address)
  125. {
  126. mock.ForwardRelease(address);
  127. });
  128. }
  129. void MockAllocatorForClaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock)
  130. {
  131. using ::testing::_;
  132. EXPECT_CALL(mock, LockAllocator()).Times(1);
  133. EXPECT_CALL(mock, UnlockAllocator()).Times(1);
  134. EXPECT_CALL(mock, Allocate(_, _, _))
  135. .Times(1)
  136. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  137. {
  138. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  139. });
  140. EXPECT_CALL(mock, Release(_)).Times(0);
  141. }
  142. protected:
  143. // Using Streamer to interact with the Scheduler as not all functionality
  144. // is publicly exposed. Since Streamer is mostly the threaded front end for
  145. // the Scheduler, this is fine.
  146. Streamer* m_streamer{ nullptr };
  147. AZStd::shared_ptr<testing::NiceMock<StreamStackEntryMock>> m_mock;
  148. AZStd::atomic_bool m_isStackIdle = false;
  149. };
  150. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedFireAndForgetReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  151. {
  152. using ::testing::_;
  153. using ::testing::AtLeast;
  154. using ::testing::Return;
  155. MockForRead();
  156. AZStd::binary_semaphore allocatorSync;
  157. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  158. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  159. AZStd::binary_semaphore readSync;
  160. auto wait = [&readSync](FileRequestHandle)
  161. {
  162. readSync.release();
  163. };
  164. // Scoped to simulate a fire-and-forget request which should cause the request to be deleted after
  165. // completion and free the allocated memory.
  166. {
  167. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  168. m_streamer->SetRequestCompleteCallback(read, wait);
  169. m_streamer->QueueRequest(read);
  170. }
  171. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  172. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  173. }
  174. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  175. {
  176. using ::testing::_;
  177. using ::testing::AtLeast;
  178. using ::testing::Return;
  179. MockForRead();
  180. AZStd::binary_semaphore allocatorSync;
  181. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  182. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  183. AZStd::binary_semaphore readSync;
  184. auto wait = [&readSync](FileRequestHandle)
  185. {
  186. readSync.release();
  187. };
  188. // Scoped so the request goes out to scope before ending the test. This should trigger the
  189. // memory release on this thread.
  190. {
  191. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  192. m_streamer->SetRequestCompleteCallback(read, wait);
  193. m_streamer->QueueRequest(read);
  194. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  195. }
  196. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  197. }
  198. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueClaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  199. {
  200. using ::testing::_;
  201. using ::testing::AtLeast;
  202. using ::testing::Return;
  203. MockForRead();
  204. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  205. MockAllocatorForClaimedMemory(allocatorMock);
  206. AZStd::binary_semaphore readSync;
  207. auto wait = [&readSync](FileRequestHandle)
  208. {
  209. readSync.release();
  210. };
  211. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  212. m_streamer->SetRequestCompleteCallback(read, wait);
  213. m_streamer->QueueRequest(read);
  214. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  215. void* buffer = nullptr;
  216. u64 readSize = 0;
  217. EXPECT_TRUE(m_streamer->GetReadRequestResult(read, buffer, readSize, IStreamerTypes::ClaimMemory::Yes));
  218. ASSERT_NE(nullptr, buffer);
  219. allocatorMock.ForwardRelease(buffer);
  220. }
  221. TEST_F(Streamer_SchedulerTest, ProcessCancelRequest_CancelReadRequest_MockDoesNotReceiveReadRequest)
  222. {
  223. using ::testing::_;
  224. using ::testing::AtLeast;
  225. using ::testing::Return;
  226. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  227. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  228. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  229. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  230. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1);
  231. AZStd::atomic_int counter = 2;
  232. AZStd::binary_semaphore sync;
  233. auto wait = [&sync, &counter](FileRequestHandle)
  234. {
  235. if (--counter == 0)
  236. {
  237. sync.release();
  238. }
  239. };
  240. char fakeBuffer[8];
  241. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8);
  242. FileRequestPtr cancel = m_streamer->Cancel(read);
  243. m_streamer->SetRequestCompleteCallback(read, wait);
  244. m_streamer->SetRequestCompleteCallback(cancel, wait);
  245. m_streamer->SuspendProcessing();
  246. m_streamer->QueueRequest(read);
  247. m_streamer->QueueRequest(cancel);
  248. m_streamer->ResumeProcessing();
  249. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  250. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(cancel));
  251. EXPECT_EQ(IStreamerTypes::RequestStatus::Canceled, m_streamer->GetRequestStatus(read));
  252. }
  253. TEST_F(Streamer_SchedulerTest, Reschedule_SetNewDeadlineAndPriority_ReadRequestInMockHasUpdatedTime)
  254. {
  255. using ::testing::_;
  256. using ::testing::AtLeast;
  257. using ::testing::Invoke;
  258. using ::testing::Return;
  259. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  260. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  261. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  262. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  263. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1)
  264. .WillOnce(Invoke([this](FileRequest* request)
  265. {
  266. auto* read = request->GetCommandFromChain<Requests::ReadRequestData>();
  267. ASSERT_NE(nullptr, read);
  268. EXPECT_LT(read->m_deadline, FileRequest::s_noDeadlineTime);
  269. EXPECT_EQ(read->m_priority, IStreamerTypes::s_priorityHighest);
  270. m_mock->ForwardQueueRequest(request);
  271. }));
  272. AZStd::atomic_int counter = 2;
  273. AZStd::binary_semaphore sync;
  274. auto wait = [&sync, &counter](FileRequestHandle)
  275. {
  276. if (--counter == 0)
  277. {
  278. sync.release();
  279. }
  280. };
  281. char fakeBuffer[8];
  282. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8,
  283. IStreamerTypes::s_noDeadline, IStreamerTypes::s_priorityMedium);
  284. FileRequestPtr reschedule = m_streamer->RescheduleRequest(read, IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityHighest);
  285. m_streamer->SetRequestCompleteCallback(read, wait);
  286. m_streamer->SetRequestCompleteCallback(reschedule, wait);
  287. m_streamer->SuspendProcessing();
  288. m_streamer->QueueRequest(read);
  289. m_streamer->QueueRequest(reschedule);
  290. m_streamer->ResumeProcessing();
  291. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  292. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(reschedule));
  293. }
  294. TEST_F(Streamer_SchedulerTest, ProcessTillIdle_ShutDownIsDelayedUntilIdle_SchedulerThreadDoesNotImmediatelyShutDown)
  295. {
  296. using::testing::_;
  297. using ::testing::AnyNumber;
  298. using ::testing::Invoke;
  299. constexpr static size_t Iterations = 16;
  300. AZStd::atomic_int counter{ 0 };
  301. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  302. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AnyNumber());
  303. // Pretend to be busy [Iterations] times, then set the status to idle so the Scheduler thread can exit.
  304. EXPECT_CALL(*m_mock, ExecuteRequests())
  305. .Times(Iterations + 1)
  306. .WillRepeatedly(Invoke([this, &counter]()
  307. {
  308. if (counter++ >= Iterations)
  309. {
  310. m_isStackIdle = true;
  311. return false;
  312. }
  313. else
  314. {
  315. AZStd::this_thread::sleep_for(AZStd::chrono::milliseconds(1));
  316. return true;
  317. }
  318. }));
  319. if (m_streamer)
  320. {
  321. Interface<IO::IStreamer>::Unregister(m_streamer);
  322. delete m_streamer;
  323. m_streamer = nullptr;
  324. }
  325. EXPECT_EQ(Iterations + 1, counter);
  326. }
  327. TEST_F(Streamer_SchedulerTest, RequestSorting)
  328. {
  329. //////////////////////////////////////////////////////////////
  330. // Test equal priority requests that are past their deadlines (aka panic)
  331. //////////////////////////////////////////////////////////////
  332. IStreamerTypes::Deadline panicDeadline(IStreamerTypes::Deadline::min());
  333. auto estimatedCompleteTime = AZStd::chrono::steady_clock::now();
  334. char fakeBuffer[8];
  335. FileRequestPtr panicRequest = m_streamer->Read("PanicRequest", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  336. panicRequest->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  337. // Passed deadline, same object (same pointer)
  338. EXPECT_EQ(
  339. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest->m_request),
  340. Scheduler::Order::Equal);
  341. // Passed deadline, different object
  342. FileRequestPtr panicRequest2 = m_streamer->Read("PanicRequest2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  343. panicRequest2->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  344. EXPECT_EQ(
  345. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest2->m_request),
  346. Scheduler::Order::Equal);
  347. //////////////////////////////////////////////////////////////
  348. // Test equal priority requests that are both reading the same file
  349. //////////////////////////////////////////////////////////////
  350. RequestPath emptyPath;
  351. FileRequestPtr readRequest = m_streamer->Read("SameFile", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  352. FileRequestPtr sameFileRequest = m_streamer->CreateRequest();
  353. sameFileRequest->m_request.CreateRead(&sameFileRequest->m_request, fakeBuffer, 8, emptyPath, 0, 8);
  354. sameFileRequest->m_request.m_parent = &readRequest->m_request;
  355. sameFileRequest->m_request.m_dependencies = 0;
  356. // Same file read, same object (same pointer)
  357. EXPECT_EQ(
  358. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest->m_request),
  359. Scheduler::Order::Equal);
  360. FileRequestPtr readRequest2 = m_streamer->Read("SameFile2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  361. FileRequestPtr sameFileRequest2 = m_streamer->CreateRequest();
  362. sameFileRequest2->m_request.CreateRead(&sameFileRequest2->m_request, fakeBuffer, 8, emptyPath, 0, 8);
  363. sameFileRequest2->m_request.m_parent = &readRequest2->m_request;
  364. sameFileRequest2->m_request.m_dependencies = 0;
  365. // Same file read, different objects
  366. EXPECT_EQ(
  367. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest2->m_request),
  368. Scheduler::Order::Equal);
  369. }
  370. } // namespace AZ::IO