3
0

MetricsManager.cpp 17 KB


  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <AWSMetricsBus.h>
  9. #include <AWSMetricsConstant.h>
  10. #include <AWSMetricsServiceApi.h>
  11. #include <ClientConfiguration.h>
  12. #include <DefaultClientIdProvider.h>
  13. #include <MetricsEvent.h>
  14. #include <MetricsEventBuilder.h>
  15. #include <MetricsManager.h>
  16. #include <AzCore/Jobs/JobFunction.h>
  17. #include <AzCore/IO/FileIO.h>
  18. #include <AzCore/Math/MathUtils.h>
  19. #include <AzCore/std/smart_ptr/make_shared.h>
  20. namespace AWSMetrics
  21. {
  22. MetricsManager::MetricsManager()
  23. : m_clientConfiguration(AZStd::make_unique<ClientConfiguration>())
  24. , m_clientIdProvider(IdentityProvider::CreateIdentityProvider())
  25. , m_monitorTerminated(true)
  26. , m_sendMetricsId(0)
  27. {
  28. }
  29. MetricsManager::~MetricsManager()
  30. {
  31. ShutdownMetrics();
  32. }
  33. bool MetricsManager::Init()
  34. {
  35. if (!m_clientConfiguration->InitClientConfiguration())
  36. {
  37. return false;
  38. }
  39. SetupJobContext();
  40. return true;
  41. }
  42. void MetricsManager::StartMetrics()
  43. {
  44. if (!m_monitorTerminated)
  45. {
  46. // The background thread has been started.
  47. return;
  48. }
  49. m_monitorTerminated = false;
  50. // Start a separate thread to monitor and consume the metrics queue.
  51. // Avoid using the job system since the worker is long-running over multiple frames
  52. m_monitorThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this));
  53. }
  54. void MetricsManager::MonitorMetricsQueue()
  55. {
  56. // Continue to loop until the monitor is terminated.
  57. while (!m_monitorTerminated)
  58. {
  59. // The thread will wake up either when the metrics event queue is full (try_acquire_for call returns true),
  60. // or the flush period limit is hit (try_acquire_for call returns false).
  61. m_waitEvent.try_acquire_for(AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()));
  62. FlushMetricsAsync();
  63. }
  64. }
  65. void MetricsManager::SetupJobContext()
  66. {
  67. // Avoid using the default job context since we will do blocking IO instead of CPU/memory intensive work
  68. unsigned int numWorkerThreads = AZ::GetMin(DesiredMaxWorkers, AZStd::thread::hardware_concurrency());
  69. AZ::JobManagerDesc jobDesc;
  70. AZ::JobManagerThreadDesc threadDesc;
  71. for (unsigned int i = 0; i < numWorkerThreads; ++i)
  72. {
  73. jobDesc.m_workerThreads.push_back(threadDesc);
  74. }
  75. m_jobManager.reset(aznew AZ::JobManager{ jobDesc });
  76. m_jobContext.reset(aznew AZ::JobContext{ *m_jobManager });
  77. }
  78. bool MetricsManager::SubmitMetrics(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority, const AZStd::string& eventSourceOverride)
  79. {
  80. MetricsEvent metricsEvent = MetricsEventBuilder().
  81. AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
  82. AddMetricsAttributes(metricsAttributes).
  83. SetMetricsPriority(eventPriority).
  84. Build();
  85. if (!metricsEvent.ValidateAgainstSchema())
  86. {
  87. m_globalStats.m_numDropped++;
  88. return false;
  89. }
  90. AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
  91. m_metricsQueue.AddMetrics(metricsEvent);
  92. if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
  93. {
  94. // Flush the metrics queue when the accumulated metrics size hits the limit
  95. m_waitEvent.release();
  96. }
  97. return true;
  98. }
  99. bool MetricsManager::SendMetricsAsync(const AZStd::vector<MetricsAttribute>& metricsAttributes, int eventPriority, const AZStd::string & eventSourceOverride)
  100. {
  101. MetricsEvent metricsEvent = MetricsEventBuilder().
  102. AddDefaultMetricsAttributes(m_clientIdProvider->GetIdentifier(), eventSourceOverride).
  103. AddMetricsAttributes(metricsAttributes).
  104. SetMetricsPriority(eventPriority).
  105. Build();
  106. if (!metricsEvent.ValidateAgainstSchema())
  107. {
  108. m_globalStats.m_numDropped++;
  109. return false;
  110. }
  111. auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
  112. metricsToFlush->AddMetrics(metricsEvent);
  113. SendMetricsAsync(metricsToFlush);
  114. return true;
  115. }
  116. void MetricsManager::SendMetricsAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
  117. {
  118. if (m_clientConfiguration->OfflineRecordingEnabled())
  119. {
  120. SendMetricsToLocalFileAsync(metricsQueue);
  121. }
  122. else
  123. {
  124. // Constant used to convert size limit from MB to Bytes.
  125. static constexpr int MbToBytes = 1000000;
  126. while (metricsQueue->GetNumMetrics() > 0)
  127. {
  128. // Break the metrics queue by the payload and records count limits. Make one or more service API requests to send all the buffered metrics.
  129. MetricsQueue metricsEventsToProcess;
  130. metricsQueue->PopBufferedEventsByServiceLimits(metricsEventsToProcess, AwsMetricsMaxRestApiPayloadSizeInMb * MbToBytes, AwsMetricsMaxKinesisBatchedRecordCount);
  131. SendMetricsToServiceApiAsync(metricsEventsToProcess);
  132. }
  133. }
  134. }
  135. void MetricsManager::SendMetricsToLocalFileAsync(AZStd::shared_ptr<MetricsQueue> metricsQueue)
  136. {
  137. int requestId = ++m_sendMetricsId;
  138. // Send metrics to a local file
  139. AZ::Job* job{nullptr};
  140. job = AZ::CreateJobFunction(
  141. [this, metricsQueue, requestId]()
  142. {
  143. AZ::Outcome<void, AZStd::string> outcome = SendMetricsToFile(metricsQueue);
  144. if (outcome.IsSuccess())
  145. {
  146. // Generate response records for success call to keep consistency with the Service API response
  147. ServiceAPI::PostMetricsEventsResponseEntries responseEntries;
  148. int numMetricsEventsInRequest = metricsQueue->GetNumMetrics();
  149. for (int index = 0; index < numMetricsEventsInRequest; ++index)
  150. {
  151. ServiceAPI::PostMetricsEventsResponseEntry responseEntry;
  152. responseEntry.m_result = AwsMetricsPostMetricsEventsResponseEntrySuccessResult;
  153. responseEntries.emplace_back(responseEntry);
  154. }
  155. OnResponseReceived(*metricsQueue, responseEntries);
  156. AZ::TickBus::QueueFunction([requestId]()
  157. {
  158. AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
  159. });
  160. }
  161. else
  162. {
  163. OnResponseReceived(*metricsQueue);
  164. AZStd::string errorMessage = outcome.GetError();
  165. AZ::TickBus::QueueFunction([requestId, errorMessage]()
  166. {
  167. AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
  168. });
  169. }
  170. },
  171. true, m_jobContext.get());
  172. job->Start();
  173. }
  174. void MetricsManager::SendMetricsToServiceApiAsync(const MetricsQueue& metricsQueue)
  175. {
  176. int requestId = ++m_sendMetricsId;
  177. ServiceAPI::PostMetricsEventsRequestJob* requestJob = ServiceAPI::PostMetricsEventsRequestJob::Create(
  178. [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* successJob)
  179. {
  180. OnResponseReceived(successJob->parameters.m_metricsQueue, successJob->result.m_responseEntries);
  181. AZ::TickBus::QueueFunction([requestId]()
  182. {
  183. AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsSuccess, requestId);
  184. });
  185. },
  186. [this, requestId](ServiceAPI::PostMetricsEventsRequestJob* failedJob)
  187. {
  188. OnResponseReceived(failedJob->parameters.m_metricsQueue);
  189. AZStd::string errorMessage = failedJob->error.message;
  190. AZ::TickBus::QueueFunction([requestId, errorMessage]()
  191. {
  192. AWSMetricsNotificationBus::Broadcast(&AWSMetricsNotifications::OnSendMetricsFailure, requestId, errorMessage);
  193. });
  194. });
  195. requestJob->parameters.m_metricsQueue = AZStd::move(metricsQueue);
  196. requestJob->Start();
  197. }
  198. void MetricsManager::OnResponseReceived(const MetricsQueue& metricsEventsInRequest, const ServiceAPI::PostMetricsEventsResponseEntries& responseEntries)
  199. {
  200. MetricsQueue metricsEventsForRetry;
  201. int numMetricsEventsInRequest = metricsEventsInRequest.GetNumMetrics();
  202. for (int index = 0; index < numMetricsEventsInRequest; ++index)
  203. {
  204. MetricsEvent metricsEvent = metricsEventsInRequest[index];
  205. if (responseEntries.size() > 0 && responseEntries[index].m_result == AwsMetricsPostMetricsEventsResponseEntrySuccessResult)
  206. {
  207. // The metrics event is sent to the backend successfully.
  208. if (metricsEvent.GetNumFailures() == 0)
  209. {
  210. m_globalStats.m_numEvents++;
  211. }
  212. else
  213. {
  214. // Reduce the number of errors when the retry succeeds.
  215. m_globalStats.m_numErrors--;
  216. }
  217. m_globalStats.m_numSuccesses++;
  218. m_globalStats.m_sendSizeInBytes += static_cast<uint32_t>(metricsEvent.GetSizeInBytes());
  219. }
  220. else
  221. {
  222. metricsEvent.MarkFailedSubmission();
  223. // The metrics event failed to be sent to the backend for the first time.
  224. if (metricsEvent.GetNumFailures() == 1)
  225. {
  226. m_globalStats.m_numErrors++;
  227. m_globalStats.m_numEvents++;
  228. }
  229. if (metricsEvent.GetNumFailures() <= m_clientConfiguration->GetMaxNumRetries())
  230. {
  231. metricsEventsForRetry.AddMetrics(metricsEvent);
  232. }
  233. else
  234. {
  235. m_globalStats.m_numDropped++;
  236. }
  237. }
  238. }
  239. PushMetricsForRetry(metricsEventsForRetry);
  240. }
  241. void MetricsManager::PushMetricsForRetry(MetricsQueue& metricsEventsForRetry)
  242. {
  243. if (m_clientConfiguration->GetMaxNumRetries() == 0)
  244. {
  245. // No retry is required.
  246. m_globalStats.m_numDropped += metricsEventsForRetry.GetNumMetrics();
  247. return;
  248. }
  249. // Push failed events to the front of the queue and reserve the order.
  250. AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
  251. m_metricsQueue.PushMetricsToFront(metricsEventsForRetry);
  252. // Filter metrics events by priority since the queue might be full.
  253. m_globalStats.m_numDropped += m_metricsQueue.FilterMetricsByPriority(m_clientConfiguration->GetMaxQueueSizeInBytes());
  254. }
  255. AZ::Outcome<void, AZStd::string> MetricsManager::SendMetricsToFile(AZStd::shared_ptr<MetricsQueue> metricsQueue)
  256. {
  257. AZStd::lock_guard<AZStd::mutex> lock(m_metricsFileMutex);
  258. AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
  259. if (!fileIO)
  260. {
  261. return AZ::Failure(AZStd::string{ "No FileIoBase Instance." });
  262. }
  263. const char* metricsFileFullPath = m_clientConfiguration->GetMetricsFileFullPath();
  264. const char* metricsFileDir = m_clientConfiguration->GetMetricsFileDir();
  265. MetricsQueue existingMetricsEvents;
  266. if (!metricsFileFullPath || !metricsFileDir)
  267. {
  268. return AZ::Failure(AZStd::string{ "Failed to get the metrics file directory or path." });
  269. }
  270. if (fileIO->Exists(metricsFileFullPath) && !existingMetricsEvents.ReadFromJson(metricsFileFullPath))
  271. {
  272. return AZ::Failure(AZStd::string{ "Failed to read the existing metrics on disk" });
  273. }
  274. else if (!fileIO->Exists(metricsFileDir) && !fileIO->CreatePath(metricsFileDir))
  275. {
  276. return AZ::Failure(AZStd::string{ "Failed to create metrics directory" });
  277. }
  278. // Append a copy of the metrics queue in the request to the existing metrics events and keep the original submission order.
  279. // Do not modify the metrics queue in the request directly for identifying the metrics events for retry on failure.
  280. MetricsQueue metricsEventsInRequest = *metricsQueue;
  281. existingMetricsEvents.AppendMetrics(metricsEventsInRequest);
  282. AZStd::string serializedMetrics = existingMetricsEvents.SerializeToJson();
  283. AZ::IO::HandleType fileHandle;
  284. if (!fileIO->Open(metricsFileFullPath, AZ::IO::OpenMode::ModeWrite | AZ::IO::OpenMode::ModeText, fileHandle))
  285. {
  286. return AZ::Failure(AZStd::string{ "Failed to open metrics file" });
  287. }
  288. fileIO->Write(fileHandle, serializedMetrics.c_str(), serializedMetrics.size());
  289. fileIO->Close(fileHandle);
  290. return AZ::Success();
  291. }
  292. void MetricsManager::FlushMetricsAsync()
  293. {
  294. AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
  295. if (m_metricsQueue.GetNumMetrics() == 0)
  296. {
  297. return;
  298. }
  299. auto metricsToFlush = AZStd::make_shared<MetricsQueue>();
  300. metricsToFlush->AppendMetrics(m_metricsQueue);
  301. m_metricsQueue.ClearMetrics();
  302. SendMetricsAsync(metricsToFlush);
  303. }
  304. void MetricsManager::ShutdownMetrics()
  305. {
  306. if (m_monitorTerminated)
  307. {
  308. return;
  309. }
  310. // Terminate the monitor thread
  311. m_monitorTerminated = true;
  312. m_waitEvent.release();
  313. if (m_monitorThread.joinable())
  314. {
  315. m_monitorThread.join();
  316. }
  317. }
  318. AZ::s64 MetricsManager::GetNumBufferedMetrics()
  319. {
  320. AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
  321. return m_metricsQueue.GetNumMetrics();
  322. }
  323. const GlobalStatistics& MetricsManager::GetGlobalStatistics() const
  324. {
  325. return m_globalStats;
  326. }
  327. void MetricsManager::UpdateOfflineRecordingStatus(bool enable, bool submitLocalMetrics)
  328. {
  329. m_clientConfiguration->UpdateOfflineRecordingStatus(enable);
  330. if (!enable && submitLocalMetrics)
  331. {
  332. SubmitLocalMetricsAsync();
  333. }
  334. }
  335. void MetricsManager::SubmitLocalMetricsAsync()
  336. {
  337. AZ::Job* job{ nullptr };
  338. job = AZ::CreateJobFunction([this]()
  339. {
  340. AZ::IO::FileIOBase* fileIO = AZ::IO::FileIOBase::GetDirectInstance();
  341. if (!fileIO)
  342. {
  343. AZ_Error("AWSMetrics", false, "No FileIoBase Instance.");
  344. return;
  345. }
  346. // Read metrics from the local metrics file.
  347. AZStd::lock_guard<AZStd::mutex> file_lock(m_metricsFileMutex);
  348. if (!fileIO->Exists(GetMetricsFilePath()))
  349. {
  350. // Local metrics file doesn't exist.
  351. return;
  352. }
  353. MetricsQueue offlineRecords;
  354. if (!offlineRecords.ReadFromJson(GetMetricsFilePath()))
  355. {
  356. AZ_Error("AWSMetrics", false, "Failed to read from the local metrics file %s", GetMetricsFilePath());
  357. return;
  358. }
  359. // Submit the metrics read from the local metrics file.
  360. int numOfflineRecords = offlineRecords.GetNumMetrics();
  361. for (int index = 0; index < numOfflineRecords; ++index)
  362. {
  363. AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
  364. m_metricsQueue.AddMetrics(offlineRecords[index]);
  365. if (m_metricsQueue.GetSizeInBytes() >= static_cast<size_t>(m_clientConfiguration->GetMaxQueueSizeInBytes()))
  366. {
  367. // Flush the metrics queue when the accumulated metrics size hits the limit
  368. m_waitEvent.release();
  369. }
  370. }
  371. // Remove the local metrics file after reading all its content.
  372. if (!fileIO->Remove(GetMetricsFilePath()))
  373. {
  374. AZ_Error("AWSMetrics", false, "Failed to remove the local metrics file %s", GetMetricsFilePath());
  375. return;
  376. }
  377. }, true, m_jobContext.get());
  378. job->Start();
  379. }
  380. const char* MetricsManager::GetMetricsFileDirectory() const
  381. {
  382. return m_clientConfiguration->GetMetricsFileDir();
  383. }
  384. const char* MetricsManager::GetMetricsFilePath() const
  385. {
  386. return m_clientConfiguration->GetMetricsFileFullPath();
  387. }
  388. int MetricsManager::GetNumTotalRequests() const
  389. {
  390. return m_sendMetricsId.load();
  391. }
  392. }