aws_metrics_utils.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
import logging
import pathlib
import time
import typing
from datetime import datetime

import pytest
from botocore.exceptions import WaiterError

from .aws_metrics_waiters import KinesisAnalyticsApplicationUpdatedWaiter, \
    CloudWatchMetricsDeliveredWaiter, DataLakeMetricsDeliveredWaiter, GlueCrawlerReadyWaiter
# Silence boto's verbose logging so only critical messages surface in test output.
logging.getLogger('boto').setLevel(logging.CRITICAL)

# Expected directory and file extension for the S3 objects.
# Firehose delivers metrics under this prefix, converted to Parquet.
EXPECTED_S3_DIRECTORY = 'firehose_events/'
EXPECTED_S3_OBJECT_EXTENSION = '.parquet'
  18. class AWSMetricsUtils:
  19. """
  20. Provide utils functions for the AWSMetrics gem to interact with the deployed resources.
  21. """
  22. def __init__(self, aws_utils: pytest.fixture):
  23. self._aws_util = aws_utils
  24. def start_kinesis_data_analytics_application(self, application_name: str) -> None:
  25. """
  26. Start the Kinesis Data Analytics application for real-time analytics.
  27. :param application_name: Name of the Kinesis Data Analytics application.
  28. """
  29. input_id = self.get_kinesis_analytics_application_input_id(application_name)
  30. assert input_id, 'invalid Kinesis Data Analytics application input.'
  31. client = self._aws_util.client('kinesisanalytics')
  32. try:
  33. client.start_application(
  34. ApplicationName=application_name,
  35. InputConfigurations=[
  36. {
  37. 'Id': input_id,
  38. 'InputStartingPositionConfiguration': {
  39. 'InputStartingPosition': 'NOW'
  40. }
  41. },
  42. ]
  43. )
  44. except client.exceptions.ResourceInUseException:
  45. # The application has been started.
  46. return
  47. try:
  48. KinesisAnalyticsApplicationUpdatedWaiter(client, 'RUNNING').wait(application_name=application_name)
  49. except WaiterError as e:
  50. assert False, f'Failed to start the Kinesis Data Analytics application: {str(e)}.'
  51. def get_kinesis_analytics_application_input_id(self, application_name: str) -> str:
  52. """
  53. Get the input ID for the Kenisis Data Analytics application.
  54. :param application_name: Name of the Kenisis Data Analytics application.
  55. :return: Input ID for the Kenisis Data Analytics application.
  56. """
  57. client = self._aws_util.client('kinesisanalytics')
  58. response = client.describe_application(
  59. ApplicationName=application_name
  60. )
  61. if not response:
  62. return ''
  63. input_descriptions = response.get('ApplicationDetail', {}).get('InputDescriptions', [])
  64. if len(input_descriptions) != 1:
  65. return ''
  66. return input_descriptions[0].get('InputId', '')
  67. def stop_kinesis_data_analytics_application(self, application_name: str) -> None:
  68. """
  69. Stop the Kinesis Data Analytics application.
  70. :param application_name: Name of the Kinesis Data Analytics application.
  71. """
  72. client = self._aws_util.client('kinesisanalytics')
  73. client.stop_application(
  74. ApplicationName=application_name
  75. )
  76. try:
  77. KinesisAnalyticsApplicationUpdatedWaiter(client, 'READY').wait(application_name=application_name)
  78. except WaiterError as e:
  79. assert False, f'Failed to stop the Kinesis Data Analytics application: {str(e)}.'
  80. def verify_cloud_watch_delivery(self, namespace: str, metrics_name: str,
  81. dimensions: typing.List[dict], start_time: datetime) -> None:
  82. """
  83. Verify that the expected metrics is delivered to CloudWatch.
  84. :param namespace: Namespace of the metrics.
  85. :param metrics_name: Name of the metrics.
  86. :param dimensions: Dimensions of the metrics.
  87. :param start_time: Start time for generating the metrics.
  88. """
  89. client = self._aws_util.client('cloudwatch')
  90. try:
  91. CloudWatchMetricsDeliveredWaiter(client).wait(
  92. namespace=namespace,
  93. metrics_name=metrics_name,
  94. dimensions=dimensions,
  95. start_time=start_time
  96. )
  97. except WaiterError as e:
  98. assert False, f'Failed to deliver metrics to CloudWatch: {str(e)}.'
  99. def verify_s3_delivery(self, analytics_bucket_name: str) -> None:
  100. """
  101. Verify that metrics are delivered to S3 for batch analytics successfully.
  102. :param analytics_bucket_name: Name of the deployed S3 bucket.
  103. """
  104. client = self._aws_util.client('s3')
  105. bucket_name = analytics_bucket_name
  106. try:
  107. DataLakeMetricsDeliveredWaiter(client).wait(bucket_name=bucket_name, prefix=EXPECTED_S3_DIRECTORY)
  108. except WaiterError as e:
  109. assert False, f'Failed to find the S3 directory for storing metrics data: {str(e)}.'
  110. # Check whether the data is converted to the expected data format.
  111. response = client.list_objects_v2(
  112. Bucket=bucket_name,
  113. Prefix=EXPECTED_S3_DIRECTORY
  114. )
  115. assert response.get('KeyCount', 0) != 0, f'Failed to deliver metrics to the S3 bucket {bucket_name}.'
  116. s3_objects = response.get('Contents', [])
  117. for s3_object in s3_objects:
  118. key = s3_object.get('Key', '')
  119. assert pathlib.Path(key).suffix == EXPECTED_S3_OBJECT_EXTENSION, \
  120. f'Invalid data format is found in the S3 bucket {bucket_name}'
  121. def run_glue_crawler(self, crawler_name: str) -> None:
  122. """
  123. Run the Glue crawler and wait for it to finish.
  124. :param crawler_name: Name of the Glue crawler
  125. """
  126. client = self._aws_util.client('glue')
  127. try:
  128. client.start_crawler(
  129. Name=crawler_name
  130. )
  131. except client.exceptions.CrawlerRunningException:
  132. # The crawler has already been started.
  133. return
  134. try:
  135. GlueCrawlerReadyWaiter(client).wait(crawler_name=crawler_name)
  136. except WaiterError as e:
  137. assert False, f'Failed to run the Glue crawler: {str(e)}.'
  138. def run_named_queries(self, work_group: str) -> None:
  139. """
  140. Run the named queries under the specific Athena work group.
  141. :param work_group: Name of the Athena work group.
  142. """
  143. client = self._aws_util.client('athena')
  144. # List all the named queries.
  145. response = client.list_named_queries(
  146. WorkGroup=work_group
  147. )
  148. named_query_ids = response.get('NamedQueryIds', [])
  149. # Run each of the queries.
  150. for named_query_id in named_query_ids:
  151. get_named_query_response = client.get_named_query(
  152. NamedQueryId=named_query_id
  153. )
  154. named_query = get_named_query_response.get('NamedQuery', {})
  155. start_query_execution_response = client.start_query_execution(
  156. QueryString=named_query.get('QueryString', ''),
  157. QueryExecutionContext={
  158. 'Database': named_query.get('Database', '')
  159. },
  160. WorkGroup=work_group
  161. )
  162. # Wait for the query to finish.
  163. state = 'RUNNING'
  164. while state == 'QUEUED' or state == 'RUNNING':
  165. get_query_execution_response = client.get_query_execution(
  166. QueryExecutionId=start_query_execution_response.get('QueryExecutionId', '')
  167. )
  168. state = get_query_execution_response.get('QueryExecution', {}).get('Status', {}).get('State', '')
  169. assert state == 'SUCCEEDED', f'Failed to run the named query {named_query.get("Name", {})}'
  170. def empty_bucket(self, bucket_name: str) -> None:
  171. """
  172. Empty the S3 bucket following:
  173. https://boto3.amazonaws.com/v1/documentation/api/latest/guide/migrations3.html
  174. :param bucket_name: Name of the S3 bucket.
  175. """
  176. s3 = self._aws_util.resource('s3')
  177. bucket = s3.Bucket(bucket_name)
  178. for key in bucket.objects.all():
  179. key.delete()
  180. def delete_table(self, database_name: str, table_name: str) -> None:
  181. """
  182. Delete an existing Glue table.
  183. :param database_name: Name of the Glue database.
  184. :param table_name: Name of the table to delete.
  185. """
  186. client = self._aws_util.client('glue')
  187. client.delete_table(
  188. DatabaseName=database_name,
  189. Name=table_name
  190. )
  191. def try_delete_table(self, database_name: str, table_name: str) -> None:
  192. """
  193. Delete an existing Glue table. If table does not exist ignore
  194. :param database_name: Name of the Glue database.
  195. :param table_name: Name of the table to delete.
  196. """
  197. client = self._aws_util.client('glue')
  198. try:
  199. client.delete_table(
  200. DatabaseName=database_name,
  201. Name=table_name
  202. )
  203. except client.exceptions.EntityNotFoundException:
  204. print(f"Table {table_name} does not exist in database {database_name}")
  205. @pytest.fixture(scope='function')
  206. def aws_metrics_utils(
  207. request: pytest.fixture,
  208. aws_utils: pytest.fixture):
  209. """
  210. Fixture for the AWS metrics util functions.
  211. :param request: _pytest.fixtures.SubRequest class that handles getting
  212. a pytest fixture from a pytest function/fixture.
  213. :param aws_utils: aws_utils fixture.
  214. """
  215. aws_utils_obj = AWSMetricsUtils(aws_utils)
  216. return aws_utils_obj