| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- /*
- * Copyright (c) Contributors to the Open 3D Engine Project.
- * For complete copyright and license terms please see the LICENSE at the root of this distribution.
- *
- * SPDX-License-Identifier: Apache-2.0 OR MIT
- *
- */
- #include <AWSMetricsBus.h>
- #include <AWSMetricsConstant.h>
- #include <AWSMetricsServiceApi.h>
- #include <ClientConfiguration.h>
- #include <DefaultClientIdProvider.h>
- #include <MetricsEvent.h>
- #include <MetricsEventBuilder.h>
- #include <MetricsManager.h>
- #include <AzCore/Jobs/JobFunction.h>
- #include <AzCore/IO/FileIO.h>
- #include <AzCore/Math/MathUtils.h>
- #include <AzCore/std/smart_ptr/make_shared.h>
- namespace AWSMetrics
- {
- MetricsManager::MetricsManager()
- : m_clientConfiguration(AZStd::make_unique<ClientConfiguration>())
- , m_clientIdProvider(IdentityProvider::CreateIdentityProvider())
- , m_monitorTerminated(true)
- , m_sendMetricsId(0)
- {
- }
- MetricsManager::~MetricsManager()
- {
- ShutdownMetrics();
- }
- bool MetricsManager::Init()
- {
- if (!m_clientConfiguration->InitClientConfiguration())
- {
- return false;
- }
- SetupJobContext();
- return true;
- }
- void MetricsManager::StartMetrics()
- {
- if (!m_monitorTerminated)
- {
- // The background thread has been started.
- return;
- }
- m_monitorTerminated = false;
- // Start a separate thread to monitor and consume the metrics queue.
- // Avoid using the job system since the worker is long-running over multiple frames
- m_monitorThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this));
- }
- void MetricsManager::MonitorMetricsQueue()
- {
- // Continue to loop until the monitor is terminated.
- while (!m_monitorTerminated)
- {
- // The thread will wake up either when the metrics event queue is full (try_acquire_for call returns true),
- // or the flush period limit is hit (try_acquire_for call returns false).
- m_waitEvent.try_acquire_for(AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()));
- FlushMetricsAsync();
- }
- }
- void MetricsManager::SetupJobContext()
- {
- // Avoid using the default job context since we will do blocking IO instead of CPU/memory intensive work
- unsigned int numWorkerThreads = AZ::GetMin(DesiredMaxWorkers, AZStd::thread::hardware_concurrency());
- AZ::JobManagerDesc jobDesc;
- AZ::JobManagerThreadDesc threadDesc;
- for (unsigned int i = 0; i < numWorkerThreads; ++i)
- {
- jobDesc.m_workerThreads.push_back(threadDesc);
- }
- m_jobManager.reset(aznew AZ::JobManager{ jobDesc });
- m_jobContext.reset(aznew AZ::JobContext{ *m_jobManager });
- }
- bool MetricsManager::SubmitMetrics(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority, const AZStd::string& eventSourceOverride)
- {
- MetricsEvent metricsEvent = MetricsEventBuilder().
- AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
- AddMetricsAttributes(metricsAttributes).
- SetMetricsPriority(eventPriority).
- Build();
- if (!metricsEvent.ValidateAgainstSchema())
- {
- m_globalStats.m_numDropped++;
- return false;
- }
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
- m_metricsQueue.AddMetrics(metricsEvent);
- if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
- {
- // Flush the metrics queue when the accumulated metrics size hits the limit
- m_waitEvent.release();
- }
- return true;
- }
- bool MetricsManager::SendMetricsAsync(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority, const AZStd::string & eventSourceOverride)
- {
- MetricsEvent metricsEvent = MetricsEventBuilder().
- AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
- AddMetricsAttributes(metricsAttributes).
- SetMetricsPriority(eventPriority).
- Build();
- if (!metricsEvent.ValidateAgainstSchema())
- {
- m_globalStats.m_numDropped++;
- return false;
- }
- auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
- metricsToFlush->AddMetrics(metricsEvent);
- SendMetricsAsync(metricsToFlush);
- return true;
- }
- void MetricsManager::SendMetricsAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
- {
- if (m_clientConfiguration->OfflineRecordingEnabled())
- {
- SendMetricsToLocalFileAsync(metricsQueue);
- }
- else
- {
- // Constant used to convert size limit from MB to Bytes.
- static constexpr int MbToBytes = 1000000;
- while (metricsQueue->GetNumMetrics() > 0)
- {
- // Break the metrics queue by the payload and records count limits. Make one or more service API requests to send all the buffered metrics.
- MetricsQueue metricsEventsToProcess;
- metricsQueue->PopBufferedEventsByServiceLimits(metricsEventsToProcess, AwsMetricsMaxRestApiPayloadSizeInMb * MbToBytes, AwsMetricsMaxKinesisBatchedRecordCount);
- SendMetricsToServiceApiAsync(metricsEventsToProcess);
- }
- }
- }
- void MetricsManager::SendMetricsToLocalFileAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
- {
- int requestId = ++m_sendMetricsId;
- // Send metrics to a local file
- AZ::Job* job{nullptr};
- job = AZ::CreateJobFunction(
- [this, metricsQueue, requestId]()
- {
- AZ::Outcome<void, AZStd::string> outcome = SendMetricsToFile(metricsQueue);
- if (outcome.IsSuccess())
- {
- // Generate response records for success call to keep consistency with the Service API response
- ServiceAPI::PostMetricsEventsResponseEntries responseEntries;
- int numMetricsEventsInRequest = metricsQueue->GetNumMetrics();
- for (int index = 0; index < numMetricsEventsInRequest; ++index)
- {
- ServiceAPI::PostMetricsEventsResponseEntry responseEntry;
- responseEntry.m_result = AwsMetricsPostMetricsEventsResponseEntrySuccessResult;
- responseEntries.emplace_back(responseEntry);
- }
- OnResponseReceived(*metricsQueue, responseEntries);
- AZ::TickBus::QueueFunction([requestId]()
- {
- AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
- });
- }
- else
- {
- OnResponseReceived(*metricsQueue);
- AZStd::string errorMessage = outcome.GetError();
- AZ::TickBus::QueueFunction([requestId, errorMessage]()
- {
- AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
- });
- }
- },
- true, m_jobContext.get());
- job->Start();
- }
- void MetricsManager::SendMetricsToServiceApiAsync(const MetricsQueue& metricsQueue)
- {
- int requestId = ++m_sendMetricsId;
- ServiceAPI::PostMetricsEventsRequestJob* requestJob = ServiceAPI::PostMetricsEventsRequestJob::Create(
- [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* successJob)
- {
- OnResponseReceived(successJob->parameters.m_metricsQueue, successJob->result.m_responseEntries);
- AZ::TickBus::QueueFunction([requestId]()
- {
- AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
- });
- },
- [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* failedJob)
- {
- OnResponseReceived(failedJob->parameters.m_metricsQueue);
- AZStd::string errorMessage = failedJob->error.message;
- AZ::TickBus::QueueFunction([requestId, errorMessage]()
- {
- AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
- });
- });
- requestJob->parameters.m_metricsQueue = AZStd::move(metricsQueue);
- requestJob->Start();
- }
- void MetricsManager::OnResponseReceived(const MetricsQueue& metricsEventsInRequest, const ServiceAPI::PostMetricsEventsResponseEntries& responseEntries)
- {
- MetricsQueue metricsEventsForRetry;
- int numMetricsEventsInRequest = metricsEventsInRequest.GetNumMetrics();
- for (int index = 0; index < numMetricsEventsInRequest; ++index)
- {
- MetricsEvent metricsEvent = metricsEventsInRequest[index];
- if (responseEntries.size() > 0 && responseEntries[index].m_result == AwsMetricsPostMetricsEventsResponseEntrySuccessResult)
- {
- // The metrics event is sent to the backend successfully.
- if (metricsEvent.GetNumFailures() == 0)
- {
- m_globalStats.m_numEvents++;
- }
- else
- {
- // Reduce the number of errors when the retry succeeds.
- m_globalStats.m_numErrors--;
- }
- m_globalStats.m_numSuccesses++;
- m_globalStats.m_sendSizeInBytes += static_cast<uint32_t>(metricsEvent.GetSizeInBytes());
- }
- else
- {
- metricsEvent.MarkFailedSubmission();
- // The metrics event failed to be sent to the backend for the first time.
- if (metricsEvent.GetNumFailures() == 1)
- {
- m_globalStats.m_numErrors++;
- m_globalStats.m_numEvents++;
- }
- if (metricsEvent.GetNumFailures() <= m_clientConfiguration->GetMaxNumRetries())
- {
- metricsEventsForRetry.AddMetrics(metricsEvent);
- }
- else
- {
- m_globalStats.m_numDropped++;
- }
- }
- }
- PushMetricsForRetry(metricsEventsForRetry);
- }
- void MetricsManager::PushMetricsForRetry(MetricsQueue& metricsEventsForRetry)
- {
- if (m_clientConfiguration->GetMaxNumRetries() == 0)
- {
- // No retry is required.
- m_globalStats.m_numDropped += metricsEventsForRetry.GetNumMetrics();
- return;
- }
- // Push failed events to the front of the queue and reserve the order.
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
- m_metricsQueue.PushMetricsToFront(metricsEventsForRetry);
- // Filter metrics events by priority since the queue might be full.
- m_globalStats.m_numDropped += m_metricsQueue.FilterMetricsByPriority(m_clientConfiguration->GetMaxQueueSizeInBytes());
- }
- AZ::Outcome<void, AZStd::string> MetricsManager::SendMetricsToFile(AZStd::shared_ptr<MetricsQueue> metricsQueue)
- {
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsFileMutex);
- AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
- if (!fileIO)
- {
- return AZ::Failure(AZStd::string{ "No FileIoBase Instance." });
- }
- const char* metricsFileFullPath = m_clientConfiguration->GetMetricsFileFullPath();
- const char* metricsFileDir = m_clientConfiguration->GetMetricsFileDir();
- MetricsQueue existingMetricsEvents;
- if (!metricsFileFullPath || !metricsFileDir)
- {
- return AZ::Failure(AZStd::string{ "Failed to get the metrics file directory or path." });
- }
- if (fileIO->Exists(metricsFileFullPath) && !existingMetricsEvents.ReadFromJson(metricsFileFullPath))
- {
- return AZ::Failure(AZStd::string{ "Failed to read the existing metrics on disk" });
- }
- else if (!fileIO->Exists(metricsFileDir) && !fileIO->CreatePath(metricsFileDir))
- {
- return AZ::Failure(AZStd::string{ "Failed to create metrics directory" });
- }
- // Append a copy of the metrics queue in the request to the existing metrics events and keep the original submission order.
- // Do not modify the metrics queue in the request directly for identifying the metrics events for retry on failure.
- MetricsQueue metricsEventsInRequest = *metricsQueue;
- existingMetricsEvents.AppendMetrics(metricsEventsInRequest);
- AZStd::string serializedMetrics = existingMetricsEvents.SerializeToJson();
- AZ::IO::HandleType fileHandle;
- if (!fileIO->Open(metricsFileFullPath, AZ::IO::OpenMode::ModeWrite | AZ::IO::OpenMode::ModeText, fileHandle))
- {
- return AZ::Failure(AZStd::string{ "Failed to open metrics file" });
- }
- fileIO->Write(fileHandle, serializedMetrics.c_str(), serializedMetrics.size());
- fileIO->Close(fileHandle);
- return AZ::Success();
- }
- void MetricsManager::FlushMetricsAsync()
- {
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
- if (m_metricsQueue.GetNumMetrics() == 0)
- {
- return;
- }
- auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
- metricsToFlush->AppendMetrics(m_metricsQueue);
- m_metricsQueue.ClearMetrics();
- SendMetricsAsync(metricsToFlush);
- }
- void MetricsManager::ShutdownMetrics()
- {
- if (m_monitorTerminated)
- {
- return;
- }
- // Terminate the monitor thread
- m_monitorTerminated = true;
- m_waitEvent.release();
- if (m_monitorThread.joinable())
- {
- m_monitorThread.join();
- }
- }
- AZ::s64 MetricsManager::GetNumBufferedMetrics()
- {
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
- return m_metricsQueue.GetNumMetrics();
- }
- const GlobalStatistics& MetricsManager::GetGlobalStatistics() const
- {
- return m_globalStats;
- }
- void MetricsManager::UpdateOfflineRecordingStatus(bool enable, bool submitLocalMetrics)
- {
- m_clientConfiguration->UpdateOfflineRecordingStatus(enable);
- if (!enable && submitLocalMetrics)
- {
- SubmitLocalMetricsAsync();
- }
- }
- void MetricsManager::SubmitLocalMetricsAsync()
- {
- AZ::Job* job{ nullptr };
- job = AZ::CreateJobFunction([this]()
- {
- AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
- if (!fileIO)
- {
- AZ_Error("AWSMetrics", false, "No FileIoBase Instance.");
- return;
- }
- // Read metrics from the local metrics file.
- AZStd::lock_guard<AZStd::mutex> file_lock(m_metricsFileMutex);
- if (!fileIO->Exists(GetMetricsFilePath()))
- {
- // Local metrics file doesn't exist.
- return;
- }
- MetricsQueue offlineRecords;
- if (!offlineRecords.ReadFromJson(GetMetricsFilePath()))
- {
- AZ_Error("AWSMetrics", false, "Failed to read from the local metrics file %s", GetMetricsFilePath());
- return;
- }
- // Submit the metrics read from the local metrics file.
- int numOfflineRecords = offlineRecords.GetNumMetrics();
- for (int index = 0; index < numOfflineRecords; ++index)
- {
- AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
- m_metricsQueue.AddMetrics(offlineRecords[index]);
- if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
- {
- // Flush the metrics queue when the accumulated metrics size hits the limit
- m_waitEvent.release();
- }
- }
- // Remove the local metrics file after reading all its content.
- if (!fileIO->Remove(GetMetricsFilePath()))
- {
- AZ_Error("AWSMetrics", false, "Failed to remove the local metrics file %s", GetMetricsFilePath());
- return;
- }
- }, true, m_jobContext.get());
- job->Start();
- }
- const char* MetricsManager::GetMetricsFileDirectory() const
- {
- return m_clientConfiguration->GetMetricsFileDir();
- }
- const char* MetricsManager::GetMetricsFilePath() const
- {
- return m_clientConfiguration->GetMetricsFileFullPath();
- }
- int MetricsManager::GetNumTotalRequests() const
- {
- return m_sendMetricsId.load();
- }
- }
|