/*
 * Copyright (c) Contributors to the Open 3D Engine Project.
 * For complete copyright and license terms please see the LICENSE at the root of this distribution.
 *
 * SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 */

#include "DecompressorStackEntry.h"
#include <AzCore/IO/CompressionBus.h>
#include <AzCore/IO/Streamer/FileRequest.h>
#include <AzCore/IO/Streamer/StreamerContext.h>
#include <AzCore/Task/TaskGraph.h>
#include <AzCore/Math/MathUtils.h>
#include <AzCore/Serialization/SerializeContext.h>
#include <AzCore/std/parallel/thread.h>
#include <AzCore/std/smart_ptr/make_shared.h>
#include <AzCore/std/typetraits/decay.h>
namespace Compression
{
    AZ_TYPE_INFO_WITH_NAME_IMPL(DecompressorRegistrarConfig, "DecompressorRegistrarConfig", "{763D7F80-0FE1-4084-A165-0CC6A2E57F05}");
    AZ_RTTI_NO_TYPE_INFO_IMPL(DecompressorRegistrarConfig, IStreamerStackConfig);
    AZ_CLASS_ALLOCATOR_IMPL(DecompressorRegistrarConfig, AZ::SystemAllocator);

    AZStd::shared_ptr<AZ::IO::StreamStackEntry> DecompressorRegistrarConfig::AddStreamStackEntry(
        const AZ::IO::HardwareInformation& hardware, AZStd::shared_ptr<AZ::IO::StreamStackEntry> parent)
    {
        auto stackEntry = AZStd::make_shared<DecompressorRegistrarEntry>(
            m_maxNumReads, m_maxNumTasks, aznumeric_caster(hardware.m_maxPhysicalSectorSize));
        stackEntry->SetNext(AZStd::move(parent));
        return stackEntry;
    }

    void DecompressorRegistrarConfig::Reflect(AZ::ReflectContext* context)
    {
        if (auto serializeContext = azrtti_cast<AZ::SerializeContext*>(context);
            serializeContext != nullptr)
        {
            serializeContext->Class<DecompressorRegistrarConfig, IStreamerStackConfig>()
                ->Field("MaxNumReads", &DecompressorRegistrarConfig::m_maxNumReads)
                ->Field("MaxNumTasks", &DecompressorRegistrarConfig::m_maxNumTasks)
                ;
        }
    }

#if AZ_STREAMER_ADD_EXTRA_PROFILING_INFO
    static constexpr const char* DecompBoundName = "Decompression bound";
    static constexpr const char* ReadBoundName = "Read bound";
#endif // AZ_STREAMER_ADD_EXTRA_PROFILING_INFO
    bool DecompressorRegistrarEntry::DecompressionInformation::IsProcessing() const
    {
        return m_compressedData != nullptr;
    }

    DecompressorRegistrarEntry::DecompressorRegistrarEntry(AZ::u32 maxNumReads, AZ::u32 maxNumTasks, AZ::u32 alignment)
        : AZ::IO::StreamStackEntry("Compression Gem decompressor registrar")
        , m_maxNumReads(maxNumReads)
        , m_maxNumTasks(maxNumTasks)
        , m_alignment(alignment)
    {
        m_processingJobs = AZStd::make_unique<DecompressionInformation[]>(m_maxNumTasks);
        m_readBuffers = AZStd::make_unique<Buffer[]>(maxNumReads);
        m_readRequests = AZStd::make_unique<AZ::IO::FileRequest*[]>(maxNumReads);
        m_readBufferStatus = AZStd::make_unique<ReadBufferStatus[]>(maxNumReads);
        for (AZ::u32 i = 0; i < maxNumReads; ++i)
        {
            m_readBufferStatus[i] = ReadBufferStatus::Unused;
        }

        // Add initial dummy values to the stats to avoid division by zero later on and avoid needing branches.
        m_bytesDecompressed.PushEntry(1);
        m_decompressionDurationMicroSec.PushEntry(1);
    }
    void DecompressorRegistrarEntry::PrepareRequest(AZ::IO::FileRequest* request)
    {
        AZ_Assert(request, "PrepareRequest was provided a null request.");

        auto RunCommand = [this, request](auto&& args)
        {
            using Command = AZStd::decay_t<decltype(args)>;
            if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::ReadRequestData>)
            {
                PrepareReadRequest(request, args);
            }
            else if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::CreateDedicatedCacheData> ||
                AZStd::is_same_v<Command, AZ::IO::Requests::DestroyDedicatedCacheData>)
            {
                PrepareDedicatedCache(request, args.m_path);
            }
            else
            {
                AZ::IO::StreamStackEntry::PrepareRequest(request);
            }
        };
        AZStd::visit(AZStd::move(RunCommand), request->GetCommand());
    }
    void DecompressorRegistrarEntry::QueueRequest(AZ::IO::FileRequest* request)
    {
        AZ_Assert(request, "QueueRequest was provided a null request.");

        auto QueueCommand = [this, request](auto&& args)
        {
            using Command = AZStd::decay_t<decltype(args)>;
            if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::CompressedReadData>)
            {
                m_pendingReads.push_back(request);
            }
            else if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::FileExistsCheckData>)
            {
                m_pendingFileExistChecks.push_back(request);
            }
            else
            {
                if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::ReportData>)
                {
                    Report(args);
                }
                AZ::IO::StreamStackEntry::QueueRequest(request);
            }
        };
        AZStd::visit(AZStd::move(QueueCommand), request->GetCommand());
    }
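
    // Called every tick of the scheduling thread. Decompression jobs are queued first because completing
    // them frees up read slots, then as many archive reads as possible are started, and a pending
    // file-exists check is only serviced when nothing else made progress this tick.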
    bool DecompressorRegistrarEntry::ExecuteRequests()
    {
        bool result = false;

        // First queue jobs as this might open up new read slots.
        if (m_numInFlightReads > 0 && m_numRunningTasks < m_maxNumTasks)
        {
            result = StartDecompressions();
        }

        // Queue as many new reads as possible.
        while (!m_pendingReads.empty() && m_numInFlightReads < m_maxNumReads)
        {
            StartArchiveRead(m_pendingReads.front());
            m_pendingReads.pop_front();
            result = true;
        }

        // If nothing else happened and there is at least one pending file exist check request, run one of those.
        if (!result && !m_pendingFileExistChecks.empty())
        {
            FileExistsCheck(m_pendingFileExistChecks.front());
            m_pendingFileExistChecks.pop_front();
            result = true;
        }

#if AZ_STREAMER_ADD_EXTRA_PROFILING_INFO
        bool allPendingDecompression = true;
        bool allReading = true;
        for (AZ::u32 i = 0; i < m_maxNumReads; ++i)
        {
            allPendingDecompression =
                allPendingDecompression && (m_readBufferStatus[i] == ReadBufferStatus::PendingDecompression);
            allReading =
                allReading && (m_readBufferStatus[i] == ReadBufferStatus::ReadInFlight);
        }
        m_decompressionBoundStat.PushSample(allPendingDecompression ? 1.0 : 0.0);
        AZ::IO::Statistic::PlotImmediate(m_name, DecompBoundName, m_decompressionBoundStat.GetMostRecentSample());

        m_readBoundStat.PushSample(allReading && (m_numRunningTasks < m_maxNumTasks) ? 1.0 : 0.0);
        AZ::IO::Statistic::PlotImmediate(m_name, ReadBoundName, m_readBoundStat.GetMostRecentSample());
#endif

        return AZ::IO::StreamStackEntry::ExecuteRequests() || result;
    }
    void DecompressorRegistrarEntry::UpdateStatus(Status& status) const
    {
        AZ::IO::StreamStackEntry::UpdateStatus(status);
        AZ::s32 numAvailableSlots = static_cast<AZ::s32>(m_maxNumReads - m_numInFlightReads);
        status.m_numAvailableSlots = AZStd::min(status.m_numAvailableSlots, numAvailableSlots);
        status.m_isIdle = status.m_isIdle && IsIdle();
    }
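
    // Produces completion-time estimates for every compressed read this node knows about: jobs that are
    // already decompressing, reads that are in flight or waiting for a decompression slot, and requests
    // that are still pending. Estimates are derived from the running average decompression throughput.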
    void DecompressorRegistrarEntry::UpdateCompletionEstimates(AZStd::chrono::steady_clock::time_point now, AZStd::vector<AZ::IO::FileRequest*>& internalPending,
        AZ::IO::StreamerContext::PreparedQueue::iterator pendingBegin, AZ::IO::StreamerContext::PreparedQueue::iterator pendingEnd)
    {
        // Create predictions for all pending requests. Some will be further processed after this.
        AZStd::reverse_copy(m_pendingFileExistChecks.begin(), m_pendingFileExistChecks.end(), AZStd::back_inserter(internalPending));
        AZStd::reverse_copy(m_pendingReads.begin(), m_pendingReads.end(), AZStd::back_inserter(internalPending));
        AZ::IO::StreamStackEntry::UpdateCompletionEstimates(now, internalPending, pendingBegin, pendingEnd);

        double totalBytesDecompressed = aznumeric_caster(m_bytesDecompressed.GetTotal());
        double totalDecompressionDuration = aznumeric_caster(m_decompressionDurationMicroSec.GetTotal());

        AZStd::chrono::microseconds cumulativeDelay = AZStd::chrono::microseconds::max();
        // Check the number of jobs that are processing.
        for (AZ::u32 i = 0; i < m_maxNumTasks; ++i)
        {
            if (m_processingJobs[i].IsProcessing())
            {
                AZ::IO::FileRequest* compressedRequest = m_processingJobs[i].m_waitRequest->GetParent();
                AZ_Assert(compressedRequest, "A wait request attached to DecompressorRegistrarEntry was completed but didn't have a parent compressed request.");
                auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
                AZ_Assert(data, "Compressed request in the decompression queue in DecompressorRegistrarEntry didn't contain compression read data.");

                size_t bytesToDecompress = data->m_compressionInfo.m_compressedSize;
                auto decompressionDuration = AZStd::chrono::microseconds(
                    static_cast<AZ::u64>((bytesToDecompress * totalDecompressionDuration) / totalBytesDecompressed));
                auto timeInProcessing = now - m_processingJobs[i].m_jobStartTime;
                auto timeLeft = decompressionDuration > timeInProcessing ? decompressionDuration - timeInProcessing : AZStd::chrono::microseconds(0);
                // Get the shortest time as this indicates the next decompression to become available.
                cumulativeDelay = AZStd::min(AZStd::chrono::duration_cast<AZStd::chrono::microseconds>(timeLeft), cumulativeDelay);
                m_processingJobs[i].m_waitRequest->SetEstimatedCompletion(now + timeLeft);
            }
        }
        if (cumulativeDelay == AZStd::chrono::microseconds::max())
        {
            cumulativeDelay = AZStd::chrono::microseconds(0);
        }

        // Next update all reads that are in flight. These will have an estimation for the read to complete, but will then be queued
        // for decompression, so add the time needed for decompression. Assume that decompression happens in parallel.
        AZStd::chrono::microseconds decompressionDelay =
            AZStd::chrono::microseconds(static_cast<AZ::u64>(m_decompressionJobDelayMicroSec.CalculateAverage()));
        AZStd::chrono::microseconds smallestDecompressionDuration = AZStd::chrono::microseconds::max();
        for (AZ::u32 i = 0; i < m_maxNumReads; ++i)
        {
            AZStd::chrono::steady_clock::time_point baseTime;
            switch (m_readBufferStatus[i])
            {
            case ReadBufferStatus::Unused:
                continue;
            case ReadBufferStatus::ReadInFlight:
                // Internal read requests can start and complete, but still be pending finalization before they're ever scheduled,
                // in which case the estimated time is not set.
                baseTime = m_readRequests[i]->GetEstimatedCompletion();
                if (baseTime == AZStd::chrono::steady_clock::time_point())
                {
                    baseTime = now;
                }
                break;
            case ReadBufferStatus::PendingDecompression:
                baseTime = now;
                break;
            default:
                AZ_Assert(false, "Unsupported buffer type: %i.", m_readBufferStatus[i]);
                continue;
            }

            baseTime += cumulativeDelay; // Delay until the first decompression slot becomes available.
            baseTime += decompressionDelay; // The average time it takes for the job system to pick up the decompression job.

            // Calculate the amount of time it will take to decompress the data.
            AZ::IO::FileRequest* compressedRequest = m_readRequests[i]->GetParent();
            auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
            size_t bytesToDecompress = data->m_compressionInfo.m_compressedSize;
            auto decompressionDuration = AZStd::chrono::microseconds(
                static_cast<AZ::u64>((bytesToDecompress * totalDecompressionDuration) / totalBytesDecompressed));
            smallestDecompressionDuration = AZStd::min(smallestDecompressionDuration, decompressionDuration);
            baseTime += decompressionDuration;

            m_readRequests[i]->SetEstimatedCompletion(baseTime);
        }
        if (smallestDecompressionDuration != AZStd::chrono::microseconds::max())
        {
            cumulativeDelay += smallestDecompressionDuration; // Time after which the decompression jobs and pending reads have completed.
        }

        // For all internally pending compressed reads add the decompression time. The read time will have already been added downstream.
        // Because this call will go from the top of the stack to the bottom, but estimation is calculated from the bottom to the top, this
        // list should be processed in reverse order.
        for (auto pendingIt = internalPending.rbegin(); pendingIt != internalPending.rend(); ++pendingIt)
        {
            EstimateCompressedReadRequest(*pendingIt, cumulativeDelay, decompressionDelay,
                totalDecompressionDuration, totalBytesDecompressed);
        }

        // Finally add a prediction for all the requests that are waiting to be queued.
        for (auto requestIt = pendingBegin; requestIt != pendingEnd; ++requestIt)
        {
            EstimateCompressedReadRequest(*requestIt, cumulativeDelay, decompressionDelay,
                totalDecompressionDuration, totalBytesDecompressed);
        }
    }
    void DecompressorRegistrarEntry::EstimateCompressedReadRequest(AZ::IO::FileRequest* request, AZStd::chrono::microseconds& cumulativeDelay,
        AZStd::chrono::microseconds decompressionDelay, double totalDecompressionDurationUs, double totalBytesDecompressed) const
    {
        auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&request->GetCommand());
        if (data)
        {
            AZStd::chrono::microseconds processingTime = decompressionDelay;
            size_t bytesToDecompress = data->m_compressionInfo.m_compressedSize;
            processingTime += AZStd::chrono::microseconds(
                static_cast<AZ::u64>((bytesToDecompress * totalDecompressionDurationUs) / totalBytesDecompressed));
            cumulativeDelay += processingTime;
            request->SetEstimatedCompletion(request->GetEstimatedCompletion() + processingTime);
        }
    }
    void DecompressorRegistrarEntry::CollectStatistics(AZStd::vector<AZ::IO::Statistic>& statistics) const
    {
        constexpr double usToSec = 1.0 / (1000.0 * 1000.0);
        constexpr double usToMs = 1.0 / 1000.0;

        if (m_bytesDecompressed.GetNumRecorded() > 1) // There's always a default added.
        {
            // It only makes sense to add decompression statistics when reading from PAK files.
            statistics.push_back(AZ::IO::Statistic::CreateInteger(
                m_name, "Available decompression slots", m_maxNumTasks - m_numRunningTasks,
                "The number of available slots to decompress files with. Increasing the number of slots will require more hardware "
                "resources and may negatively impact other CPU utilization, but improves the performance of Streamer."));
            statistics.push_back(AZ::IO::Statistic::CreateInteger(
                m_name, "Available read slots", m_maxNumReads - m_numInFlightReads,
                "The number of slots available to queue read requests into. Increasing this number will allow more read requests to be "
                "processed, but new slots will not become available until a read file can be queued in a decompression slot. Increasing this "
                "number will only be helpful if decompressing is faster than reading; otherwise the number of slots can be kept around the "
                "same number as there are decompression slots."));
            statistics.push_back(AZ::IO::Statistic::CreateInteger(
                m_name, "Pending decompression", m_numPendingDecompression,
                "The number of requests that have completed reading and are waiting for a decompression slot to become available. If this "
                "value is frequently more than zero, then the number of decompression slots may need to be increased, a faster decompressor "
                "is needed, or the number of read slots can be reduced."));
            statistics.push_back(AZ::IO::Statistic::CreateByteSize(
                m_name, "Buffer memory", m_memoryUsage,
                "The total amount of memory used by the decompressor. This is dependent on the compressed file sizes and may "
                "improve by reducing the file sizes of the largest files in the archive."));

            double averageJobStartDelay = m_decompressionJobDelayMicroSec.CalculateAverage() * usToMs;
            statistics.push_back(AZ::IO::Statistic::CreateFloat(
                m_name, "Decompression job delay (avg. ms)", averageJobStartDelay,
                "The amount of time in milliseconds between queuing a decompression job and it starting. If this is too long it may "
                "indicate that the job system is too saturated to pick up decompression jobs."));

            AZ::u64 totalBytesDecompressed = m_bytesDecompressed.GetTotal();
            double totalDecompressionTimeSec = m_decompressionDurationMicroSec.GetTotal() * usToSec;
            statistics.push_back(AZ::IO::Statistic::CreateBytesPerSecond(
                m_name, "Decompression Speed per job", totalBytesDecompressed / totalDecompressionTimeSec,
                "The average speed that the decompressor can handle. If this is not higher than the average read "
                "speed, then decompressing can't keep up with file reads. Increasing the number of jobs can help hide this issue, but only "
                "for parallel reads, while individual reads will still remain decompression bound."));

#if AZ_STREAMER_ADD_EXTRA_PROFILING_INFO
            statistics.push_back(AZ::IO::Statistic::CreatePercentageRange(
                m_name, DecompBoundName, m_decompressionBoundStat.GetAverage(), m_decompressionBoundStat.GetMinimum(),
                m_decompressionBoundStat.GetMaximum(),
                "The percentage of time that Streamer was decompression bound. High values mean that more jobs are needed, although this "
                "may only help if there are a sufficient number of requests."));
            statistics.push_back(AZ::IO::Statistic::CreatePercentageRange(
                m_name, ReadBoundName, m_readBoundStat.GetAverage(), m_readBoundStat.GetMinimum(),
                m_readBoundStat.GetMaximum(),
                "The percentage of time that Streamer was read bound. High values are generally good if there is a sufficient number of "
                "requests."));
#endif
        }

        AZ::IO::StreamStackEntry::CollectStatistics(statistics);
    }
    bool DecompressorRegistrarEntry::IsIdle() const
    {
        return m_pendingReads.empty()
            && m_pendingFileExistChecks.empty()
            && m_numInFlightReads == 0
            && m_numPendingDecompression == 0
            && m_numRunningTasks == 0;
    }
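
    // Rewrites an incoming read depending on how the file is stored: compressed archive entries become a
    // CompressedRead, uncompressed archive entries become a plain read at the entry's offset inside the
    // archive, and when the archive prefers loose files a file-exists check decides which path is taken.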
    void DecompressorRegistrarEntry::PrepareReadRequest(AZ::IO::FileRequest* request, AZ::IO::Requests::ReadRequestData& data)
    {
        if (AZ::IO::CompressionInfo info; AZ::IO::CompressionUtils::FindCompressionInfo(info, data.m_path.GetRelativePath()))
        {
            AZ::IO::FileRequest* nextRequest = m_context->GetNewInternalRequest();
            if (info.m_isCompressed)
            {
                AZ_Assert(info.m_decompressor,
                    "DecompressorRegistrarEntry::PrepareRequest found a compressed file, but no decompressor to decompress with.");
                nextRequest->CreateCompressedRead(request, AZStd::move(info), data.m_output, data.m_offset, data.m_size);
            }
            else
            {
                AZ::IO::FileRequest* pathStorageRequest = m_context->GetNewInternalRequest();
                pathStorageRequest->CreateRequestPathStore(request, AZStd::move(info.m_archiveFilename));
                auto& pathStorage = AZStd::get<AZ::IO::Requests::RequestPathStoreData>(pathStorageRequest->GetCommand());
                nextRequest->CreateRead(pathStorageRequest, data.m_output, data.m_outputSize, pathStorage.m_path,
                    info.m_offset + data.m_offset, data.m_size, info.m_isSharedPak);
            }

            if (info.m_conflictResolution == AZ::IO::ConflictResolution::PreferFile)
            {
                auto callback = [this, nextRequest](const AZ::IO::FileRequest& checkRequest)
                {
                    auto check = AZStd::get_if<AZ::IO::Requests::FileExistsCheckData>(&checkRequest.GetCommand());
                    AZ_Assert(check,
                        "Callback in DecompressorRegistrarEntry::PrepareReadRequest expected FileExistsCheck but got another command.");
                    if (check->m_found)
                    {
                        AZ::IO::FileRequest* originalRequest = m_context->RejectRequest(nextRequest);
                        if (AZStd::holds_alternative<AZ::IO::Requests::RequestPathStoreData>(originalRequest->GetCommand()))
                        {
                            originalRequest = m_context->RejectRequest(originalRequest);
                        }
                        AZ::IO::StreamStackEntry::PrepareRequest(originalRequest);
                    }
                    else
                    {
                        m_context->PushPreparedRequest(nextRequest);
                    }
                };
                AZ::IO::FileRequest* fileCheckRequest = m_context->GetNewInternalRequest();
                fileCheckRequest->CreateFileExistsCheck(data.m_path);
                fileCheckRequest->SetCompletionCallback(AZStd::move(callback));
                AZ::IO::StreamStackEntry::QueueRequest(fileCheckRequest);
            }
            else
            {
                m_context->PushPreparedRequest(nextRequest);
            }
        }
        else
        {
            AZ::IO::StreamStackEntry::PrepareRequest(request);
        }
    }
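
    // Redirects dedicated cache creation/destruction to the archive file and the range the entry occupies
    // inside it, so a later cache node caches the archive region rather than the virtual file path. The
    // same PreferFile conflict check as for reads applies.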
    void DecompressorRegistrarEntry::PrepareDedicatedCache(AZ::IO::FileRequest* request, const AZ::IO::RequestPath& path)
    {
        if (AZ::IO::CompressionInfo info;
            AZ::IO::CompressionUtils::FindCompressionInfo(info, path.GetRelativePath()))
        {
            AZ::IO::FileRequest* nextRequest = m_context->GetNewInternalRequest();

            auto RunCacheCommand = [request, &info, nextRequest](auto&& args)
            {
                using Command = AZStd::decay_t<decltype(args)>;
                if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::CreateDedicatedCacheData>)
                {
                    nextRequest->CreateDedicatedCacheCreation(AZStd::move(info.m_archiveFilename),
                        AZ::IO::FileRange::CreateRange(info.m_offset, info.m_compressedSize), request);
                }
                else if constexpr (AZStd::is_same_v<Command, AZ::IO::Requests::DestroyDedicatedCacheData>)
                {
                    nextRequest->CreateDedicatedCacheDestruction(AZStd::move(info.m_archiveFilename),
                        AZ::IO::FileRange::CreateRange(info.m_offset, info.m_compressedSize), request);
                }
            };
            AZStd::visit(AZStd::move(RunCacheCommand), request->GetCommand());

            if (info.m_conflictResolution == AZ::IO::ConflictResolution::PreferFile)
            {
                auto callback = [this, nextRequest](const AZ::IO::FileRequest& checkRequest)
                {
                    auto check = AZStd::get_if<AZ::IO::Requests::FileExistsCheckData>(&checkRequest.GetCommand());
                    AZ_Assert(check,
                        "Callback in DecompressorRegistrarEntry::PrepareDedicatedCache expected FileExistsCheck but got another command.");
                    if (check->m_found)
                    {
                        AZ::IO::FileRequest* originalRequest = nextRequest->GetParent();
                        m_context->RejectRequest(nextRequest);
                        AZ::IO::StreamStackEntry::PrepareRequest(originalRequest);
                    }
                    else
                    {
                        m_context->PushPreparedRequest(nextRequest);
                    }
                };
                AZ::IO::FileRequest* fileCheckRequest = m_context->GetNewInternalRequest();
                fileCheckRequest->CreateFileExistsCheck(path);
                fileCheckRequest->SetCompletionCallback(AZStd::move(callback));
                AZ::IO::StreamStackEntry::QueueRequest(fileCheckRequest);
            }
            else
            {
                m_context->PushPreparedRequest(nextRequest);
            }
        }
        else
        {
            AZ::IO::StreamStackEntry::PrepareRequest(request);
        }
    }
    void DecompressorRegistrarEntry::FileExistsCheck(AZ::IO::FileRequest* checkRequest)
    {
        auto& fileCheckRequest = AZStd::get<AZ::IO::Requests::FileExistsCheckData>(checkRequest->GetCommand());
        AZ::IO::CompressionInfo info;
        if (AZ::IO::CompressionUtils::FindCompressionInfo(info, fileCheckRequest.m_path.GetRelativePath()))
        {
            fileCheckRequest.m_found = true;
        }
        else
        {
            // The file isn't in the archive but might still exist as a loose file, so let the next node have a shot.
            AZ::IO::StreamStackEntry::QueueRequest(checkRequest);
        }
    }
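
    // Claims a free read slot, allocates an aligned buffer large enough for the compressed data plus the
    // alignment adjustment, and queues a read on the next node for the entry's range in the archive.
    // FinishArchiveRead runs when that read completes.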
    void DecompressorRegistrarEntry::StartArchiveRead(AZ::IO::FileRequest* compressedReadRequest)
    {
        if (!m_next)
        {
            compressedReadRequest->SetStatus(AZ::IO::IStreamerTypes::RequestStatus::Failed);
            m_context->MarkRequestAsCompleted(compressedReadRequest);
            return;
        }

        for (AZ::u32 i = 0; i < m_maxNumReads; ++i)
        {
            if (m_readBufferStatus[i] == ReadBufferStatus::Unused)
            {
                auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedReadRequest->GetCommand());
                AZ_Assert(data, "Compressed request that's starting a read in DecompressorRegistrarEntry didn't contain compression read data.");
                AZ_Assert(data->m_compressionInfo.m_decompressor,
                    "FileRequest for DecompressorRegistrarEntry is missing a decompression callback.");
                AZ::IO::CompressionInfo& info = data->m_compressionInfo;
                AZ_Assert(info.m_decompressor, "DecompressorRegistrarEntry is planning to queue a request for reading but couldn't find a decompressor.");

                // The buffer is aligned down but the offset is not corrected. If the offset was adjusted it would mean the same data is read
                // multiple times and negates the block cache's ability to detect these cases. By still adjusting it means that the reads between
                // the BlockCache's prolog and epilog are read into aligned buffers.
                size_t offsetAdjustment = info.m_offset - AZ_SIZE_ALIGN_DOWN(info.m_offset, aznumeric_cast<size_t>(m_alignment));
                size_t bufferSize = AZ_SIZE_ALIGN_UP((info.m_compressedSize + offsetAdjustment), aznumeric_cast<size_t>(m_alignment));
                m_readBuffers[i] = reinterpret_cast<Buffer>(AZ::AllocatorInstance<AZ::SystemAllocator>::Get().Allocate(
                    bufferSize, m_alignment));
                m_memoryUsage += bufferSize;

                AZ::IO::FileRequest* archiveReadRequest = m_context->GetNewInternalRequest();
                archiveReadRequest->CreateRead(compressedReadRequest, m_readBuffers[i] + offsetAdjustment, bufferSize, info.m_archiveFilename,
                    info.m_offset, info.m_compressedSize, info.m_isSharedPak);
                auto ArchiveReadCommandComplete = [this, readSlot = i](AZ::IO::FileRequest& request)
                {
                    FinishArchiveRead(&request, readSlot);
                };
                archiveReadRequest->SetCompletionCallback(AZStd::move(ArchiveReadCommandComplete));
                m_next->QueueRequest(archiveReadRequest);

                m_readRequests[i] = archiveReadRequest;
                m_readBufferStatus[i] = ReadBufferStatus::ReadInFlight;
                AZ_Assert(m_numInFlightReads < m_maxNumReads,
                    "A FileRequest was queued for reading in DecompressorRegistrarEntry, but there are no slots available.");
                m_numInFlightReads++;
                return;
            }
        }

        AZ_Assert(false, "%u of %u read slots are used in the DecompressorRegistrarEntry, but no empty slot was found.", m_numInFlightReads, m_maxNumReads);
    }
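
    // Completion callback for an archive read. On success the slot is flipped to PendingDecompression and a
    // wait request keeps the parent compressed read open until decompression finishes; on failure or
    // cancellation the read buffer is released and the slot is returned to the pool.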
    void DecompressorRegistrarEntry::FinishArchiveRead(AZ::IO::FileRequest* readRequest, AZ::u32 readSlot)
    {
        AZ_Assert(m_readRequests[readSlot] == readRequest,
            "Request in the archive read slot isn't the same as the request that's being completed.");
        AZ::IO::FileRequest* compressedRequest = readRequest->GetParent();
        AZ_Assert(compressedRequest, "A read request started by DecompressorRegistrarEntry is missing a parent request.");
        if (readRequest->GetStatus() == AZ::IO::IStreamerTypes::RequestStatus::Completed)
        {
            m_readBufferStatus[readSlot] = ReadBufferStatus::PendingDecompression;
            ++m_numPendingDecompression;
            // Add this wait so the compressed request isn't fully completed yet as only the read part is done. The
            // job thread will finish this wait, which in turn will trigger FinishDecompression on the main streaming thread.
            AZ::IO::FileRequest* waitRequest = m_context->GetNewInternalRequest();
            waitRequest->CreateWait(compressedRequest);
            m_readRequests[readSlot] = waitRequest;
        }
        else
        {
            auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
            AZ_Assert(data, "Compressed request in DecompressorRegistrarEntry that finished unsuccessfully didn't contain compression read data.");
            AZ::IO::CompressionInfo& info = data->m_compressionInfo;
            size_t offsetAdjustment = info.m_offset - AZ_SIZE_ALIGN_DOWN(info.m_offset, aznumeric_cast<size_t>(m_alignment));
            size_t bufferSize = AZ_SIZE_ALIGN_UP((info.m_compressedSize + offsetAdjustment), aznumeric_cast<size_t>(m_alignment));
            m_memoryUsage -= bufferSize;
            if (m_readBuffers[readSlot] != nullptr)
            {
                AZ::AllocatorInstance<AZ::SystemAllocator>::Get().DeAllocate(m_readBuffers[readSlot], bufferSize, m_alignment);
                m_readBuffers[readSlot] = nullptr;
            }
            m_readRequests[readSlot] = nullptr;
            m_readBufferStatus[readSlot] = ReadBufferStatus::Unused;
            AZ_Assert(m_numInFlightReads > 0,
                "Trying to decrement a read request after it was canceled or failed in DecompressorRegistrarEntry, "
                "but no read requests are supposed to be queued.");
            m_numInFlightReads--;
        }
    }
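
    // Pairs read slots that are pending decompression with free task slots. Each matched pair is submitted
    // as a task graph with a final gather task; ownership of the read buffer moves to the task slot and the
    // read slot is released so a new archive read can start while decompression runs.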
    bool DecompressorRegistrarEntry::StartDecompressions()
    {
        bool submittedTask = false;
        for (AZ::u32 readSlot = 0; readSlot < m_maxNumReads; ++readSlot)
        {
            // Find completed read.
            if (m_readBufferStatus[readSlot] != ReadBufferStatus::PendingDecompression)
            {
                continue;
            }

            auto DecompressionCompleteTask = []()
            {
                AZ_Trace("Decompression Registrar Streamer", "All current decompression tasks are complete");
            };
            AZ::TaskGraph taskGraph{ "Decompression Tasks" };
            AZ::TaskToken finishToken = taskGraph.AddTask(
                AZ::TaskDescriptor{ "Decompress Gather All", "Compression" }, AZStd::move(DecompressionCompleteTask));

            // Find decompression slot.
            for (size_t decompressionSlotIndex = 0; decompressionSlotIndex < m_maxNumTasks; ++decompressionSlotIndex)
            {
                if (m_processingJobs[decompressionSlotIndex].IsProcessing())
                {
                    continue;
                }

                AZ::IO::FileRequest* waitRequest = m_readRequests[readSlot];
                AZ_Assert(AZStd::holds_alternative<AZ::IO::Requests::WaitData>(waitRequest->GetCommand()),
                    "File request waiting for decompression wasn't marked as being a wait operation.");
                AZ::IO::FileRequest* compressedRequest = waitRequest->GetParent();
                AZ_Assert(compressedRequest, "A read request started by DecompressorRegistrarEntry is missing a parent request.");

                auto DecompressionRequestFinishedCB = [this, taskSlot = AZ::u32(decompressionSlotIndex)](AZ::IO::FileRequest& request)
                {
                    FinishDecompression(&request, taskSlot);
                };
                waitRequest->SetCompletionCallback(AZStd::move(DecompressionRequestFinishedCB));

                DecompressionInformation& info = m_processingJobs[decompressionSlotIndex];
                info.m_waitRequest = waitRequest;
                info.m_queueStartTime = AZStd::chrono::steady_clock::now();
                info.m_jobStartTime = info.m_queueStartTime; // Set these to the same in case the scheduler requests an update before the job has started.
                info.m_compressedData = m_readBuffers[readSlot]; // Transfer ownership of the pointer.
                m_readBuffers[readSlot] = nullptr;

                if (m_taskGraphEvent == nullptr || m_taskGraphEvent->IsSignaled())
                {
                    m_taskGraphEvent = AZStd::make_unique<AZ::TaskGraphEvent>("Decompressor Registrar Wait");
                    auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
                    AZ_Assert(data, "Compressed request in DecompressorRegistrarEntry that's starting decompression didn't contain compression read data.");
                    AZ_Assert(data->m_compressionInfo.m_decompressor, "DecompressorRegistrarEntry is queuing a decompression job but couldn't find a decompressor.");
                    info.m_alignmentOffset = aznumeric_caster(data->m_compressionInfo.m_offset -
                        AZ_SIZE_ALIGN_DOWN(data->m_compressionInfo.m_offset, aznumeric_cast<size_t>(m_alignment)));

                    AZ::TaskDescriptor taskDescriptor{ "Decompress file", "Compression" };
                    if (data->m_readOffset == 0 && data->m_readSize == data->m_compressionInfo.m_uncompressedSize)
                    {
                        auto decompressTask = [this, &info]()
                        {
                            FullDecompression(m_context, info);
                        };
                        AZ::TaskToken token = taskGraph.AddTask(taskDescriptor, AZStd::move(decompressTask));
                        token.Precedes(finishToken);
                    }
                    else
                    {
                        m_memoryUsage += data->m_compressionInfo.m_uncompressedSize;
                        auto decompressTask = [this, &info]()
                        {
                            PartialDecompression(m_context, info);
                        };
                        AZ::TaskToken token = taskGraph.AddTask(taskDescriptor, AZStd::move(decompressTask));
                        token.Precedes(finishToken);
                    }
                    --m_numPendingDecompression;
                    ++m_numRunningTasks;
                }
                AZ_Assert(m_taskGraphEvent->IsSignaled() == false, "Decompression has been started on another thread"
                    " while executing this function");
                taskGraph.SubmitOnExecutor(m_taskExecutor, m_taskGraphEvent.get());

                m_readRequests[readSlot] = nullptr;
                m_readBufferStatus[readSlot] = ReadBufferStatus::Unused;
                AZ_Assert(m_numInFlightReads > 0, "Trying to decrement a read request after it's queued for decompression in DecompressorRegistrarEntry, but no read requests are supposed to be queued.");
                m_numInFlightReads--;
                submittedTask = true;
                break;
            }

            if (m_numInFlightReads == 0 || m_numRunningTasks == m_maxNumTasks)
            {
                return submittedTask;
            }
        }
        return submittedTask;
    }
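
    // Runs on the streaming thread once a decompression task completes its wait request. Updates the memory
    // and timing statistics, frees the compressed read buffer, and releases the task slot.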
    void DecompressorRegistrarEntry::FinishDecompression([[maybe_unused]] AZ::IO::FileRequest* waitRequest, AZ::u32 jobSlot)
    {
        DecompressionInformation& jobInfo = m_processingJobs[jobSlot];
        AZ_Assert(jobInfo.m_waitRequest == waitRequest, "Job slot didn't contain the expected wait request.");

        auto endTime = AZStd::chrono::steady_clock::now();

        AZ::IO::FileRequest* compressedRequest = jobInfo.m_waitRequest->GetParent();
        AZ_Assert(compressedRequest, "A wait request attached to DecompressorRegistrarEntry was completed but didn't have a parent compressed request.");
        auto data = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
        AZ_Assert(data, "Compressed request in DecompressorRegistrarEntry that completed decompression didn't contain compression read data.");
        AZ::IO::CompressionInfo& info = data->m_compressionInfo;

        size_t offsetAdjustment = info.m_offset - AZ_SIZE_ALIGN_DOWN(info.m_offset, aznumeric_cast<size_t>(m_alignment));
        size_t bufferSize = AZ_SIZE_ALIGN_UP((info.m_compressedSize + offsetAdjustment), aznumeric_cast<size_t>(m_alignment));
        m_memoryUsage -= bufferSize;
        if (data->m_readOffset != 0 || data->m_readSize != data->m_compressionInfo.m_uncompressedSize)
        {
            m_memoryUsage -= data->m_compressionInfo.m_uncompressedSize;
        }

        m_decompressionJobDelayMicroSec.PushEntry(AZStd::chrono::duration_cast<AZStd::chrono::microseconds>(
            jobInfo.m_jobStartTime - jobInfo.m_queueStartTime).count());
        m_decompressionDurationMicroSec.PushEntry(AZStd::chrono::duration_cast<AZStd::chrono::microseconds>(
            endTime - jobInfo.m_jobStartTime).count());
        m_bytesDecompressed.PushEntry(data->m_compressionInfo.m_compressedSize);

        AZ::AllocatorInstance<AZ::SystemAllocator>::Get().DeAllocate(jobInfo.m_compressedData, bufferSize, m_alignment);
        jobInfo.m_compressedData = nullptr;

        AZ_Assert(m_numRunningTasks > 0, "About to complete a decompression job, but the internal count doesn't see a running job.");
        --m_numRunningTasks;
    }
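
    // Runs on a task thread. The request covers the entire file, so the decompressor can write straight
    // into the caller's output buffer without an intermediate allocation.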
    void DecompressorRegistrarEntry::FullDecompression(AZ::IO::StreamerContext* context, DecompressionInformation& info)
    {
        info.m_jobStartTime = AZStd::chrono::steady_clock::now();

        AZ::IO::FileRequest* compressedRequest = info.m_waitRequest->GetParent();
        AZ_Assert(compressedRequest, "A wait request attached to DecompressorRegistrarEntry was completed but didn't have a parent compressed request.");
        auto request = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
        AZ_Assert(request, "Compressed request in DecompressorRegistrarEntry that's running full decompression didn't contain compression read data.");
        AZ::IO::CompressionInfo& compressionInfo = request->m_compressionInfo;
        AZ_Assert(compressionInfo.m_decompressor, "Full decompressor job started, but there's no decompressor callback assigned.");
        AZ_Assert(request->m_readOffset == 0, "DecompressorRegistrarEntry is doing a full decompression on a file request with an offset (%zu).",
            request->m_readOffset);
        AZ_Assert(compressionInfo.m_uncompressedSize == request->m_readSize,
            "DecompressorRegistrarEntry is doing a full decompression, but the target buffer size (%llu) doesn't match the decompressed size (%zu).",
            request->m_readSize, compressionInfo.m_uncompressedSize);

        bool success = compressionInfo.m_decompressor(compressionInfo, info.m_compressedData + info.m_alignmentOffset,
            compressionInfo.m_compressedSize, request->m_output, compressionInfo.m_uncompressedSize);
        info.m_waitRequest->SetStatus(success ? AZ::IO::IStreamerTypes::RequestStatus::Completed : AZ::IO::IStreamerTypes::RequestStatus::Failed);
        context->MarkRequestAsCompleted(info.m_waitRequest);
        context->WakeUpSchedulingThread();
    }
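
    // Runs on a task thread. Only part of the file was requested, so the archive entry is first decompressed
    // into a temporary buffer and the requested range is then copied into the caller's output buffer.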
    void DecompressorRegistrarEntry::PartialDecompression(AZ::IO::StreamerContext* context, DecompressionInformation& info)
    {
        info.m_jobStartTime = AZStd::chrono::steady_clock::now();

        AZ::IO::FileRequest* compressedRequest = info.m_waitRequest->GetParent();
        AZ_Assert(compressedRequest, "A wait request attached to DecompressorRegistrarEntry was completed but didn't have a parent compressed request.");
        auto request = AZStd::get_if<AZ::IO::Requests::CompressedReadData>(&compressedRequest->GetCommand());
        AZ_Assert(request, "Compressed request in DecompressorRegistrarEntry that's running partial decompression didn't contain compression read data.");
        AZ::IO::CompressionInfo& compressionInfo = request->m_compressionInfo;
        AZ_Assert(compressionInfo.m_decompressor, "Partial decompressor job started, but there's no decompressor callback assigned.");

        auto decompressionBuffer = AZStd::make_unique<AZStd::byte[]>(compressionInfo.m_uncompressedSize);
        bool success = compressionInfo.m_decompressor(compressionInfo, info.m_compressedData + info.m_alignmentOffset,
            compressionInfo.m_compressedSize, decompressionBuffer.get(), compressionInfo.m_uncompressedSize);
        info.m_waitRequest->SetStatus(success ? AZ::IO::IStreamerTypes::RequestStatus::Completed : AZ::IO::IStreamerTypes::RequestStatus::Failed);
        memcpy(request->m_output, decompressionBuffer.get() + request->m_readOffset, request->m_readSize);

        context->MarkRequestAsCompleted(info.m_waitRequest);
        context->WakeUpSchedulingThread();
    }
    void DecompressorRegistrarEntry::Report(const AZ::IO::Requests::ReportData& data) const
    {
        switch (data.m_reportType)
        {
        case AZ::IO::IStreamerTypes::ReportType::Config:
            data.m_output.push_back(AZ::IO::Statistic::CreateInteger(
                m_name, "Max number of reads", m_maxNumReads, "The maximum number of parallel reads this decompressor node will support."));
            data.m_output.push_back(AZ::IO::Statistic::CreateInteger(
                m_name, "Max number of jobs", m_maxNumTasks,
                "The maximum number of decompression jobs that can run in parallel. A thread per job will be used. A dedicated job system "
                "is used so as not to interfere with the regular job/task system, but this does add additional thread scheduling work for the "
                "operating system and may impact how stable the performance of the rest of the engine is. If there are functions that "
                "periodically take much longer, look for excessive context switches by the operating system; if found, lowering this "
                "value may help reduce those at the cost of streaming speed."));
            data.m_output.push_back(AZ::IO::Statistic::CreateByteSize(
                m_name, "Alignment", m_alignment,
                "The alignment for the read buffers. This allows enough memory to be reserved in the read buffer to allow for alignment to "
                "happen by later nodes without requiring additional temporary buffers. This does not adjust the offset or read size in "
                "order to allow cache nodes to remain effective."));
            data.m_output.push_back(AZ::IO::Statistic::CreateReferenceString(
                m_name, "Next node", m_next ? AZStd::string_view(m_next->GetName()) : AZStd::string_view("<None>"),
                "The name of the node that follows this node or none."));
            break;
        }
    }
} // namespace Compression