/*
 * Copyright (c) Contributors to the Open 3D Engine Project.
 * For complete copyright and license terms please see the LICENSE at the root of this distribution.
 *
 * SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 */
#include <AzCore/UnitTest/TestTypes.h>
#include <FileIOBaseTestTypes.h>
#include <AzCore/IO/CompressionBus.h>
#include <AzCore/IO/FileIO.h>
#include <AzCore/IO/Streamer/FileRequest.h>
#include <AzCore/IO/Streamer/Streamer.h>
#include <AzCore/IO/Streamer/StreamerComponent.h>
#include <AzCore/IO/Streamer/StreamerConfiguration.h>
#include <AzCore/std/containers/vector.h>
#include <AzCore/std/parallel/atomic.h>
#include <AzCore/std/parallel/binary_semaphore.h>
#include <AzCore/std/parallel/thread.h>
#include <AzCore/std/smart_ptr/unique_ptr.h>
#include <AzCore/std/string/string.h>
#include <AzCore/Task/TaskExecutor.h>
#include <AzCore/Task/TaskGraphSystemComponent.h>
#include <AzTest/GemTestEnvironment.h>
  24. namespace AZ::IO
  25. {
  26. namespace Utils
  27. {
  28. //! Create a test file that stores 4 byte integers starting at 0 and incrementing.
  29. //! @filename The name of the file to write to.
  30. //! @filesize The size the new file needs to be in bytes. The stored values will continue till fileSize / 4.
  31. //! @paddingSize The amount of data to insert before and after the file. In total paddingSize / 4 integers
  32. //! will be added. The prefix will be marked with "0xdeadbeef" and the postfix with "0xd15ea5ed".
  33. static void CreateTestFile(const AZStd::string& name, size_t fileSize, size_t paddingSize)
  34. {
  35. constexpr size_t bufferByteSize = 1_mib;
  36. constexpr size_t bufferSize = bufferByteSize / sizeof(u32);
  37. u32* buffer = new u32[bufferSize];
  38. AZ_Assert(paddingSize < bufferByteSize, "Padding can't currently be larger than %i bytes.", bufferByteSize);
  39. size_t paddingCount = paddingSize / sizeof(u32);
  40. FileIOStream stream(name.c_str(), OpenMode::ModeWrite | OpenMode::ModeBinary);
  41. // Write pre-padding
  42. for (size_t i = 0; i < paddingCount; ++i)
  43. {
  44. buffer[i] = 0xdeadbeef;
  45. }
  46. stream.Write(paddingSize, buffer);
  47. // Write content
  48. u32 startIndex = 0;
  49. while (fileSize > bufferByteSize)
  50. {
  51. for (u32 i = 0; i < bufferSize; ++i)
  52. {
  53. buffer[i] = startIndex + i;
  54. }
  55. startIndex += bufferSize;
  56. stream.Write(bufferByteSize, buffer);
  57. fileSize -= bufferByteSize;
  58. }
  59. for (u32 i = 0; i < bufferSize; ++i)
  60. {
  61. buffer[i] = startIndex + i;
  62. }
  63. stream.Write(fileSize, buffer);
  64. // Write post-padding
  65. for (size_t i = 0; i < paddingCount; ++i)
  66. {
  67. buffer[i] = 0xd15ea5ed;
  68. }
  69. stream.Write(paddingSize, buffer);
  70. delete[] buffer;
  71. }
  72. }
  73. struct DedicatedCache_Uncompressed {};
  74. struct GlobalCache_Uncompressed {};
  75. struct DedicatedCache_Compressed {};
  76. struct GlobalCache_Compressed {};
  77. enum class PadArchive : bool
  78. {
  79. Yes,
  80. No
  81. };
  82. class MockFileBase
  83. {
  84. public:
  85. virtual ~MockFileBase() = default;
  86. virtual void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive padding) = 0;
  87. virtual AZ::IO::PathView GetFileName() const = 0;
  88. };
  89. class MockUncompressedFile
  90. : public MockFileBase
  91. {
  92. public:
  93. ~MockUncompressedFile() override
  94. {
  95. if (m_hasFile)
  96. {
  97. FileIOBase::GetInstance()->DestroyPath(m_filePath.c_str());
  98. }
  99. }
  100. void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive) override
  101. {
  102. m_fileSize = fileSize;
  103. m_filePath = filePath;
  104. Utils::CreateTestFile(m_filePath.Native(), m_fileSize, 0);
  105. m_hasFile = true;
  106. }
  107. AZ::IO::PathView GetFileName() const override
  108. {
  109. return m_filePath;
  110. }
  111. private:
  112. AZ::IO::Path m_filePath;
  113. size_t m_fileSize = 0;
  114. bool m_hasFile = false;
  115. };
  116. class MockCompressedFile
  117. : public MockFileBase
  118. , public CompressionBus::Handler
  119. {
  120. public:
  121. static constexpr uint32_t s_tag = static_cast<uint32_t>('T') << 24 | static_cast<uint32_t>('E') << 16 | static_cast<uint32_t>('S') << 8 | static_cast<uint32_t>('T');
  122. static constexpr uint32_t s_paddingSize = 512; // Use this amount of bytes before and after a generated file as padding.
  123. ~MockCompressedFile() override
  124. {
  125. if (m_hasFile)
  126. {
  127. BusDisconnect();
  128. FileIOBase::GetInstance()->DestroyPath(m_filePath.c_str());
  129. }
  130. }
  131. void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive padding) override
  132. {
  133. m_fileSize = fileSize;
  134. m_filePath = filePath;
  135. m_hasPadding = (padding == PadArchive::Yes);
  136. uint32_t paddingSize = s_paddingSize;
  137. Utils::CreateTestFile(m_filePath.Native(), m_fileSize / 2, m_hasPadding ? paddingSize : 0);
  138. m_hasFile = true;
  139. BusConnect();
  140. }
  141. AZ::IO::PathView GetFileName() const override
  142. {
  143. return m_filePath;
  144. }
  145. void Decompress(const AZ::IO::CompressionInfo& info, const void* compressed, size_t compressedSize,
  146. void* uncompressed, size_t uncompressedSize)
  147. {
  148. constexpr uint32_t tag = s_tag;
  149. ASSERT_EQ(info.m_compressionTag.m_code, tag);
  150. ASSERT_EQ(info.m_compressedSize, m_fileSize / 2);
  151. ASSERT_TRUE(info.m_isCompressed);
  152. uint32_t paddingSize = s_paddingSize;
  153. ASSERT_EQ(info.m_offset, m_hasPadding ? paddingSize : 0);
  154. ASSERT_EQ(info.m_uncompressedSize, m_fileSize);
  155. // Check the input
  156. ASSERT_EQ(compressedSize, m_fileSize / 2);
  157. const u32* values = reinterpret_cast<const u32*>(compressed);
  158. const size_t numValues = compressedSize / sizeof(u32);
  159. for (size_t i = 0; i < numValues; ++i)
  160. {
  161. EXPECT_EQ(values[i], i);
  162. }
  163. // Create the fake uncompressed data.
  164. ASSERT_EQ(uncompressedSize, m_fileSize);
  165. u32* output = reinterpret_cast<u32*>(uncompressed);
  166. size_t outputSize = uncompressedSize / sizeof(u32);
  167. for (size_t i = 0; i < outputSize; ++i)
  168. {
  169. output[i] = static_cast<u32>(i);
  170. }
  171. }
  172. //@{ CompressionBus Handler implementation.
  173. void FindCompressionInfo(bool& found, AZ::IO::CompressionInfo& info, const AZ::IO::PathView filePath) override
  174. {
  175. if (m_hasFile && m_filePath == filePath)
  176. {
  177. found = true;
  178. info.m_archiveFilename = RequestPath(m_filePath);
  179. ASSERT_TRUE(info.m_archiveFilename.IsValid());
  180. info.m_compressedSize = m_fileSize / 2;
  181. const uint32_t tag = s_tag;
  182. info.m_compressionTag.m_code = tag;
  183. info.m_isCompressed = true;
  184. uint32_t paddingSize = s_paddingSize;
  185. info.m_offset = m_hasPadding ? paddingSize : 0;
  186. info.m_uncompressedSize = m_fileSize;
  187. info.m_decompressor =
  188. [this](const AZ::IO::CompressionInfo& info, const void* compressed,
  189. size_t compressedSize, void* uncompressed, size_t uncompressedSize) -> bool
  190. {
  191. Decompress(info, compressed, compressedSize, uncompressed, uncompressedSize);
  192. return true;
  193. };
  194. }
  195. }
  196. //@}
  197. private:
  198. AZ::IO::Path m_filePath;
  199. size_t m_fileSize = 0;
  200. bool m_hasFile = false;
  201. bool m_hasPadding = false;
  202. };
  203. class GemTestApplication
  204. : public AZ::ComponentApplication
  205. {
  206. public:
  207. // ComponentApplication
  208. void SetSettingsRegistrySpecializations(SettingsRegistryInterface::Specializations& specializations) override
  209. {
  210. ComponentApplication::SetSettingsRegistrySpecializations(specializations);
  211. specializations.Append("test");
  212. specializations.Append("gemtest");
  213. }
  214. };
  215. class StreamerTestBase
  216. : public UnitTest::LeakDetectionFixture
  217. {
  218. public:
  219. void SetUp() override
  220. {
  221. LeakDetectionFixture::SetUp();
  222. m_prevFileIO = FileIOBase::GetInstance();
  223. FileIOBase::SetInstance(&m_fileIO);
  224. m_application = aznew GemTestApplication();
  225. AZ::ComponentApplication::Descriptor appDesc;
  226. appDesc.m_useExistingAllocator = true;
  227. auto m_systemEntity = m_application->Create(appDesc);
  228. m_systemEntity->AddComponent(aznew AZ::TaskGraphSystemComponent());
  229. m_systemEntity->AddComponent(aznew AZ::StreamerComponent());
  230. m_systemEntity->Init();
  231. m_systemEntity->Activate();
  232. m_streamer = Interface<IO::IStreamer>::Get();
  233. }
  234. void TearDown() override
  235. {
  236. m_streamer = nullptr;
  237. m_application->Destroy();
  238. delete m_application;
  239. m_application = nullptr;
  240. FileIOBase::SetInstance(m_prevFileIO);
  241. LeakDetectionFixture::TearDown();
  242. }
  243. //! Requests are typically completed by Streamer before it updates it's internal bookkeeping.
  244. //! If a test depends on getting status information such as if cache files have been cleared
  245. //! then call WaitForScheduler to give Steamers scheduler some time to update it's internal status.
  246. void WaitForScheduler()
  247. {
  248. AZStd::this_thread::sleep_for(AZStd::chrono::microseconds(1));
  249. }
  250. protected:
  251. virtual AZStd::unique_ptr<MockFileBase> CreateMockFile() = 0;
  252. virtual bool IsUsingArchive() const = 0;
  253. virtual bool CreateDedicatedCache() const = 0;
  254. //! Create a test file that stores 4 byte integers starting at 0 and incrementing.
  255. //! @filesize The size the new file needs to be in bytes. The stored values will continue till fileSize / 4.
  256. //! @return The name of the test file.
  257. AZStd::unique_ptr<MockFileBase> CreateTestFile(size_t fileSize, PadArchive padding)
  258. {
  259. AZStd::string name = AZStd::string::format("TestFile_%zu.test", m_testFileCount++);
  260. AZ::IO::Path testFullPath = m_tempDirectory.GetDirectory();
  261. testFullPath /= name;
  262. AZStd::unique_ptr<MockFileBase> result = CreateMockFile();
  263. result->CreateTestFile(testFullPath.c_str(), fileSize, padding);
  264. if (CreateDedicatedCache())
  265. {
  266. AZ::Interface<AZ::IO::IStreamer>::Get()->CreateDedicatedCache(name.c_str());
  267. }
  268. return result;
  269. }
  270. void VerifyTestFile(const void* buffer, size_t fileSize, size_t offset = 0)
  271. {
  272. size_t count = fileSize / sizeof(u32);
  273. size_t numOffset = offset / sizeof(u32);
  274. const u32* data = reinterpret_cast<const u32*>(buffer);
  275. for (size_t i = 0; i < count; ++i)
  276. {
  277. EXPECT_EQ(data[i], i + numOffset);
  278. }
  279. }
  280. void AssertTestFile(const void* buffer, size_t fileSize, size_t offset = 0)
  281. {
  282. size_t count = fileSize / sizeof(u32);
  283. size_t numOffset = offset / sizeof(u32);
  284. const u32* data = reinterpret_cast<const u32*>(buffer);
  285. for (size_t i = 0; i < count; ++i)
  286. {
  287. ASSERT_EQ(data[i], i + numOffset);
  288. }
  289. }
  290. void PeriodicallyCheckedRead(AZ::IO::PathView filePath, void* buffer, u64 fileSize, u64 offset, AZStd::chrono::seconds timeOut, bool& result)
  291. {
  292. AZStd::binary_semaphore sync;
  293. AZStd::atomic_bool readSuccessful = false;
  294. auto callback = [&readSuccessful, &sync](FileRequestHandle request)
  295. {
  296. auto streamer = AZ::Interface<AZ::IO::IStreamer>::Get();
  297. readSuccessful = streamer->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  298. sync.release();
  299. };
  300. FileRequestPtr request = this->m_streamer->Read(filePath.Native(), buffer, fileSize, fileSize,
  301. IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityMedium, offset);
  302. this->m_streamer->SetRequestCompleteCallback(request, AZStd::move(callback));
  303. this->m_streamer->QueueRequest(AZStd::move(request));
  304. bool hasTimedOut = !sync.try_acquire_for(timeOut);
  305. result = readSuccessful && !hasTimedOut;
  306. ASSERT_FALSE(hasTimedOut);
  307. ASSERT_TRUE(readSuccessful);
  308. }
  309. AZ::Test::ScopedAutoTempDirectory m_tempDirectory;
  310. UnitTest::TestFileIOBase m_fileIO;
  311. FileIOBase* m_prevFileIO{ nullptr };
  312. IStreamer* m_streamer{ nullptr };
  313. AZ::ComponentApplication* m_application{ nullptr };
  314. size_t m_testFileCount{ 0 };
  315. };
  316. template<typename TestTag>
  317. class StreamerTest : public StreamerTestBase
  318. {
  319. protected:
  320. bool IsUsingArchive() const override
  321. {
  322. AZ_Assert(false, "Not correctly specialized.");
  323. return false;
  324. }
  325. bool CreateDedicatedCache() const override
  326. {
  327. AZ_Assert(false, "Not correctly specialized.");
  328. return false;
  329. }
  330. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  331. {
  332. AZ_Assert(false, "Not correctly specialized.");
  333. return nullptr;
  334. }
  335. };
  336. template<>
  337. class StreamerTest<DedicatedCache_Uncompressed> : public StreamerTestBase
  338. {
  339. protected:
  340. bool IsUsingArchive() const override { return false; }
  341. bool CreateDedicatedCache() const override { return true; }
  342. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  343. {
  344. return AZStd::make_unique<MockUncompressedFile>();
  345. }
  346. };
  347. template<>
  348. class StreamerTest<GlobalCache_Uncompressed> : public StreamerTestBase
  349. {
  350. protected:
  351. bool IsUsingArchive() const override { return false; }
  352. bool CreateDedicatedCache() const override { return false; }
  353. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  354. {
  355. return AZStd::make_unique<MockUncompressedFile>();
  356. }
  357. };
  358. template<>
  359. class StreamerTest<DedicatedCache_Compressed> : public StreamerTestBase
  360. {
  361. protected:
  362. bool IsUsingArchive() const override { return true; }
  363. bool CreateDedicatedCache() const override { return true; }
  364. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  365. {
  366. return AZStd::make_unique<MockCompressedFile>();
  367. }
  368. };
  369. template<>
  370. class StreamerTest<GlobalCache_Compressed> : public StreamerTestBase
  371. {
  372. protected:
  373. bool IsUsingArchive() const override { return true; }
  374. bool CreateDedicatedCache() const override { return false; }
  375. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  376. {
  377. return AZStd::make_unique<MockCompressedFile>();
  378. }
  379. };
