Browse Source

Jobs with missing source dependencies are now run later, in case the dependencies resolve (#14862)

* WIP test

Signed-off-by: AMZN-stankowi <[email protected]>

* First pass at automated test that demonstrates the not desired FBX processing failure.
This is to make it easier to build the actual fix, after this test goes in then I can work on fixing this issue.

Signed-off-by: AMZN-stankowi <[email protected]>

* Cleaned up test

Signed-off-by: AMZN-stankowi <[email protected]>

* Cleaned up stale code

Signed-off-by: AMZN-stankowi <[email protected]>

* Cleaned up builder creation

Signed-off-by: AMZN-stankowi <[email protected]>

* PR feedback - added comments on processEvents

Signed-off-by: AMZN-stankowi <[email protected]>

* Jobs with missing dependencies are lower priority now, and when the dependency resolves (or goes away) the old job will be cleared from the queue and the new job replaces it.

Signed-off-by: AMZN-stankowi <[email protected]>

* Added comments, cleaned out stale code

Signed-off-by: AMZN-stankowi <[email protected]>

* work in progress alternate solution to dependency processing order. I'm going to clean this up and change my approach, but I didn't want to lose this commit.

Signed-off-by: AMZN-stankowi <[email protected]>

* Common platform jobs sort earlier now

Signed-off-by: AMZN-stankowi <[email protected]>

* Common jobs come earlier in the queue now. This fixes FBX, but there is still the potential for an edge case race condition, it's just much lower now.

Signed-off-by: AMZN-stankowi <[email protected]>

* Removed debugging output

Signed-off-by: AMZN-stankowi <[email protected]>

* Cleaned up more debug code

Signed-off-by: AMZN-stankowi <[email protected]>

* Added comments for TestingRCController class.

Signed-off-by: AMZN-stankowi <[email protected]>

* More code cleanup

Signed-off-by: AMZN-stankowi <[email protected]>

* Switched to AZ_Trace

Signed-off-by: AMZN-stankowi <[email protected]>

* Removed print and for loop that was causing unused variable issues in certain build configurations. This was just an edge case helper print out, it's cleaner to remove it than handle unused parameters.

Signed-off-by: AMZN-stankowi <[email protected]>

* Tightened up timing of events in the automated test, to make this test less frail to other tests running in parallel changing the timing.

Signed-off-by: AMZN-stankowi <[email protected]>

---------

Signed-off-by: AMZN-stankowi <[email protected]>
AMZN-stankowi 2 years ago
parent
commit
16e855c373

+ 1 - 1
Code/Framework/AzFramework/AzFramework/Asset/AssetCatalog.h

@@ -108,7 +108,7 @@ namespace AzFramework
         /// by the catalog.
         bool IsTrackedAssetType(const char* assetFilename) const;
 
-        /// Helper function that adds all of searchAssetId's dependencies to the depedencySet/List (leaving out ones that are already in the list)
+        /// Helper function that adds all of searchAssetId's dependencies to the dependencySet/List (leaving out ones that are already in the list)
         void AddAssetDependencies(const AZ::Data::AssetId& searchAssetId, AZStd::unordered_set<AZ::Data::AssetId>& assetSet, AZStd::vector<AZ::Data::ProductDependency>& dependencyList, const
                                   AZStd::unordered_set<AZ::Data::AssetId>& exclusionList,
                                   const AZStd::vector<AZStd::string>& wildcardPatternExclusionList,

+ 1 - 8
Code/Tools/AssetProcessor/native/AssetManager/assetProcessorManager.cpp

@@ -3962,14 +3962,7 @@ namespace AssetProcessor
                         jobDependencyInternal->m_jobDependency.m_jobKey.c_str(),
                         jobDependencyInternal->m_jobDependency.m_platformIdentifier.c_str());
 
-                    job.m_warnings.push_back(AZStd::string::format(
-                        "No job was found to match the job dependency criteria declared by file %s. (File: %s, JobKey: %s, Platform: %s)\n"
-                        "This may be due to a mismatched job key.\n"
-                        "Job ordering will not be guaranteed and could result in errors or unexpected output.",
-                        job.m_jobEntry.GetAbsoluteSourcePath().toUtf8().constData(),
-                        jobDependencyInternal->m_jobDependency.m_sourceFile.m_sourceFileDependencyPath.c_str(),
-                        jobDependencyInternal->m_jobDependency.m_jobKey.c_str(),
-                        jobDependencyInternal->m_jobDependency.m_platformIdentifier.c_str()));
+                    job.m_hasMissingSourceDependency = true;
                 }
             }
 

+ 9 - 0
Code/Tools/AssetProcessor/native/assetprocessor.h

@@ -235,6 +235,15 @@ namespace AssetProcessor
         // if you set a job to "auto fail" it will check the m_jobParam map for a AZ_CRC(AutoFailReasonKey) and use that, if present, for fail information
         bool m_autoFail = false;
 
