SchedulerTests.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <AzCore/Casting/lossy_cast.h>
  9. #include <AzCore/IO/Streamer/Streamer.h>
  10. #include <AzCore/IO/Streamer/Scheduler.h>
  11. #include <AzCore/IO/Streamer/FileRequest.h>
  12. #include <AzCore/Task/TaskExecutor.h>
  13. #include <AzCore/std/parallel/atomic.h>
  14. #include <AzCore/std/parallel/binary_semaphore.h>
  15. #include <AzCore/std/smart_ptr/make_shared.h>
  16. #include <AzCore/std/smart_ptr/unique_ptr.h>
  17. #include <AzCore/UnitTest/TestTypes.h>
  18. #include <Tests/FileIOBaseTestTypes.h>
  19. #include <Tests/Streamer/IStreamerTypesMock.h>
  20. #include <Tests/Streamer/StreamStackEntryMock.h>
  21. namespace AZ::IO
  22. {
  23. class Streamer_SchedulerTest
  24. : public UnitTest::LeakDetectionFixture
  25. , public UnitTest::SetRestoreFileIOBaseRAII
  26. {
  27. protected:
  28. StreamerContext* m_streamerContext{ nullptr };
  29. public:
  30. Streamer_SchedulerTest()
  31. : UnitTest::SetRestoreFileIOBaseRAII(m_fileIoBase)
  32. {
  33. }
  34. void SetUp() override
  35. {
  36. using ::testing::_;
  37. using ::testing::AnyNumber;
  38. UnitTest::LeakDetectionFixture::SetUp();
  39. TaskExecutor::SetInstance(&m_taskExecutor);
  40. // a regular mock warns every time functions are called without being expected
  41. // a NiceMock only pays attention to calls you tell it to pay attention to.
  42. m_mock = AZStd::make_shared<testing::NiceMock<StreamStackEntryMock>>();
  43. ON_CALL(*m_mock, PrepareRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardPrepareRequest(request); });
  44. ON_CALL(*m_mock, QueueRequest(_)).WillByDefault([this](FileRequest* request) { m_mock->ForwardQueueRequest(request); });
  45. ON_CALL(*m_mock, UpdateStatus(_)).WillByDefault([this](StreamStackEntry::Status& status)
  46. {
  47. status.m_numAvailableSlots = 1;
  48. status.m_isIdle = m_isStackIdle;
  49. });
  50. // Expectation needs to be set before the Scheduler thread is started otherwise it may or may not hit before it's set in
  51. // a test.
  52. EXPECT_CALL(*m_mock, SetContext(_))
  53. .Times(1)
  54. .WillOnce([this](StreamerContext& context)
  55. {
  56. m_streamerContext = &context;
  57. m_mock->ForwardSetContext(context);
  58. });
  59. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  60. auto isIdle = m_isStackIdle.load();
  61. m_isStackIdle = true;
  62. m_streamer = aznew IO::Streamer(AZStd::thread_desc{}, AZStd::make_unique<Scheduler>(m_mock));
  63. m_isStackIdle = isIdle;
  64. Interface<IO::IStreamer>::Register(m_streamer);
  65. }
  66. void TearDown() override
  67. {
  68. m_isStackIdle = true;
  69. if (m_streamer)
  70. {
  71. Interface<IO::IStreamer>::Unregister(m_streamer);
  72. delete m_streamer;
  73. m_streamer = nullptr;
  74. }
  75. m_mock.reset();
  76. TaskExecutor::SetInstance(nullptr);
  77. UnitTest::LeakDetectionFixture::TearDown();
  78. }
  79. void MockForRead()
  80. {
  81. using ::testing::_;
  82. using ::testing::AtLeast;
  83. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  84. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  85. EXPECT_CALL(*m_mock, PrepareRequest(_))
  86. .Times(1)
  87. .WillOnce([this](FileRequest* request)
  88. {
  89. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  90. auto readData = AZStd::get_if<Requests::ReadRequestData>(&request->GetCommand());
  91. AZ_Assert(readData, "Test didn't pass in the correct request.");
  92. FileRequest* read = m_streamerContext->GetNewInternalRequest();
  93. read->CreateRead(request, readData->m_output, readData->m_outputSize, readData->m_path,
  94. readData->m_offset, readData->m_size);
  95. m_streamerContext->PushPreparedRequest(read);
  96. });
  97. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  98. EXPECT_CALL(*m_mock, QueueRequest(_))
  99. .Times(1)
  100. .WillOnce([this](FileRequest* request)
  101. {
  102. AZ_Assert(m_streamerContext, "AZ::IO::Streamer is not ready to process requests.");
  103. auto readData = AZStd::get_if<Requests::ReadData>(&request->GetCommand());
  104. AZ_Assert(readData, "Test didn't pass in the correct request.");
  105. auto output = reinterpret_cast<uint8_t*>(readData->m_output);
  106. AZ_Assert(output != nullptr, "Output buffer has not been set.");
  107. for (size_t i = 0; i < readData->m_size; ++i)
  108. {
  109. output[i] = azlossy_cast<uint8_t>(readData->m_offset + i);
  110. }
  111. request->SetStatus(IStreamerTypes::RequestStatus::Completed);
  112. m_streamerContext->MarkRequestAsCompleted(request);
  113. });
  114. }
  115. void MockAllocatorForUnclaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock, AZStd::binary_semaphore& sync)
  116. {
  117. using ::testing::_;
  118. EXPECT_CALL(mock, LockAllocator()).Times(1);
  119. EXPECT_CALL(mock, UnlockAllocator())
  120. .Times(1)
  121. .WillOnce([&sync]()
  122. {
  123. sync.release();
  124. });
  125. EXPECT_CALL(mock, Allocate(_, _, _))
  126. .Times(1)
  127. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  128. {
  129. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  130. });
  131. EXPECT_CALL(mock, Release(_))
  132. .Times(1)
  133. .WillOnce([&mock](void* address)
  134. {
  135. mock.ForwardRelease(address);
  136. });
  137. }
  138. void MockAllocatorForClaimedMemory(IStreamerTypes::RequestMemoryAllocatorMock& mock)
  139. {
  140. using ::testing::_;
  141. EXPECT_CALL(mock, LockAllocator()).Times(1);
  142. EXPECT_CALL(mock, UnlockAllocator()).Times(1);
  143. EXPECT_CALL(mock, Allocate(_, _, _))
  144. .Times(1)
  145. .WillOnce([&mock](size_t minimalSize, size_t recommendedSize, size_t alignment)
  146. {
  147. return mock.ForwardAllocate(minimalSize, recommendedSize, alignment);
  148. });
  149. EXPECT_CALL(mock, Release(_)).Times(0);
  150. }
  151. protected:
  152. // Using Streamer to interact with the Scheduler as not all functionality
  153. // is publicly exposed. Since Streamer is mostly the threaded front end for
  154. // the Scheduler, this is fine.
  155. UnitTest::TestFileIOBase m_fileIoBase;
  156. Streamer* m_streamer{ nullptr };
  157. AZStd::shared_ptr<testing::NiceMock<StreamStackEntryMock>> m_mock;
  158. AZStd::atomic_bool m_isStackIdle = false;
  159. TaskExecutor m_taskExecutor;
  160. };
  161. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedFireAndForgetReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  162. {
  163. using ::testing::_;
  164. using ::testing::AtLeast;
  165. using ::testing::Return;
  166. MockForRead();
  167. AZStd::binary_semaphore allocatorSync;
  168. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  169. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  170. AZStd::binary_semaphore readSync;
  171. auto wait = [&readSync](FileRequestHandle)
  172. {
  173. readSync.release();
  174. };
  175. // Scoped to simulate a fire-and-forget request which should cause the request to be deleted after
  176. // completion and free the allocated memory.
  177. {
  178. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  179. m_streamer->SetRequestCompleteCallback(read, wait);
  180. m_streamer->QueueRequest(read);
  181. }
  182. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  183. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  184. }
  185. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueUnclaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  186. {
  187. using ::testing::_;
  188. using ::testing::AtLeast;
  189. using ::testing::Return;
  190. MockForRead();
  191. AZStd::binary_semaphore allocatorSync;
  192. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  193. MockAllocatorForUnclaimedMemory(allocatorMock, allocatorSync);
  194. AZStd::binary_semaphore readSync;
  195. auto wait = [&readSync](FileRequestHandle)
  196. {
  197. readSync.release();
  198. };
  199. // Scoped so the request goes out to scope before ending the test. This should trigger the
  200. // memory release on this thread.
  201. {
  202. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  203. m_streamer->SetRequestCompleteCallback(read, wait);
  204. m_streamer->QueueRequest(read);
  205. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  206. }
  207. ASSERT_TRUE(allocatorSync.try_acquire_for(AZStd::chrono::seconds(5)));
  208. }
  209. TEST_F(Streamer_SchedulerTest, QueueNextRequest_QueueClaimedReadWithAllocator_AllocatorCalledAndMemoryFreedAgain)
  210. {
  211. using ::testing::_;
  212. using ::testing::AtLeast;
  213. using ::testing::Return;
  214. MockForRead();
  215. IStreamerTypes::RequestMemoryAllocatorMock allocatorMock;
  216. MockAllocatorForClaimedMemory(allocatorMock);
  217. AZStd::binary_semaphore readSync;
  218. auto wait = [&readSync](FileRequestHandle)
  219. {
  220. readSync.release();
  221. };
  222. FileRequestPtr read = m_streamer->Read("TestPath", allocatorMock, 8);
  223. m_streamer->SetRequestCompleteCallback(read, wait);
  224. m_streamer->QueueRequest(read);
  225. ASSERT_TRUE(readSync.try_acquire_for(AZStd::chrono::seconds(5)));
  226. void* buffer = nullptr;
  227. u64 readSize = 0;
  228. EXPECT_TRUE(m_streamer->GetReadRequestResult(read, buffer, readSize, IStreamerTypes::ClaimMemory::Yes));
  229. ASSERT_NE(nullptr, buffer);
  230. allocatorMock.ForwardRelease(buffer);
  231. }
  232. TEST_F(Streamer_SchedulerTest, ProcessCancelRequest_CancelReadRequest_MockDoesNotReceiveReadRequest)
  233. {
  234. using ::testing::_;
  235. using ::testing::AtLeast;
  236. using ::testing::Return;
  237. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  238. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  239. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  240. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  241. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1);
  242. AZStd::atomic_int counter = 2;
  243. AZStd::binary_semaphore sync;
  244. auto wait = [&sync, &counter](FileRequestHandle)
  245. {
  246. if (--counter == 0)
  247. {
  248. sync.release();
  249. }
  250. };
  251. char fakeBuffer[8];
  252. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8);
  253. FileRequestPtr cancel = m_streamer->Cancel(read);
  254. m_streamer->SetRequestCompleteCallback(read, wait);
  255. m_streamer->SetRequestCompleteCallback(cancel, wait);
  256. m_streamer->SuspendProcessing();
  257. m_streamer->QueueRequest(read);
  258. m_streamer->QueueRequest(cancel);
  259. m_streamer->ResumeProcessing();
  260. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  261. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(cancel));
  262. EXPECT_EQ(IStreamerTypes::RequestStatus::Canceled, m_streamer->GetRequestStatus(read));
  263. }
  264. TEST_F(Streamer_SchedulerTest, Reschedule_SetNewDeadlineAndPriority_ReadRequestInMockHasUpdatedTime)
  265. {
  266. using ::testing::_;
  267. using ::testing::AtLeast;
  268. using ::testing::Invoke;
  269. using ::testing::Return;
  270. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AtLeast(1));
  271. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AtLeast(1));
  272. EXPECT_CALL(*m_mock, PrepareRequest(_)).Times(AtLeast(1));
  273. EXPECT_CALL(*m_mock, ExecuteRequests()).Times(AtLeast(1));
  274. EXPECT_CALL(*m_mock, QueueRequest(_)).Times(1)
  275. .WillOnce(Invoke([this](FileRequest* request)
  276. {
  277. auto* read = request->GetCommandFromChain<Requests::ReadRequestData>();
  278. ASSERT_NE(nullptr, read);
  279. EXPECT_LT(read->m_deadline, FileRequest::s_noDeadlineTime);
  280. EXPECT_EQ(read->m_priority, IStreamerTypes::s_priorityHighest);
  281. m_mock->ForwardQueueRequest(request);
  282. }));
  283. AZStd::atomic_int counter = 2;
  284. AZStd::binary_semaphore sync;
  285. auto wait = [&sync, &counter](FileRequestHandle)
  286. {
  287. if (--counter == 0)
  288. {
  289. sync.release();
  290. }
  291. };
  292. char fakeBuffer[8];
  293. FileRequestPtr read = m_streamer->Read("TestPath", fakeBuffer, sizeof(fakeBuffer), 8,
  294. IStreamerTypes::s_noDeadline, IStreamerTypes::s_priorityMedium);
  295. FileRequestPtr reschedule = m_streamer->RescheduleRequest(read, IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityHighest);
  296. m_streamer->SetRequestCompleteCallback(read, wait);
  297. m_streamer->SetRequestCompleteCallback(reschedule, wait);
  298. m_streamer->SuspendProcessing();
  299. m_streamer->QueueRequest(read);
  300. m_streamer->QueueRequest(reschedule);
  301. m_streamer->ResumeProcessing();
  302. ASSERT_TRUE(sync.try_acquire_for(AZStd::chrono::seconds(5)));
  303. EXPECT_EQ(IStreamerTypes::RequestStatus::Completed, m_streamer->GetRequestStatus(reschedule));
  304. }
  305. TEST_F(Streamer_SchedulerTest, ProcessTillIdle_ShutDownIsDelayedUntilIdle_SchedulerThreadDoesNotImmediatelyShutDown)
  306. {
  307. using::testing::_;
  308. using ::testing::AnyNumber;
  309. using ::testing::Invoke;
  310. constexpr static size_t Iterations = 16;
  311. AZStd::atomic_int counter{ 0 };
  312. EXPECT_CALL(*m_mock, UpdateStatus(_)).Times(AnyNumber());
  313. EXPECT_CALL(*m_mock, UpdateCompletionEstimates(_, _, _, _)).Times(AnyNumber());
  314. // Pretend to be busy [Iterations] times, then set the status to idle so the Scheduler thread can exit.
  315. EXPECT_CALL(*m_mock, ExecuteRequests())
  316. .Times(Iterations + 1)
  317. .WillRepeatedly(Invoke([this, &counter]()
  318. {
  319. if (counter++ >= Iterations)
  320. {
  321. m_isStackIdle = true;
  322. return false;
  323. }
  324. else
  325. {
  326. AZStd::this_thread::sleep_for(AZStd::chrono::milliseconds(1));
  327. return true;
  328. }
  329. }));
  330. if (m_streamer)
  331. {
  332. Interface<IO::IStreamer>::Unregister(m_streamer);
  333. delete m_streamer;
  334. m_streamer = nullptr;
  335. }
  336. EXPECT_EQ(Iterations + 1, counter);
  337. }
  338. TEST_F(Streamer_SchedulerTest, RequestSorting)
  339. {
  340. //////////////////////////////////////////////////////////////
  341. // Test equal priority requests that are past their deadlines (aka panic)
  342. //////////////////////////////////////////////////////////////
  343. IStreamerTypes::Deadline panicDeadline(IStreamerTypes::Deadline::min());
  344. auto estimatedCompleteTime = AZStd::chrono::steady_clock::now();
  345. char fakeBuffer[8];
  346. FileRequestPtr panicRequest = m_streamer->Read("PanicRequest", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  347. panicRequest->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  348. // Passed deadline, same object (same pointer)
  349. EXPECT_EQ(
  350. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest->m_request),
  351. Scheduler::Order::Equal);
  352. // Passed deadline, different object
  353. FileRequestPtr panicRequest2 = m_streamer->Read("PanicRequest2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  354. panicRequest2->m_request.SetEstimatedCompletion(estimatedCompleteTime);
  355. EXPECT_EQ(
  356. m_streamer->m_streamStack->Thread_PrioritizeRequests(&panicRequest->m_request, &panicRequest2->m_request),
  357. Scheduler::Order::Equal);
  358. //////////////////////////////////////////////////////////////
  359. // Test equal priority requests that are both reading the same file
  360. //////////////////////////////////////////////////////////////
  361. RequestPath emptyPath;
  362. FileRequestPtr readRequest = m_streamer->Read("SameFile", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  363. FileRequestPtr sameFileRequest = m_streamer->CreateRequest();
  364. sameFileRequest->m_request.CreateRead(&sameFileRequest->m_request, fakeBuffer, 8, emptyPath, 0, 8);
  365. sameFileRequest->m_request.m_parent = &readRequest->m_request;
  366. sameFileRequest->m_request.m_dependencies = 0;
  367. // Same file read, same object (same pointer)
  368. EXPECT_EQ(
  369. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest->m_request),
  370. Scheduler::Order::Equal);
  371. FileRequestPtr readRequest2 = m_streamer->Read("SameFile2", fakeBuffer, sizeof(fakeBuffer), 8, panicDeadline);
  372. FileRequestPtr sameFileRequest2 = m_streamer->CreateRequest();
  373. sameFileRequest2->m_request.CreateRead(&sameFileRequest2->m_request, fakeBuffer, 8, emptyPath, 0, 8);
  374. sameFileRequest2->m_request.m_parent = &readRequest2->m_request;
  375. sameFileRequest2->m_request.m_dependencies = 0;
  376. // Same file read, different objects
  377. EXPECT_EQ(
  378. m_streamer->m_streamStack->Thread_PrioritizeRequests(&sameFileRequest->m_request, &sameFileRequest2->m_request),
  379. Scheduler::Order::Equal);
  380. }
  381. } // namespace AZ::IO