RCQueueSortModel.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <native/resourcecompiler/RCQueueSortModel.h>
  9. #include <native/AssetDatabase/AssetDatabase.h>
  10. #include "rcjoblistmodel.h"
  namespace RCQueueSortModel_Internal
  {
      // Debugging aid. Flip this to true to restrict processing to critical jobs only, which helps verify
      // that the application correctly requests jobs that may not have finished yet during initial startup.
      static constexpr bool s_debug_OnlyProcessCriticalJobs{ false };
  }
  17. namespace AssetProcessor
  18. {
  19. RCQueueSortModel::RCQueueSortModel(QObject* parent)
  20. : QSortFilterProxyModel(parent)
  21. {
  22. // jobs assigned to "all" platforms are always active.
  23. m_currentlyConnectedPlatforms.insert(QString("all"));
  24. }
  25. void RCQueueSortModel::AttachToModel(RCJobListModel* target)
  26. {
  27. if (target)
  28. {
  29. setDynamicSortFilter(true);
  30. BusConnect();
  31. m_sourceModel = target;
  32. setSourceModel(target);
  33. setSortRole(RCJobListModel::jobIndexRole);
  34. sort(0);
  35. }
  36. else
  37. {
  38. BusDisconnect();
  39. setSourceModel(nullptr);
  40. m_sourceModel = nullptr;
  41. }
  42. }
  43. void PrintJob(RCJob* actualJob, int idx)
  44. {
  45. if (actualJob)
  46. {
  47. AZ_Printf(
  48. AssetProcessor::ConsoleChannel,
  49. " Job %04i: (Escalation: %i) (Priority: %3i) (Status: %10s) (Crit? %s) (Plat: %s) (MissingDeps? %s) - %s\n",
  50. idx,
  51. actualJob->JobEscalation(),
  52. actualJob->GetPriority(),
  53. RCJob::GetStateDescription(actualJob->GetState()).toUtf8().constData(),
  54. actualJob->IsCritical() ? "Y" : "N",
  55. actualJob->GetPlatformInfo().m_identifier.c_str(),
  56. actualJob->HasMissingSourceDependency() ? "Y" : "N",
  57. actualJob->GetJobEntry().GetAbsoluteSourcePath().toUtf8().constData());
  58. for (const JobDependencyInternal& jobDependencyInternal : actualJob->GetJobDependencies())
  59. {
  60. AZ_Printf(AssetProcessor::ConsoleChannel, " Depends on: %s%s\n",
  61. jobDependencyInternal.ToString().c_str(),
  62. jobDependencyInternal.m_isMissingSource ? " - missing source" : "");
  63. }
  64. }
  65. }
  66. void RCQueueSortModel::DumpJobListInSortOrder()
  67. {
  68. AZ_Printf(AssetProcessor::ConsoleChannel, "------------------------------------------------------------\n");
  69. AZ_Printf(AssetProcessor::ConsoleChannel, "RCQueueSortModel: Printing Job list in sorted order:\n");
  70. for (int idx = 0; idx < rowCount(); ++idx)
  71. {
  72. QModelIndex parentIndex = mapToSource(index(idx, 0));
  73. RCJob* actualJob = m_sourceModel->getItem(parentIndex.row());
  74. PrintJob(actualJob, idx);
  75. }
  76. AZ_Printf(AssetProcessor::ConsoleChannel, "------------------------------------------------------------\n");
  77. }
  78. RCJob* RCQueueSortModel::GetNextPendingJob()
  79. {
  80. using namespace RCQueueSortModel_Internal;
  81. if (m_dirtyNeedsResort)
  82. {
  83. setDynamicSortFilter(false);
  84. QSortFilterProxyModel::sort(0);
  85. setDynamicSortFilter(true);
  86. m_dirtyNeedsResort = false;
  87. }
  88. // anyPendingJob contains the first job in the queue that either could be started right now, or is waiting for a dependency to be resolved.
  89. // if we get to the end of the queue and no job is immediately started, we start this job anyway just to try to clear any queue log jams.
  90. RCJob* anyPendingJob = nullptr;
  91. bool waitingOnCatalog = false; // If we find an asset thats waiting on the catalog, don't assume there's a cyclic dependency. We'll wait until the catalog is updated and then check again.
  92. for (int idx = 0; idx < rowCount(); ++idx)
  93. {
  94. QModelIndex parentIndex = mapToSource(index(idx, 0));
  95. RCJob* actualJob = m_sourceModel->getItem(parentIndex.row());
  96. if ((actualJob) && (actualJob->GetState() == RCJob::pending))
  97. {
  98. if (!anyPendingJob)
  99. {
  100. // anyPendingJob is always the first available job that could be processed in order to unblock the queue.
  101. // its not necesarily the best job to process.
  102. anyPendingJob = actualJob;
  103. }
  104. // If this job has a missing dependency, and there are any jobs in flight,
  105. // don't queue it until those jobs finish, in case they resolve the dependency.
  106. // This does mean that if there are multiple queued jobs with missing dependencies,
  107. // they'll run one at a time instead of in parallel, while waiting for the missing dependency
  108. // to be potentially resolved.
  109. if (actualJob->HasMissingSourceDependency() &&
  110. (m_sourceModel->jobsInFlight() > 0 || m_sourceModel->jobsInQueueWithoutMissingDependencies() > 0 ||
  111. m_sourceModel->jobsPendingCatalog() > 0))
  112. {
  113. // There is a race condition where this can fail:
  114. // Asset A generates an intermediate asset.
  115. // Asset B has a source dependency on that intermediate asset. Asset B's "HasMissingSourceDependency" flag is true.
  116. // Asset A is the last job in the queue without a missing job/source dependency, so it runs.
  117. // Asset A finishes processing job, and outputs the product.
  118. // Intermediate A has not yet been scanned and discovered by Asset Processor, so it's not in flight or in the queue yet.
  119. // Asset Processor goes to pull the next job in the queue. Asset B still technically has a missing job dependency on the intermediate asset output.
  120. // Asset B gets pulled from the queue to process here, even though Intermediate A hasn't run yet, because Intermediate A hasn't gone into the queue yet.
  121. // This happened with FBX files and a dependency on an intermediate asset materialtype, before Common platform jobs were made higher priority than host platform jobs.
  122. // Why not just check if the target file exists at this point? Because the job key has to match up.
  123. // When a check was added here to see if all the dependency files existed, other material jobs started to end up in the queue indefinitely
  124. // because they had a dependency on a file that existed but a job key that did not exist for that file.
  125. continue;
  126. }
  127. bool canProcessJob = true;
  128. if (actualJob->HasMissingSourceDependency())
  129. {
  130. // Jobs with missing source dependencies are not an actual warning or error case, since their dependencies can
  131. // appear later. We skip over them until there are none left in the queue.
  132. // Once nothing is left in the queue except for jobs with missing dependencies, we unblock the first one and run
  133. // it anyway to try to clear any log jams and that's when its appropriate to issue a warning.
  134. canProcessJob = false;
  135. }
  136. // If the job has any other jobs its waiting for, we can't process it yet.
  137. for (const JobDependencyInternal& jobDependencyInternal : actualJob->GetJobDependencies())
  138. {
  139. if (jobDependencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::Order ||
  140. jobDependencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::OrderOnce ||
  141. jobDependencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::OrderOnly)
  142. {
  143. const AssetBuilderSDK::JobDependency& jobDependency = jobDependencyInternal.m_jobDependency;
  144. AZ_Assert(
  145. AZ::IO::PathView(jobDependency.m_sourceFile.m_sourceFileDependencyPath).IsAbsolute(),
  146. "Dependency path %s is not an absolute path",
  147. jobDependency.m_sourceFile.m_sourceFileDependencyPath.c_str());
  148. QueueElementID elementId(
  149. SourceAssetReference(jobDependency.m_sourceFile.m_sourceFileDependencyPath.c_str()),
  150. jobDependency.m_platformIdentifier.c_str(),
  151. jobDependency.m_jobKey.c_str());
  152. bool isInFlight = m_sourceModel->isInFlight(elementId);
  153. bool isInQueue = false;
  154. if (!isInFlight) // it can't be in flight and in the queue also.
  155. {
  156. isInQueue = m_sourceModel->isInQueue(elementId);
  157. }
  158. if (isInFlight || isInQueue)
  159. {
  160. if (isInQueue)
  161. {
  162. // escalate the job we depend on, if we're critical or escalated ourselves. No point in doing this
  163. // if the job we're about to escalate is already in flight.
  164. if ((actualJob->JobEscalation() != AssetProcessor::DefaultEscalation) || (actualJob->IsCritical()))
  165. {
  166. m_sourceModel->UpdateJobEscalation(elementId, AssetProcessor::CriticalDependencyEscalation);
  167. }
  168. else
  169. {
  170. // increase its priority, but don't escalate it, so that it will always go in front of this job.
  171. m_sourceModel->UpdateJobPriority(elementId, actualJob->GetPriority() + 1);
  172. }
  173. }
  174. canProcessJob = false;
  175. if (!anyPendingJob || (anyPendingJob->HasMissingSourceDependency() && !actualJob->HasMissingSourceDependency()))
  176. {
  177. // This job is a better candidate to unlock the queue than the previous pending job we found.
  178. // or we found no prior one.
  179. bool jobIsImportant = (actualJob->IsCritical() || actualJob->JobEscalation() != AssetProcessor::DefaultEscalation);
  180. bool processingNonImportantJobs = !s_debug_OnlyProcessCriticalJobs;
  181. if (processingNonImportantJobs || jobIsImportant)
  182. {
  183. anyPendingJob = actualJob;
  184. }
  185. }
  186. }
  187. else if(m_sourceModel->isWaitingOnCatalog(elementId))
  188. {
  189. canProcessJob = false;
  190. waitingOnCatalog = true;
  191. }
  192. }
  193. }
  194. if (canProcessJob)
  195. {
  196. if (s_debug_OnlyProcessCriticalJobs) // note that s_debug_OnlyProcessCriticalJobs is constexpr so this will be optimized out
  197. {
  198. bool isCritical = actualJob->IsCritical();
  199. bool isEscalated = (actualJob->JobEscalation() != AssetProcessor::DefaultEscalation);
  200. if ((!isCritical) && (!isEscalated))
  201. {
  202. // If we're only processing critical jobs, skip this one.
  203. continue;
  204. }
  205. }
  206. return actualJob;
  207. }
  208. }
  209. }
  210. // Either there are no jobs to do or there is a cyclic order job dependency or the only jobs left are waiting for other jobs
  211. // to complete that will never appear.
  212. // Unblock the first job we can in case this clears the log jam.
  213. if (anyPendingJob && m_sourceModel->jobsInFlight() == 0 && !waitingOnCatalog)
  214. {
  215. // there's only a tiny amount of space to print things in the log, so keep the message terse but visible!
  216. AZ_Warning(AssetProcessor::ConsoleChannel, false,
  217. "Job (%s, %s, %s, %s) missing dependencies - check logs in Project/User/Log folder! \n",
  218. anyPendingJob->GetJobEntry().m_sourceAssetReference.AbsolutePath().c_str(), anyPendingJob->GetJobKey().toUtf8().data(),
  219. anyPendingJob->GetJobEntry().m_platformInfo.m_identifier.c_str(), anyPendingJob->GetBuilderGuid().ToString<AZStd::string>().c_str());
  220. // Dump the job list to that folder. It will include all remaining jobs, what dependencies are missing, etc, for debugging.
  221. DumpJobListInSortOrder();
  222. return anyPendingJob;
  223. }
  224. return nullptr;
  225. }
  226. bool RCQueueSortModel::filterAcceptsRow(int source_row, const QModelIndex& source_parent) const
  227. {
  228. (void)source_parent;
  229. RCJob* actualJob = m_sourceModel->getItem(source_row);
  230. if (!actualJob)
  231. {
  232. return false;
  233. }
  234. if (actualJob->GetState() != RCJob::pending)
  235. {
  236. return false;
  237. }
  238. return true;
  239. }
  240. bool RCQueueSortModel::lessThan(const QModelIndex& left, const QModelIndex& right) const
  241. {
  242. RCJob* leftJob = m_sourceModel->getItem(left.row());
  243. RCJob* rightJob = m_sourceModel->getItem(right.row());
  244. // auto fail jobs always take priority to give user feedback asap.
  245. bool autoFailLeft = leftJob->IsAutoFail();
  246. bool autoFailRight = rightJob->IsAutoFail();
  247. if (autoFailLeft != autoFailRight)
  248. {
  249. return autoFailLeft;
  250. }
  251. // While it may be tempting to sort jobs that are missing source dependencies to the end of the queue,
  252. // the nature of this queue is that it is a priority queue, and the jobs that are missing source dependencies
  253. // may still be priority jobs that need to be processed as quickly as possible, ie, as soon as their missing dependencies
  254. // are resolved. If a job is missing a source dependency, the job starting system will simply skip over it and leave it in the
  255. // queue until the dependency is resolved, at which point it will be processed.
  256. // Common platform jobs generate intermediate assets. These generate additional jobs, and the intermediate assets
  257. // can be source and/or job dependencies for other, queued assets.
  258. // Run intermediate assets before active platform and host platform jobs.
  259. // Critical jobs should run first, so skip the comparison here if either job is critical, to allow criticality to come in first.
  260. bool platformsMatch = leftJob->GetPlatformInfo().m_identifier == rightJob->GetPlatformInfo().m_identifier;
  261. // first thing to check is in platform. If you're currently connected to the editor or other tool on a given platform
  262. // you should prioritize those assets.
  263. if (!platformsMatch)
  264. {
  265. bool leftIsCommon = (leftJob->GetPlatformInfo().m_identifier == AssetBuilderSDK::CommonPlatformName);
  266. bool rightIsCommon = (rightJob->GetPlatformInfo().m_identifier == AssetBuilderSDK::CommonPlatformName);
  267. if (leftIsCommon != rightIsCommon)
  268. {
  269. return leftIsCommon;
  270. }
  271. bool leftActive = m_currentlyConnectedPlatforms.contains(leftJob->GetPlatformInfo().m_identifier.c_str());
  272. bool rightActive = m_currentlyConnectedPlatforms.contains(rightJob->GetPlatformInfo().m_identifier.c_str());
  273. if (leftActive)
  274. {
  275. if (!rightActive)
  276. {
  277. return true; // left before right
  278. }
  279. }
  280. else if (rightActive)
  281. {
  282. return false; // right before left.
  283. }
  284. }
  285. // critical jobs take priority
  286. if (leftJob->IsCritical() != rightJob->IsCritical())
  287. {
  288. // one of the two is critical.
  289. return leftJob->IsCritical();
  290. }
  291. int leftJobEscalation = leftJob->JobEscalation();
  292. int rightJobEscalation = rightJob->JobEscalation();
  293. if (leftJobEscalation != rightJobEscalation)
  294. {
  295. return leftJobEscalation > rightJobEscalation;
  296. }
  297. // arbitrarily, lets prioritize assets for the tools host platform, ie, if you're on a PC, process PC assets before
  298. // you process android assets, so that the editor and other tools start quicker.
  299. if (!platformsMatch)
  300. {
  301. if (leftJob->GetPlatformInfo().m_identifier == AzToolsFramework::AssetSystem::GetHostAssetPlatform())
  302. {
  303. return true; // left wins.
  304. }
  305. if (rightJob->GetPlatformInfo().m_identifier == AzToolsFramework::AssetSystem::GetHostAssetPlatform())
  306. {
  307. return false; // right wins
  308. }
  309. }
  310. int priorityLeft = leftJob->GetPriority();
  311. int priorityRight = rightJob->GetPriority();
  312. if (priorityLeft != priorityRight)
  313. {
  314. return priorityLeft > priorityRight;
  315. }
  316. if (leftJob->GetJobEntry().m_sourceAssetReference == rightJob->GetJobEntry().m_sourceAssetReference)
  317. {
  318. // If there are two jobs for the same source, then sort by job run key.
  319. return leftJob->GetJobEntry().m_jobRunKey < rightJob->GetJobEntry().m_jobRunKey;
  320. }
  321. // if we get all the way down here it means we're dealing with two assets which are not
  322. // in any compile groups, not a priority platform, not a priority type, priority platform, etc.
  323. // we can arrange these any way we want, but must pick at least a stable order.
  324. return leftJob->GetJobEntry().GetAbsoluteSourcePath() < rightJob->GetJobEntry().GetAbsoluteSourcePath();
  325. }
  326. void RCQueueSortModel::AssetProcessorPlatformConnected(const AZStd::string platform)
  327. {
  328. QMetaObject::invokeMethod(this, "ProcessPlatformChangeMessage", Qt::QueuedConnection, Q_ARG(QString, QString::fromUtf8(platform.c_str())), Q_ARG(bool, true));
  329. }
  330. void RCQueueSortModel::AssetProcessorPlatformDisconnected(const AZStd::string platform)
  331. {
  332. QMetaObject::invokeMethod(this, "ProcessPlatformChangeMessage", Qt::QueuedConnection, Q_ARG(QString, QString::fromUtf8(platform.c_str())), Q_ARG(bool, false));
  333. }
  334. void RCQueueSortModel::ProcessPlatformChangeMessage(QString platformName, bool connected)
  335. {
  336. AZ_TracePrintf(AssetProcessor::DebugChannel, "RCQueueSortModel: Platform %s has %s.", platformName.toUtf8().data(), connected ? "connected" : "disconnected");
  337. m_dirtyNeedsResort = true;
  338. if (connected)
  339. {
  340. m_currentlyConnectedPlatforms.insert(platformName);
  341. }
  342. else
  343. {
  344. m_currentlyConnectedPlatforms.remove(platformName);
  345. }
  346. }
  347. void RCQueueSortModel::AddJobIdEntry(AssetProcessor::RCJob* rcJob)
  348. {
  349. m_currentJobRunKeyToJobEntries[rcJob->GetJobEntry().m_jobRunKey] = rcJob;
  350. }
  351. void RCQueueSortModel::RemoveJobIdEntry(AssetProcessor::RCJob* rcJob)
  352. {
  353. m_currentJobRunKeyToJobEntries.erase(rcJob->GetJobEntry().m_jobRunKey);
  354. }
  355. void RCQueueSortModel::OnEscalateJobs(AssetProcessor::JobIdEscalationList jobIdEscalationList)
  356. {
  357. for (const auto& jobIdEscalationPair : jobIdEscalationList)
  358. {
  359. auto found = m_currentJobRunKeyToJobEntries.find(jobIdEscalationPair.first);
  360. if (found != m_currentJobRunKeyToJobEntries.end())
  361. {
  362. m_sourceModel->UpdateJobEscalation(found->second, jobIdEscalationPair.second);
  363. }
  364. }
  365. }
  366. } // end namespace AssetProcessor