+        // If true, this job declared a source dependency that could not be resolved.
+        // There's a chance that the dependency might be fulfilled as part of processing other assets, if
+        // an intermediate asset matches the missing dependency. If this is true, this job is treated as
+        // lower priority than other jobs, so that there's a chance the dependency is resolved before this job runs.
+        // If that dependency is resolved, then this job will be removed from the queue and re-added.
+        // If the dependency is not resolved, then the job will run at the end of the queue still, in case the
+        // builder is able to process the asset with the dependency gap, if the dependency was optional.
+        bool m_hasMissingSourceDependency = false;
+
         AZStd::string ToString() const
         {
             return QString("%1 %2 %3").arg(m_jobEntry.GetAbsoluteSourcePath(), m_jobEntry.m_platformInfo.m_identifier.c_str(), m_jobEntry.m_jobKey).toUtf8().data();

+ 80 - 14
Code/Tools/AssetProcessor/native/resourcecompiler/RCQueueSortModel.cpp

@@ -6,6 +6,7 @@
  *
  */
 #include <native/resourcecompiler/RCQueueSortModel.h>
+#include <native/AssetDatabase/AssetDatabase.h>
 #include "rcjoblistmodel.h"
 
 namespace AssetProcessor
@@ -55,12 +56,46 @@ namespace AssetProcessor
             RCJob* actualJob = m_sourceModel->getItem(parentIndex.row());
             if ((actualJob) && (actualJob->GetState() == RCJob::pending))
             {
+                // If this job has a missing dependency, and there are any jobs in flight,
+                // don't queue it until those jobs finish, in case they resolve the dependency.
+                // This does mean that if there are multiple queued jobs with missing dependencies,
+                // they'll run one at a time instead of in parallel, while waiting for the missing dependency
+                // to be potentially resolved.
+                if (actualJob->HasMissingSourceDependency() &&
+                    (m_sourceModel->jobsInFlight() > 0 || m_sourceModel->jobsInQueueWithoutMissingDependencies() > 0 ||
+                     m_sourceModel->jobsPendingCatalog() > 0))
+                {
+                    // There is a race condition where this can fail:
+                    // Asset A generates an intermediate asset.
+                    // Asset B has a source dependency on that intermediate asset. Asset B's "HasMissingSourceDependency" flag is true.
+                    // Asset A is the last job in the queue without a missing job/source dependency, so it runs.
+                    // Asset A finishes processing job, and outputs the product.
+                    // Intermediate A has not yet been scanned and discovered by Asset Processor, so it's not in flight or in the queue yet.
+                    // Asset Processor goes to pull the next job in the queue. Asset B still technically has a missing job dependency on the intermediate asset output.
+                    // Asset B gets pulled from the queue to process here, even though Intermediate A hasn't run yet, because Intermediate A hasn't gone into the queue yet.
+                    // This happened with FBX files and a dependency on an intermediate asset materialtype, before Common platform jobs were made higher priority than host platform jobs.
+                    // Why not just check if the target file exists at this point? Because the job key has to match up.
+                    // When a check was added here to see if all the dependency files existed, other material jobs started to end up in the queue indefinitely
+                    // because they had a dependency on a file that existed but a job key that did not exist for that file.
+                    continue;
+                }
+
+                if (actualJob->HasMissingSourceDependency())
+                {
+                    AZ_Warning(
+                        AssetProcessor::ConsoleChannel,
+                        false,
+                        "No job was found to match the job dependency criteria declared by file %s.\n"
+                        "This may be due to a mismatched job key.\n"
+                        "Job ordering will not be guaranteed and could result in errors or unexpected output.",
+                        actualJob->GetJobEntry().GetAbsoluteSourcePath().toUtf8().constData());
+                }
                 bool canProcessJob = true;
-                for (const JobDependencyInternal& jobDepedencyInternal : actualJob->GetJobDependencies())
+                for (const JobDependencyInternal& jobDependencyInternal : actualJob->GetJobDependencies())
                 {
-                    if (jobDepedencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::Order || jobDepedencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::OrderOnce)
+                    if (jobDependencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::Order || jobDependencyInternal.m_jobDependency.m_type == AssetBuilderSDK::JobDependencyType::OrderOnce)
                     {
-                        const AssetBuilderSDK::JobDependency& jobDependency = jobDepedencyInternal.m_jobDependency;
+                        const AssetBuilderSDK::JobDependency& jobDependency = jobDependencyInternal.m_jobDependency;
                         AZ_Assert(
                             AZ::IO::PathView(jobDependency.m_sourceFile.m_sourceFileDependencyPath).IsAbsolute(),
                             "Dependency path %s is not an absolute path",
@@ -73,7 +108,7 @@ namespace AssetProcessor
                         if (m_sourceModel->isInFlight(elementId) || m_sourceModel->isInQueue(elementId))
                         {
                             canProcessJob = false;
-                            if (!anyPendingJob)
+                            if (!anyPendingJob || (anyPendingJob->HasMissingSourceDependency() && !actualJob->HasMissingSourceDependency()))
                             {
                                 anyPendingJob = actualJob;
                             }
@@ -144,20 +179,51 @@ namespace AssetProcessor
             return false; // right before left.
         }
 
-        // first thing to check is in platform.
-        bool leftActive = m_currentlyConnectedPlatforms.contains(leftJob->GetPlatformInfo().m_identifier.c_str());
-        bool rightActive = m_currentlyConnectedPlatforms.contains(rightJob->GetPlatformInfo().m_identifier.c_str());
-
-        if (leftActive)
+        // Check if either job was marked as having a missing source dependency.
+        // This means that the job is looking for another source asset and job to exist before it runs, but
+        // that source doesn't exist yet. Those jobs are deferred to run later, in case the dependency eventually shows up.
+        // The dependency may be on an intermediate asset that will be generated later in asset processing.
+        if (leftJob->HasMissingSourceDependency() != rightJob->HasMissingSourceDependency())
         {
-            if (!rightActive)
+            if (rightJob->HasMissingSourceDependency())
             {
-                return true; // left before right
+                return true; // left does not have a missing source dependency, but right does, so left wins.
             }
+            return false; // Right does not have a missing source dependency, but left does, so right wins.
         }
-        else if (rightActive)
+
+        // Common platform jobs generate intermediate assets. These generate additional jobs, and the intermediate assets
+        // can be source and/or job dependencies for other, queued assets.
+        // Run intermediate assets before active platform and host platform jobs.
+        // Critical jobs should run first, so skip the comparison here if either job is critical, to allow criticality to come in first.
+        bool platformsMatch = leftJob->GetPlatformInfo().m_identifier == rightJob->GetPlatformInfo().m_identifier;
+
+        // first thing to check is in platform.
+        if (!platformsMatch)
         {
-            return false; // right before left.
+            if (leftJob->GetPlatformInfo().m_identifier == AssetBuilderSDK::CommonPlatformName)
+            {
+                return true;
+            }
+            if (rightJob->GetPlatformInfo().m_identifier == AssetBuilderSDK::CommonPlatformName)
+            {
+                return false;
+            }
+
+            bool leftActive = m_currentlyConnectedPlatforms.contains(leftJob->GetPlatformInfo().m_identifier.c_str());
+            bool rightActive = m_currentlyConnectedPlatforms.contains(rightJob->GetPlatformInfo().m_identifier.c_str());
+
+            if (leftActive)
+            {
+                if (!rightActive)
+                {
+                    return true; // left before right
+                }
+            }
+            else if (rightActive)
+            {
+                return false; // right before left.
+            }
         }
 
         // critical jobs take priority
@@ -185,7 +251,7 @@ namespace AssetProcessor
         }
 
         // arbitrarily, lets have PC get done first since pc-format assets are what the editor uses.
-        if (leftJob->GetPlatformInfo().m_identifier != rightJob->GetPlatformInfo().m_identifier)
+        if (!platformsMatch)
         {
             if (leftJob->GetPlatformInfo().m_identifier == AzToolsFramework::AssetSystem::GetHostAssetPlatform())
             {

+ 57 - 25
Code/Tools/AssetProcessor/native/resourcecompiler/rccontroller.cpp

@@ -168,33 +168,58 @@ namespace AssetProcessor
 
     void RCController::JobSubmitted(JobDetails details)
     {
-        AssetProcessor::QueueElementID checkFile(details.m_jobEntry.m_sourceAssetReference, details.m_jobEntry.m_platformInfo.m_identifier.c_str(), details.m_jobEntry.m_jobKey);
+        AssetProcessor::QueueElementID checkFile(details.m_jobEntry.m_sourceAssetReference,
+            details.m_jobEntry.m_platformInfo.m_identifier.c_str(),
+            details.m_jobEntry.m_jobKey);
+        bool cancelJob = false;
+        RCJob* existingJob = nullptr;
+        int existingJobIndex = -1;
 
         if (m_RCJobListModel.isInQueue(checkFile))
         {
-            AZ_TracePrintf(
-                AssetProcessor::DebugChannel,
-                "Job is already in queue and has not started yet - ignored [%s, %s, %s]\n",
-                checkFile.GetSourceAssetReference().AbsolutePath().c_str(),
-                checkFile.GetPlatform().toUtf8().data(),
-                checkFile.GetJobDescriptor().toUtf8().data());
-
-            // Don't just discard the job, we need to let APM know so it can keep track of the number of jobs that are pending/finished
-            AssetBuilderSDK::JobCommandBus::Event(details.m_jobEntry.m_jobRunKey, &AssetBuilderSDK::JobCommandBus::Events::Cancel);
-            Q_EMIT FileCancelled(details.m_jobEntry);
-            return;
+            existingJobIndex = m_RCJobListModel.GetIndexOfJobByState(checkFile, RCJob::pending);
+            if (existingJobIndex != -1)
+            {
+                existingJob = m_RCJobListModel.getItem(existingJobIndex);
+
+                // The job status has changed
+                if (existingJob->HasMissingSourceDependency() != details.m_hasMissingSourceDependency)
+                {
+                    AZ_TracePrintf(
+                        AssetProcessor::DebugChannel,
+                        "Cancelling Job [%s, %s, %s] missing source dependency status has changed.\n",
+                        checkFile.GetSourceAssetReference().AbsolutePath().c_str(),
+                        checkFile.GetPlatform().toUtf8().data(),
+                        checkFile.GetJobDescriptor().toUtf8().data());
+                    cancelJob = true;
+                }
+            }
+
+            if (!cancelJob)
+            {
+                AZ_TracePrintf(
+                    AssetProcessor::DebugChannel,
+                    "Job is already in queue and has not started yet - ignored [%s, %s, %s]\n",
+                    checkFile.GetSourceAssetReference().AbsolutePath().c_str(),
+                    checkFile.GetPlatform().toUtf8().data(),
+                    checkFile.GetJobDescriptor().toUtf8().data());
+
+                // Don't just discard the job, we need to let APM know so it can keep track of the number of jobs that are pending/finished
+                AssetBuilderSDK::JobCommandBus::Event(details.m_jobEntry.m_jobRunKey, &AssetBuilderSDK::JobCommandBus::Events::Cancel);
+                Q_EMIT FileCancelled(details.m_jobEntry);
+                return;
+            }
         }
 
         if (m_RCJobListModel.isInFlight(checkFile))
         {
             // if the computed fingerprint is the same as the fingerprint of the in-flight job, this is okay.
-            int existingJobIndex = m_RCJobListModel.GetIndexOfProcessingJob(checkFile);
+            existingJobIndex = m_RCJobListModel.GetIndexOfJobByState(checkFile, RCJob::processing);
             if (existingJobIndex != -1)
             {
-                RCJob* job = m_RCJobListModel.getItem(existingJobIndex);
-                bool cancelJob = false;
+                existingJob = m_RCJobListModel.getItem(existingJobIndex);
 
-                if (job->GetJobEntry().m_computedFingerprint != details.m_jobEntry.m_computedFingerprint)
+                if (existingJob->GetJobEntry().m_computedFingerprint != details.m_jobEntry.m_computedFingerprint)
                 {
                     AZ_TracePrintf(
                         AssetProcessor::DebugChannel,
@@ -202,11 +227,11 @@ namespace AssetProcessor
                         checkFile.GetSourceAssetReference().AbsolutePath().c_str(),
                         checkFile.GetPlatform().toUtf8().data(),
                         checkFile.GetJobDescriptor().toUtf8().data(),
-                        job->GetJobEntry().m_computedFingerprint,
+                        existingJob->GetJobEntry().m_computedFingerprint,
                         details.m_jobEntry.m_computedFingerprint);
                     cancelJob = true;
                 }
-                else if(!job->GetJobDependencies().empty())
+                else if (!existingJob->GetJobDependencies().empty())
                 {
                     // If a job has dependencies, it's very likely it was re-queued as a result of a dependency being changed
                     // The in-flight job is probably going to fail at best, or use old data at worst, so cancel the in-flight job
@@ -232,15 +257,23 @@ namespace AssetProcessor
                     return;
                 }
 
-                if(cancelJob)
-                {
-                    job->SetState(RCJob::JobState::cancelled);
-                    AssetBuilderSDK::JobCommandBus::Event(job->GetJobEntry().m_jobRunKey, &AssetBuilderSDK::JobCommandBus::Events::Cancel);
-                    m_RCJobListModel.UpdateRow(existingJobIndex);
-                }
             }
         }
 
+        if (cancelJob && existingJob && existingJobIndex != -1)
+        {
+            bool markCanceledJobAsFinished = existingJob->GetState() == RCJob::JobState::pending;
+            existingJob->SetState(RCJob::JobState::cancelled);
+
+            // If the job was pending, mark it as finished, so asset processor can clean up the interface for this job and update tracking info.
+            if (markCanceledJobAsFinished)
+            {
+                FinishJob(existingJob);
+            }
+            AssetBuilderSDK::JobCommandBus::Event(existingJob->GetJobEntry().m_jobRunKey, &AssetBuilderSDK::JobCommandBus::Events::Cancel);
+            m_RCJobListModel.UpdateRow(existingJobIndex);
+        }
+
         RCJob* rcJob = new RCJob(&m_RCJobListModel);
         rcJob->Init(details); // note - move operation.  From this point on you must use the job details to refer to it.
         m_RCQueueSortModel.AddJobIdEntry(rcJob);
@@ -310,7 +343,6 @@ namespace AssetProcessor
                         break;
                     }
                 }
-
                 StartJob(rcJob);
                 rcJob = m_RCQueueSortModel.GetNextPendingJob();
             }

+ 3 - 1
Code/Tools/AssetProcessor/native/resourcecompiler/rccontroller.h

@@ -105,6 +105,9 @@ namespace AssetProcessor
         void OnJobComplete(JobEntry completeEntry, AzToolsFramework::AssetSystem::JobStatus status);
         void OnAddedToCatalog(JobEntry jobEntry);
 
+    protected:
+        AssetProcessor::RCQueueSortModel m_RCQueueSortModel;
+
     private:
         void FinishJob(AssetProcessor::RCJob* rcJob);
 
@@ -118,7 +121,6 @@ namespace AssetProcessor
         QMap<QString, int> m_jobsCountPerPlatform;// This stores the count of jobs per platform in the RC Queue
         QMap<QString, int> m_pendingCriticalJobsPerPlatform;// This stores the count of pending critical jobs per platform in the RC Queue
         AssetProcessor::RCJobListModel m_RCJobListModel;
-        AssetProcessor::RCQueueSortModel m_RCQueueSortModel;
 
         //! An Asset Compile Group is a set of assets that we're tracking the compilation of
         //! It consists of a whole bunch of assets and is considered to be "complete" when either one of the assets in the group fails

+ 5 - 0
Code/Tools/AssetProcessor/native/resourcecompiler/rcjob.cpp

@@ -105,6 +105,11 @@ namespace AssetProcessor
         return m_jobDetails.m_jobEntry;
     }
 
+    bool RCJob::HasMissingSourceDependency() const
+    {
+        return m_jobDetails.m_hasMissingSourceDependency;
+    }
+
     QDateTime RCJob::GetTimeCreated() const
     {
         return m_timeCreated;

+ 1 - 0
Code/Tools/AssetProcessor/native/resourcecompiler/rcjob.h

@@ -203,6 +203,7 @@ namespace AssetProcessor
         AssetBuilderSDK::ProcessJobResponse& GetProcessJobResponse();
 
         const JobEntry& GetJobEntry() const;
+        bool HasMissingSourceDependency() const;
 
         void Start();
 

+ 19 - 3
Code/Tools/AssetProcessor/native/resourcecompiler/rcjoblistmodel.cpp

@@ -77,6 +77,23 @@ namespace AssetProcessor
         return m_jobsInFlight.size();
     }
 
+    unsigned int RCJobListModel::jobsInQueueWithoutMissingDependencies() const
+    {
+        unsigned int jobsWithNoMissingDependencies = 0;
+        for (const auto& job : m_jobsInQueueLookup)
+        {
+            if (!job->HasMissingSourceDependency())
+            {
+                ++jobsWithNoMissingDependencies;
+            }
+        }
+        return jobsWithNoMissingDependencies;
+    }
+
+    unsigned int RCJobListModel::jobsPendingCatalog() const
+    {
+        return m_finishedJobsNotInCatalog.count();
+    }
 
     void RCJobListModel::UpdateJobEscalation(AssetProcessor::RCJob* rcJob, int jobEscalation)
     {
@@ -229,7 +246,6 @@ namespace AssetProcessor
 #if defined(DEBUG_RCJOB_MODEL)
         AZ_TracePrintf(AssetProcessor::DebugChannel, "JobTrace markAsCompleted(%i %s,%s,%s)\n", rcJob, rcJob->GetInputFileAbsolutePath().toUtf8().constData(), rcJob->GetPlatformInfo().m_identifier.c_str(), rcJob->GetJobKey().toUtf8().constData());
 #endif
-
         rcJob->SetTimeCompleted(QDateTime::currentDateTime());
 
         auto foundInQueue = m_jobsInQueueLookup.find(rcJob->GetElementID());
@@ -316,12 +332,12 @@ namespace AssetProcessor
         return false;
     }
 
-    int RCJobListModel::GetIndexOfProcessingJob(const QueueElementID& elementId)
+    int RCJobListModel::GetIndexOfJobByState(const QueueElementID& elementId, RCJob::JobState jobState)
     {
         for (int idx = 0; idx < rowCount(); ++idx)
         {
             RCJob* job = getItem(idx);
-            if (job->GetState() == RCJob::processing && job->GetElementID() == elementId)
+            if (job->GetState() == jobState && job->GetElementID() == elementId)
             {
                 return idx;
                 break;

+ 5 - 1
Code/Tools/AssetProcessor/native/resourcecompiler/rcjoblistmodel.h

@@ -74,6 +74,10 @@ namespace AssetProcessor
         void markAsCompleted(RCJob* rcJob);
         void markAsCataloged(const AssetProcessor::QueueElementID& check);
         unsigned int jobsInFlight() const;
+        // Returns how many jobs in the queue have the missing dependency flag set.
+        unsigned int jobsInQueueWithoutMissingDependencies() const;
+        // Returns how many finished jobs that haven't been updated in the catalog.
+        unsigned int jobsPendingCatalog() const;
 
         void UpdateJobEscalation(AssetProcessor::RCJob* rcJob, int jobPrioririty);
         void UpdateRow(int jobIndex);
@@ -90,7 +94,7 @@ namespace AssetProcessor
 
         int itemCount() const;
         RCJob* getItem(int index) const;
-        int GetIndexOfProcessingJob(const QueueElementID& elementId);
+        int GetIndexOfJobByState(const QueueElementID& elementId, RCJob::JobState jobState);
 
         ///! EraseJobs expects the database name of the source file.
         void EraseJobs(const SourceAssetReference& sourceAssetReference, AZStd::vector<RCJob*>& pendingJobs);

+ 21 - 5
Code/Tools/AssetProcessor/native/tests/assetmanager/AssetManagerTestingBase.cpp

@@ -145,7 +145,7 @@ namespace UnitTests
 
         AZ::Utils::WriteFile("unit test file", m_testFilePath);
 
-        m_rc = AZStd::make_unique<AssetProcessor::RCController>(1, 1);
+        m_rc = AZStd::make_unique<TestingRCController>(1, 1);
         m_rc->SetDispatchPaused(false);
 
         QObject::connect(
@@ -227,12 +227,16 @@ namespace UnitTests
     void AssetManagerTestingBase::ProcessJob(AssetProcessor::RCController& rcController, const AssetProcessor::JobDetails& jobDetails)
     {
         rcController.JobSubmitted(jobDetails);
+        UnitTests::JobSignalReceiver receiver;
+        WaitForNextJobToProcess(receiver);
+    }
 
-        JobSignalReceiver receiver;
-        QCoreApplication::processEvents(); // Once to get the job started
+    void AssetManagerTestingBase::WaitForNextJobToProcess(UnitTests::JobSignalReceiver &receiver)
+    {
+        QCoreApplication::processEvents(); // RCController::DispatchJobsImpl : Once to get the job started
         receiver.WaitForFinish(); // Wait for the RCJob to signal it has completed working
-        QCoreApplication::processEvents(); // Once more to trigger the JobFinished event
-        QCoreApplication::processEvents(); // Again to trigger the Finished event
+        QCoreApplication::processEvents(); // RCJob::Finished : Once more to trigger the JobFinished event
+        QCoreApplication::processEvents(); // RCController::FinishJob : Again to trigger the Finished event
     }
 
     AssetBuilderSDK::CreateJobFunction AssetManagerTestingBase::CreateJobStage(
@@ -349,6 +353,18 @@ namespace UnitTests
             "fingerprint");
     }
 
+    void AssetManagerTestingBase::SetCatalogToUpdateOnJobCompletion()
+    {
+        using namespace AssetBuilderSDK;
+        QObject::connect(
+            m_rc.get(),
+            &AssetProcessor::RCController::FileCompiled,
+            [this](AssetProcessor::JobEntry entry, AssetBuilderSDK::ProcessJobResponse response)
+            {
+                QMetaObject::invokeMethod(m_rc.get(), "OnAddedToCatalog", Qt::QueuedConnection, Q_ARG(AssetProcessor::JobEntry, entry));
+            });
+    }
+
     AZStd::string AssetManagerTestingBase::MakePath(const char* filename, bool intermediate)
     {
         auto cacheDir = GetCacheDir();

+ 25 - 1
Code/Tools/AssetProcessor/native/tests/assetmanager/AssetManagerTestingBase.h

@@ -73,6 +73,27 @@ namespace UnitTests
         void CheckJobEntries(int count);
     };
 
+    // Exposes protected data to automated tests.
+    class TestingRCController
+        : public AssetProcessor::RCController
+    {
+    public:
+        TestingRCController() = default;
+        explicit TestingRCController(int minJobs, int maxJobs, QObject* parent = 0)
+            : AssetProcessor::RCController(minJobs, maxJobs, parent)
+        {
+
+        }
+
+        // There are many queues in the asset processing process.
+        // This allows automated tests to examine the RCQueueSortModel, and compare
+        // to the other queues, to make sure the state of these systems matches what's expected.
+        AssetProcessor::RCQueueSortModel& GetRCQueueSortModel()
+        {
+            return m_RCQueueSortModel;
+        }
+    };
+
     class AssetManagerTestingBase : public ::UnitTest::LeakDetectionFixture
     {
     public:
@@ -85,6 +106,7 @@ namespace UnitTests
     protected:
         void RunFile(int expectedJobCount, int expectedFileCount = 1, int dependencyFileCount = 0);
         void ProcessJob(AssetProcessor::RCController& rcController, const AssetProcessor::JobDetails& jobDetails);
+        void WaitForNextJobToProcess(UnitTests::JobSignalReceiver& receiver);
 
         AZStd::string MakePath(const char* filename, bool intermediate);
 
@@ -114,6 +136,8 @@ namespace UnitTests
             AssetBuilderSDK::ProductOutputFlags outputFlags,
             bool outputExtraFile = false);
 
+        void SetCatalogToUpdateOnJobCompletion();
+
         const char* GetJobProcessFailText();
 
         AZ::IO::Path GetCacheDir();
@@ -146,7 +170,7 @@ namespace UnitTests
         AZ::Entity* m_jobManagerEntity{};
         AZ::ComponentDescriptor* m_descriptor{};
 
-        AZStd::unique_ptr<AssetProcessor::RCController> m_rc;
+        AZStd::unique_ptr<TestingRCController> m_rc;
 
         AZStd::vector<AssetProcessor::JobDetails> m_jobDetailsList;
 

+ 320 - 0
Code/Tools/AssetProcessor/native/tests/assetmanager/AssetProcessorManagerTest.cpp

@@ -12,6 +12,7 @@
 
 #include <AzCore/Settings/SettingsRegistryMergeUtils.h>
 #include <AzToolsFramework/Asset/AssetProcessorMessages.h>
+#include <AzToolsFramework/AssetDatabase/AssetDatabaseConnection.h>
 #include <AzToolsFramework/ToolsFileUtils/ToolsFileUtils.h>
 
 #include <AzTest/AzTest.h>
@@ -846,6 +847,325 @@ TEST_F(AssetProcessorIntermediateAssetTests, IntermediateAsset_SourceNoLongerEmi
 
 }
 
+// Used for tests that work with source dependencies and intermediate assets.
+class AssetProcessorIntermediateAssetSourceDependencyTests
+    : public AssetProcessorIntermediateAssetTests
+{
+public:
+
+    // Helper function - sets up the paths to files used by this test.
+    void GenerateAssetPaths()
+    {
+        // First, prep the paths and file names in use for the test.
+        m_firstFileName = AZ::IO::Path(m_firstFileNameNoExtension.c_str()).ReplaceExtension(m_firstFileExtension.c_str());
+        m_firstFilePath = AZ::IO::Path(m_scanfolder.m_scanFolder).Append(m_firstFileName);
+
+        m_secondFileName = AZ::IO::Path(m_secondFileNameNoExtension.c_str()).ReplaceExtension(m_secondFileExtension.c_str());
+        m_secondFilePath = AZ::IO::Path(m_scanfolder.m_scanFolder).Append(m_secondFileName);
+
+        m_intermediateFileName = AZ::IO::Path(m_firstFileNameNoExtension.c_str()).ReplaceExtension(m_intermediateExtension.c_str());
+        m_intermediateAssetPath = MakePath(m_intermediateFileName.c_str(), true);
+        m_firstProductPath = MakePath(AZ::IO::Path(m_firstFileNameNoExtension.c_str()).ReplaceExtension(m_firstProductExtension.c_str()).c_str(), false);
+
+        // Store the path to the product asset, so that the test can examine the contents of the product asset.
+        m_secondProductPath =
+            MakePath(AZ::IO::Path(m_secondFileNameNoExtension.c_str()).ReplaceExtension(m_SecondProductExtension.c_str()).c_str(), false);
+    }
+
+    // Helper function - generates the initial files ued by this test.
+    void CreateTestAssets()
+    {
+        AZ::Utils::WriteFile("unit test file", m_firstFilePath.c_str());
+        AZ::Utils::WriteFile("unit test file", m_secondFilePath.c_str());
+    }
+
+    // Creates three builders:
+    //  Source A - Outputs Intermediate A
+    //  Intermediate A - Outputs Product A
+    //  Source B - Outputs Product B, has a source dependency on Intermediate A.
+    void CreateBuilders()
+    {
+        using namespace AssetBuilderSDK;
+
+        // Source A's builder, this outputs the Intermediate asset.
+        CreateBuilder(
+            m_firstFileExtension.c_str(),
+            MakeWildcardForExtension(m_firstFileExtension).c_str(),
+            m_intermediateExtension.c_str(),
+            true,
+            ProductOutputFlags::IntermediateAsset);
+        // Intermediate A's builder, this outputs a product asset.
+        CreateBuilder(
+            m_intermediateExtension.c_str(),
+            MakeWildcardForExtension(m_intermediateExtension).c_str(),
+            m_firstProductExtension.c_str(),
+            false,
+            ProductOutputFlags::ProductAsset);
+
+        // Source B's builder. This builder emits the intermediate A as a dependency.
+        m_builderInfoHandler.CreateBuilderDesc(
+            QString(m_secondFileExtension.c_str()),
+            AZ::Uuid::CreateRandom().ToFixedString().c_str(),
+            { AssetBuilderPattern{ MakeWildcardForExtension(m_secondFileExtension), AssetBuilderPattern::Wildcard } },
+            [this]([[maybe_unused]] const CreateJobsRequest& request, CreateJobsResponse& response)
+            {
+                for (const auto& platform : request.m_enabledPlatforms)
+                {
+                    response.m_createJobOutputs.push_back(
+                        JobDescriptor{ "fingerprint", "Source B - Product", platform.m_identifier.c_str() });
+
+                    // Create the dependency on the path to the intermediate asset.
+                    response.m_createJobOutputs.back().m_jobDependencyList.push_back(JobDependency(
+                        m_intermediateExtension.c_str(),
+                        platform.m_identifier,
+                        JobDependencyType::Order,
+                        { m_intermediateAssetPath.c_str(),
+                          AZ::Uuid::CreateNull(),
+                          AssetBuilderSDK::SourceFileDependency::SourceFileDependencyType::Absolute }));
+                }
+                response.m_result = CreateJobsResultCode::Success;
+            },
+            [this](const ProcessJobRequest& request, ProcessJobResponse& response)
+            {
+                AZ::IO::Path outputFile = request.m_sourceFile;
+
+                outputFile.ReplaceExtension(m_SecondProductExtension.c_str());
+                outputFile = AZ::IO::Path(request.m_tempDirPath).Append(outputFile);
+
+                // Write if the intermediate exists or not to the product asset.
+                bool intermediateProductExists = (AZ::IO::FileIOBase::GetInstance()->Exists(m_firstProductPath.c_str()));
+
+                AZStd::string toWrite = m_intermediateProductExistsString;
+                if (!intermediateProductExists)
+                {
+                    toWrite = m_intermediateProductDoesNotExistString;
+                }
+                AZ::Utils::WriteFile(toWrite.c_str(), outputFile.c_str());
+
+                auto product =
+                    JobProduct{ outputFile.c_str(), AZ::Data::AssetType::CreateName(m_SecondProductExtension.c_str()), AssetSubId };
+
+                product.m_outputFlags = ProductOutputFlags::ProductAsset;
+
+                product.m_dependenciesHandled = true;
+                response.m_outputProducts.push_back(product);
+                response.m_resultCode = ProcessJobResult_Success;
+            },
+            "fingerprint");
+    }
+
+    void SetUp() override
+    {
+        AssetProcessorIntermediateAssetTests::SetUp();
+
+        GenerateAssetPaths();
+        CreateTestAssets();
+
+        // Jobs with dependencies need those dependencies to have updated the asset catalog before the job with the dependency runs.
+        SetCatalogToUpdateOnJobCompletion();
+
+        CreateBuilders();
+    }
+
+    void TearDown() override
+    {
+        AssetProcessorIntermediateAssetTests::TearDown();
+    }
+
+protected:
+
+    AZStd::string MakeWildcardForExtension(const AZStd::string& extension)
+    {
+        return AZStd::string::format("*.%.*s", AZ_STRING_ARG(extension));
+    }
+
+    AZStd::string m_firstFileExtension = "a_source";
+    AZStd::string m_firstFileNameNoExtension = "firstfile";
+    AZ::IO::Path m_firstFileName;
+    AZ::IO::Path m_firstFilePath;
+
+    AZStd::string m_intermediateExtension = "a_intermediate";
+    AZ::IO::Path m_intermediateFileName;
+    AZ::IO::Path m_intermediateAssetPath;
+
+    AZStd::string m_firstProductExtension = "a_product";
+    AZStd::string m_firstProductPath;
+
+    AZStd::string m_secondFileExtension = "b_source";
+    AZStd::string m_secondFileNameNoExtension = "secondfile";
+    AZ::IO::Path m_secondFileName;
+    AZ::IO::Path m_secondFilePath;
+
+    AZStd::string m_SecondProductExtension = "b_product";
+    AZStd::string m_secondProductPath;
+
+    AZStd::string m_intermediateProductExistsString = "Intermediate product exists.";
+    AZStd::string m_intermediateProductDoesNotExistString = "Intermediate product does not exist.";
+
+};
+
+TEST_F(AssetProcessorIntermediateAssetSourceDependencyTests, SourceDependencyIsIntermediateAsset_NotInitiallyAvailable_JobsWaitsForIntermediateJobToExistAndRun)
+{
+    // This is a regression test for a situation where Job B depends on an intermediate asset job (Job A -> Intermediate Job A).
+    // Before this was fixed, Job B would be queued before Job A and Job B was unaware that Job A created Intermediate Job A.
+    // After the fix, jobs with missing dependencies are queued to run later than other jobs, and Common platform jobs (Job A)
+    // are prioritized in the queue.
+    // Setup:
+    //  Builder for Source A is created. Processes "a_source" assets into "a_intermediate" assets.
+    //  Builder for Intermediate A is created. Processes "a_intermediate" into "a_product" assets.
+    //  Builder for Source B is created. Emits a job dependency specifically on the Intermediate A job in this test setup.
+    //  A source file for A and B are created.
+    // Test:
+    //  Jobs are both queued for Source A and Source B.
+    //  Source A job runs first, because it does not have a missing dependency.
+    //  Call asset processing updating in a specific order, out of processEvents order, because
+    //      of a race condition where Intermediate A hasn't been found and had a job created by the Asset Processor
+    //      before Job B comes up as next in the queue, causing Job B to report that it's running before its dependency is resolved.
+    //      Instead, ScheduleNextUpdate -> ProcessFilesToExamineQueue -> CheckForIdle will make sure that the Intermediate A job is emitted.
+    //  Verify that there are two jobs ready to submitted to the resource compiler: A new Job B with the dependency resolved, and Intermediate A.
+    //  Process assets twice.
+    //  Verify that the jobs run in order, and that the product of Intermediate A is available on disk during the processing of Job B.
+
+
+    QMetaObject::invokeMethod(
+        m_assetProcessorManager.get(), "AssessAddedFile", Qt::QueuedConnection, Q_ARG(QString, m_firstFilePath.c_str()));
+    QMetaObject::invokeMethod(
+        m_assetProcessorManager.get(), "AssessAddedFile", Qt::QueuedConnection, Q_ARG(QString, m_secondFilePath.c_str()));
+
+    QCoreApplication::processEvents();
+    m_fileCompiled = false;
+    m_fileFailed = false;
+    RunFile(2, 2);
+
+    // Queue both jobs at the same time, so that RCQueueSortModel.cpp will order these jobs and run them in order.
+    // TODO: Note that the PC job (asset B) will always run before the Common job (asset A) because the system
+    // purposely prioritizes host platform jobs first.
+    ASSERT_EQ(m_jobDetailsList.size(), 2);
+    // One of the two jobs should have the missing source dependency flagged for follow up.
+    EXPECT_NE(m_jobDetailsList[0].m_hasMissingSourceDependency, m_jobDetailsList[1].m_hasMissingSourceDependency);
+    m_rc->JobSubmitted(m_jobDetailsList[0]);
+    m_rc->JobSubmitted(m_jobDetailsList[1]);
+    m_jobDetailsList.clear();
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform("pc"), 1);
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform(AssetBuilderSDK::CommonPlatformName), 1);
+
+    UnitTests::JobSignalReceiver receiver;
+
+    // Process the first asset in the queue, which will be the job for A, because B had a missing dependency and was made lower priority.
+    m_rc->DispatchJobsImpl();
+
+    // Pause dispatching. In this test scenario, with only two assets queued,
+    // it's likely that Job B will finish before Job B is re-issued due to the newly
+    // updated source asset.
+    // This happens frequently when step-through debugging this function with breakpoints.
+    // This can also happen when running many tests in parallel: There are a lot of async calls
+    // in this test, and depending on the state of the machine running this test, those may resolve
+    // in a different order or timing. Pausing dispatching until the moment that dispatchJobsImpl is called
+    // mitigates this: Jobs only execute when this test needs them to.
+    m_rc->SetDispatchPaused(true);
+
+    receiver.WaitForFinish();
+
+    QCoreApplication::processEvents(); // RCJob::Finished : Once more to trigger the JobFinished event
+    QCoreApplication::processEvents(); // RCController::FinishJob : Again to trigger the Finished event 
+
+    // Product B shouldn't exist yet because A was processed first.
+    EXPECT_FALSE(AZ::IO::FileIOBase::GetInstance()->Exists(m_secondProductPath.c_str()));
+
+    // Emit that A was finished processing.
+    m_assetProcessorManager->AssetProcessed(m_processedJobEntry, m_processJobResponse);
+
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform("pc"), 1);
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform(AssetBuilderSDK::CommonPlatformName), 0);
+
+    // Manually call each step to make sure the newly created intermediate asset gets discovered and queued
+    // before the pending job with a missing dependency is run, because the queue has one entry right now.
+    // Otherwise, it's likely the resource compiler will pick up the next job before the intermediate job is processed.
+    // Which means the missing dependency warning will fire off and cause this test to fail.
+    // This is a race condition in asset processing: If the last asset to process fills the missing dependency of the
+    // next job in the queue, then there's a brief period where intermediate asset hasn't been discovered yet and isn't queued,
+    // so the next job, with the missing dependency will run. This is mitigated by having the Common platform jobs run
+    // before non-Common platform jobs. Additional mitigation isn't currently necessary.
+    m_assetProcessorManager->ScheduleNextUpdate();
+    m_assetProcessorManager->ProcessFilesToExamineQueue();
+    // Call CheckForIdle twice, once for each pending asset.
+    m_assetProcessorManager->CheckForIdle();
+    m_assetProcessorManager->CheckForIdle();
+
+    // Make sure events are dispatched and m_jobDetailsList is updated with both jobs.
+
+    // The queue should now be the job to process the intermediate asset, and the re-created Job B, which no longer has a missing dependency.
+    ASSERT_EQ(m_jobDetailsList.size(), 2);
+    // Neither job in the queue should have the missing source dependency flag.
+    EXPECT_FALSE(m_jobDetailsList[0].m_hasMissingSourceDependency);
+    EXPECT_FALSE(m_jobDetailsList[1].m_hasMissingSourceDependency);
+    m_rc->JobSubmitted(m_jobDetailsList[0]);
+    m_rc->JobSubmitted(m_jobDetailsList[1]);
+    m_jobDetailsList.clear();
+
+    // The platform is for the target output, not for the platform the source was on, so the intermediate asset job will
+    // have the PC platform.
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform("pc"), 2);
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform(AssetBuilderSDK::CommonPlatformName), 0);
+
+    // The original Job B, with the missing dependency, was canceled.
+    // Verify the pending job list matches the model's queue.
+    // Canceling jobs doesn't guarantee the pending count will be updated, but it does guarantee the row count
+    // of the model is updated. This check verifies that the call to cancel the job was done correctly, and updated
+    // the pending list.
+    AssetProcessor::RCQueueSortModel& sortModel = m_rc->GetRCQueueSortModel();
+    EXPECT_EQ(sortModel.rowCount(), m_rc->NumberOfPendingJobsPerPlatform("pc"));
+    m_rc->SetDispatchPaused(false);
+    m_rc->DispatchJobsImpl();
+    m_rc->SetDispatchPaused(true);
+
+    receiver.WaitForFinish();
+
+    QCoreApplication::processEvents(); // RCJob::Finished : Once more to trigger the JobFinished event
+    QCoreApplication::processEvents(); // RCController::FinishJob : Again to trigger the Finished event 
+
+    // Mark this asset as processed, so AP will move on to the next steps.
+    m_assetProcessorManager->AssetProcessed(m_processedJobEntry, m_processJobResponse);
+
+    // Need to call process events multiple times, to get the job details list populated with the intermediate asset job
+    // Due to timing of this test, these events may end up running in different process events calls.
+    // updates a lot of general AssetProcessor systems:
+    // AssetProcessorManager::ScheduleNextUpdate, AssetProcessorManager::ProcessFilesToExamineQueue,
+    // AssetProcessorManager::QueueIdleCheck, AssetProcessorManager::ProcessBuilders, and more.
+    // Also updates several AssetProcessor systems that the previous step updates,
+    // but the main events this is run for is two calls to AssetProcessorManager::AssetToProcess
+    // which puts Cache/Intermediate Assets/firstfile.a_intermediate and secondfile.b_source in m_jobDetailsList
+    // Make sure the job requests are populated and ready to go, without this, RCJob::PopulateProcessJobRequest sometimes crashes accessing job info.
+    QCoreApplication::processEvents();
+    QCoreApplication::processEvents();
+    QCoreApplication::processEvents();
+
+    // Verify that the job for the second asset didn't process yet, because it has a job dependency on the intermediate asset job.
+    EXPECT_FALSE(AZ::IO::FileIOBase::GetInstance()->Exists(m_secondProductPath.c_str()));
+
+    // Emit that the intermediate job was finished processing.
+    m_assetProcessorManager->AssetProcessed(m_processedJobEntry, m_processJobResponse);
+
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform("pc"), 1);
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform(AssetBuilderSDK::CommonPlatformName), 0);
+
+    // Make sure all remaning non-canceled jobs are processed.
+    m_rc->SetDispatchPaused(false);
+    WaitForNextJobToProcess(receiver);
+
+    // Verify the final product is marked as existing.
+    auto readResult = AZ::Utils::ReadFile<AZStd::string>(m_secondProductPath.c_str(), AZStd::numeric_limits<size_t>::max());
+    EXPECT_TRUE(readResult.IsSuccess());
+    EXPECT_EQ(readResult.GetValue().compare(m_intermediateProductExistsString), 0);
+
+    // Examine the queue directly : There should be nothing left in the pending job queue, or the sort model's row count.
+    EXPECT_EQ(m_rc->NumberOfPendingJobsPerPlatform("pc"), 0);
+    EXPECT_EQ(sortModel.GetNextPendingJob(), nullptr);
+
+    // The canceled job was removed from the actual sort model, it just wasn't removed from the pending per platform list.
+    EXPECT_EQ(sortModel.rowCount(), 0);
+}
+
 TEST_F(AssetProcessorManagerTest, WarningsAndErrorsReported_SuccessfullySavedToDatabase)
 {
     // This tests the JobDiagnosticTracker:  Warnings/errors reported to it should be recorded in the database when AssetProcessed is fired and able to be retrieved when querying job status