// The typed Streamer tests are only compiled when AZ_TRAIT_DISABLE_FAILED_STREAMER_TESTS is not set.
#if !AZ_TRAIT_DISABLE_FAILED_STREAMER_TESTS
    // Declares StreamerTest as a type-parameterized suite; its tests are defined with TYPED_TEST_P.
    TYPED_TEST_SUITE_P(StreamerTest);
  382. // Read a file that's smaller than the cache.
  383. TYPED_TEST_P(StreamerTest, Read_ReadSmallFileEntirely_FileFullyRead)
  384. {
  385. constexpr size_t fileSize = 50_kib;
  386. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  387. char buffer[fileSize];
  388. bool readResult{ false };
  389. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, fileSize, 0, AZStd::chrono::seconds(5), readResult);
  390. EXPECT_TRUE(readResult);
  391. if(readResult)
  392. {
  393. this->VerifyTestFile(buffer, fileSize);
  394. }
  395. }
  396. // Read a large file that will need to be broken into chunks.
  397. TYPED_TEST_P(StreamerTest, Read_ReadLargeFileEntirely_FileFullyRead)
  398. {
  399. constexpr size_t fileSize = 10_mib;
  400. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  401. char* buffer = new char[fileSize];
  402. bool readResult{ false };
  403. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, fileSize, 0, AZStd::chrono::seconds(500), readResult);
  404. EXPECT_TRUE(readResult);
  405. if(readResult)
  406. {
  407. this->VerifyTestFile(buffer, fileSize);
  408. }
  409. delete[] buffer;
  410. }
  411. // Reads multiple small pieces to make sure that the cache is hit, seeded and copied properly.
  412. TYPED_TEST_P(StreamerTest, Read_ReadMultiplePieces_AllReadRequestWereSuccessful)
  413. {
  414. constexpr size_t fileSize = 2_mib;
  415. // Deliberately not taking a multiple of the file size so at least one read will have a partial cache hit.
  416. #if defined(AZ_DEBUG_BUILD)
  417. constexpr size_t bufferSize = 4800;
  418. #else
  419. constexpr size_t bufferSize = 480;
  420. #endif
  421. constexpr size_t readBlock = bufferSize * sizeof(u32);
  422. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  423. u32 buffer[bufferSize];
  424. size_t block = 0;
  425. size_t fileRemainder = fileSize;
  426. for (block = 0; block < fileSize; block += readBlock)
  427. {
  428. size_t blockSize = AZStd::min(readBlock, fileRemainder);
  429. bool readResult{ false };
  430. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, blockSize, block, AZStd::chrono::seconds(5), readResult);
  431. EXPECT_TRUE(readResult);
  432. if (readResult)
  433. {
  434. this->AssertTestFile(buffer, blockSize, block);
  435. }
  436. fileRemainder -= blockSize;
  437. }
  438. }
  439. // Same as the previous test, but all requests are submitted in a single batch.
  440. TYPED_TEST_P(StreamerTest, Read_ReadMultiplePiecesWithBatch_AllReadRequestWereSuccessful)
  441. {
  442. constexpr size_t fileSize = 2_mib;
  443. // Deliberately not taking a multiple of the file size so at least one read will have a partial cache hit.
  444. #if defined(AZ_DEBUG_BUILD)
  445. constexpr size_t bufferSize = 4800 * sizeof(u32);
  446. #else
  447. constexpr size_t bufferSize = 480 * sizeof(u32);
  448. #endif
  449. constexpr size_t numRequests = (fileSize / bufferSize) + 1;
  450. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  451. AZStd::vector<FileRequestPtr> requests;
  452. this->m_streamer->CreateRequestBatch(requests, numRequests);
  453. AZStd::binary_semaphore sync;
  454. AZStd::atomic_int remainingReads = numRequests;
  455. AZStd::atomic_bool readSuccessful = true;
  456. auto callback = [&readSuccessful, &sync, &remainingReads](FileRequestHandle request)
  457. {
  458. if (AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) != IStreamerTypes::RequestStatus::Completed)
  459. {
  460. readSuccessful = false;
  461. }
  462. if (--remainingReads == 0)
  463. {
  464. sync.release();
  465. }
  466. };
  467. u8* buffer = new u8[fileSize];
  468. size_t block = 0;
  469. size_t fileRemainder = fileSize;
  470. size_t requestIndex = 0;
  471. for (block = 0; block < fileSize; block += bufferSize)
  472. {
  473. size_t blockSize = AZStd::min(bufferSize, fileRemainder);
  474. this->m_streamer->Read(requests[requestIndex], testFile->GetFileName().Native(), buffer + block, blockSize, blockSize,
  475. IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityMedium, block);
  476. this->m_streamer->SetRequestCompleteCallback(requests[requestIndex], callback);
  477. fileRemainder -= blockSize;
  478. requestIndex++;
  479. }
  480. this->m_streamer->QueueRequestBatch(requests);
  481. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::minutes(10)); // Especially in debug this can take a long time.
  482. EXPECT_FALSE(hasTimedOut);
  483. EXPECT_TRUE(readSuccessful);
  484. fileRemainder = fileSize;
  485. for (block = 0; block < fileSize; block += bufferSize)
  486. {
  487. size_t blockSize = AZStd::min(bufferSize, fileRemainder);
  488. this->AssertTestFile(buffer + block, blockSize, block);
  489. fileRemainder -= blockSize;
  490. }
  491. delete[] buffer;
  492. }
  493. // Queue a request on a suspended device, then resume to see if gets picked up again.
  494. TYPED_TEST_P(StreamerTest, SuspendProcessing_SuspendWhileFileIsQueued_FileIsNotReadUntilProcessingIsRestarted)
  495. {
  496. constexpr size_t fileSize = 50_kib;
  497. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  498. AZStd::binary_semaphore sync;
  499. AZStd::atomic_bool readSuccessful = false;
  500. auto callback = [&readSuccessful, &sync](FileRequestHandle request)
  501. {
  502. readSuccessful = AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  503. sync.release();
  504. };
  505. char buffer[fileSize];
  506. FileRequestPtr request = this->m_streamer->Read(testFile->GetFileName().Native(), buffer, fileSize, fileSize);
  507. this->m_streamer->SetRequestCompleteCallback(request, AZStd::move(callback));
  508. this->m_streamer->SuspendProcessing();
  509. this->m_streamer->QueueRequest(AZStd::move(request));
  510. // Sleep for a short while to make sure the test doesn't outrun the Streamer.
  511. AZStd::this_thread::sleep_for(AZStd::chrono::seconds(1));
  512. EXPECT_EQ(IStreamerTypes::RequestStatus::Pending, this->m_streamer->GetRequestStatus(request));
  513. // Wait for a maximum of a few seconds for the request to complete. If it doesn't, the suspend is most likely stuck and the test should fail.
  514. this->m_streamer->ResumeProcessing();
  515. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::seconds(5));
  516. EXPECT_FALSE(hasTimedOut);
  517. EXPECT_TRUE(readSuccessful);
  518. }
  519. TYPED_TEST_P(StreamerTest, FlushCaches_FlushAfterEveryRead_FilesAreReadCorrectly)
  520. {
  521. constexpr size_t fileSize = 4_mib;
  522. constexpr size_t fileCount = 128;
  523. AZStd::vector<AZStd::unique_ptr<MockFileBase>> testFiles;
  524. AZStd::vector<AZStd::unique_ptr<char[]>> testData;
  525. AZStd::vector<FileRequestPtr> requests;
  526. testFiles.reserve(fileCount);
  527. testData.reserve(fileCount);
  528. requests.reserve(fileCount * 2);
  529. AZStd::binary_semaphore sync;
  530. AZStd::atomic_bool readSuccessful = true;
  531. AZStd::atomic_int counter = fileCount * 2;
  532. auto callback = [&sync, &counter, &readSuccessful](FileRequestHandle request)
  533. {
  534. readSuccessful = readSuccessful && AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  535. counter--;
  536. if (counter == 0)
  537. {
  538. sync.release();
  539. }
  540. };
  541. for (size_t i = 0; i < fileCount; ++i)
  542. {
  543. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  544. AZStd::unique_ptr<char[]> buffer(new char[fileSize]);
  545. auto readRequest = this->m_streamer->Read(testFile->GetFileName().Native(), buffer.get(), fileSize, fileSize);
  546. this->m_streamer->SetRequestCompleteCallback(readRequest, callback);
  547. auto flushRequest = this->m_streamer->FlushCaches();
  548. this->m_streamer->SetRequestCompleteCallback(flushRequest, callback);
  549. requests.push_back(AZStd::move(readRequest));
  550. requests.push_back(AZStd::move(flushRequest));
  551. testFiles.push_back(AZStd::move(testFile));
  552. testData.push_back(AZStd::move(buffer));
  553. }
  554. for (size_t i = 0; i < fileCount * 2; i += 2)
  555. {
  556. this->m_streamer->QueueRequest(requests[i]);
  557. this->m_streamer->QueueRequest(requests[i + 1]);
  558. AZStd::this_thread::yield();
  559. }
  560. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::seconds(30));
  561. EXPECT_FALSE(hasTimedOut);
  562. EXPECT_TRUE(readSuccessful);
  563. }
    // Every TYPED_TEST_P of the StreamerTest suite has to be listed here to be run.
    REGISTER_TYPED_TEST_SUITE_P(StreamerTest,
        Read_ReadSmallFileEntirely_FileFullyRead,
        Read_ReadLargeFileEntirely_FileFullyRead,
        Read_ReadMultiplePieces_AllReadRequestWereSuccessful,
        Read_ReadMultiplePiecesWithBatch_AllReadRequestWereSuccessful,
        SuspendProcessing_SuspendWhileFileIsQueued_FileIsNotReadUntilProcessingIsRestarted,
        FlushCaches_FlushAfterEveryRead_FilesAreReadCorrectly);

    // Run the registered tests once for each combination of cache setup and file kind.
    using StreamerTestCases = ::testing::Types<GlobalCache_Uncompressed, DedicatedCache_Uncompressed, GlobalCache_Compressed, DedicatedCache_Compressed>;
    INSTANTIATE_TYPED_TEST_SUITE_P(StreamerTests, StreamerTest, StreamerTestCases);
#endif // AZ_TRAIT_DISABLE_FAILED_STREAMER_TESTS
  574. } // namespace AZ::IO