# aws_metrics_utils.py
"""
Copyright (c) Contributors to the Open 3D Engine Project.
For complete copyright and license terms please see the LICENSE at the root of this distribution.

SPDX-License-Identifier: Apache-2.0 OR MIT
"""
import logging
import pathlib
import time
import typing
from datetime import datetime

import pytest
from botocore.exceptions import WaiterError

from AWS.common.aws_utils import AwsUtils
from .aws_metrics_waiters import KinesisAnalyticsApplicationUpdatedWaiter, \
    CloudWatchMetricsDeliveredWaiter, DataLakeMetricsDeliveredWaiter, GlueCrawlerReadyWaiter
  15. logging.getLogger('boto').setLevel(logging.CRITICAL)
  16. # Expected directory and file extension for the S3 objects.
  17. EXPECTED_S3_DIRECTORY = 'firehose_events/'
  18. EXPECTED_S3_OBJECT_EXTENSION = '.parquet'
  19. class AWSMetricsUtils:
  20. """
  21. Provide utils functions for the AWSMetrics gem to interact with the deployed resources.
  22. """
  23. def __init__(self, aws_utils: AwsUtils):
  24. self._aws_util = aws_utils
  25. def start_kinesis_data_analytics_application(self, application_name: str) -> None:
  26. """
  27. Start the Kenisis Data Analytics application for real-time analytics.
  28. :param application_name: Name of the Kenisis Data Analytics application.
  29. """
  30. input_id = self.get_kinesis_analytics_application_input_id(application_name)
  31. assert input_id, 'invalid Kinesis Data Analytics application input.'
  32. client = self._aws_util.client('kinesisanalytics')
  33. try:
  34. client.start_application(
  35. ApplicationName=application_name,
  36. InputConfigurations=[
  37. {
  38. 'Id': input_id,
  39. 'InputStartingPositionConfiguration': {
  40. 'InputStartingPosition': 'NOW'
  41. }
  42. },
  43. ]
  44. )
  45. except client.exceptions.ResourceInUseException:
  46. # The application has been started.
  47. return
  48. try:
  49. KinesisAnalyticsApplicationUpdatedWaiter(client, 'RUNNING').wait(application_name=application_name)
  50. except WaiterError as e:
  51. assert False, f'Failed to start the Kinesis Data Analytics application: {str(e)}.'
  52. def get_kinesis_analytics_application_input_id(self, application_name: str) -> str:
  53. """
  54. Get the input ID for the Kenisis Data Analytics application.
  55. :param application_name: Name of the Kenisis Data Analytics application.
  56. :return: Input ID for the Kenisis Data Analytics application.
  57. """
  58. client = self._aws_util.client('kinesisanalytics')
  59. response = client.describe_application(
  60. ApplicationName=application_name
  61. )
  62. if not response:
  63. return ''
  64. input_descriptions = response.get('ApplicationDetail', {}).get('InputDescriptions', [])
  65. if len(input_descriptions) != 1:
  66. return ''
  67. return input_descriptions[0].get('InputId', '')
  68. def stop_kinesis_data_analytics_application(self, application_name: str) -> None:
  69. """
  70. Stop the Kenisis Data Analytics application.
  71. :param application_name: Name of the Kenisis Data Analytics application.
  72. """
  73. client = self._aws_util.client('kinesisanalytics')
  74. client.stop_application(
  75. ApplicationName=application_name
  76. )
  77. try:
  78. KinesisAnalyticsApplicationUpdatedWaiter(client, 'READY').wait(application_name=application_name)
  79. except WaiterError as e:
  80. assert False, f'Failed to stop the Kinesis Data Analytics application: {str(e)}.'
  81. def verify_cloud_watch_delivery(self, namespace: str, metrics_name: str,
  82. dimensions: typing.List[dict], start_time: datetime) -> None:
  83. """
  84. Verify that the expected metrics is delivered to CloudWatch.
  85. :param namespace: Namespace of the metrics.
  86. :param metrics_name: Name of the metrics.
  87. :param dimensions: Dimensions of the metrics.
  88. :param start_time: Start time for generating the metrics.
  89. """
  90. client = self._aws_util.client('cloudwatch')
  91. try:
  92. CloudWatchMetricsDeliveredWaiter(client).wait(
  93. namespace=namespace,
  94. metrics_name=metrics_name,
  95. dimensions=dimensions,
  96. start_time=start_time
  97. )
  98. except WaiterError as e:
  99. assert False, f'Failed to deliver metrics to CloudWatch: {str(e)}.'
  100. def verify_s3_delivery(self, analytics_bucket_name: str) -> None:
  101. """
  102. Verify that metrics are delivered to S3 for batch analytics successfully.
  103. :param analytics_bucket_name: Name of the deployed S3 bucket.
  104. """
  105. client = self._aws_util.client('s3')
  106. bucket_name = analytics_bucket_name
  107. try:
  108. DataLakeMetricsDeliveredWaiter(client).wait(bucket_name=bucket_name, prefix=EXPECTED_S3_DIRECTORY)
  109. except WaiterError as e:
  110. assert False, f'Failed to find the S3 directory for storing metrics data: {str(e)}.'
  111. # Check whether the data is converted to the expected data format.
  112. response = client.list_objects_v2(
  113. Bucket=bucket_name,
  114. Prefix=EXPECTED_S3_DIRECTORY
  115. )
  116. assert response.get('KeyCount', 0) != 0, f'Failed to deliver metrics to the S3 bucket {bucket_name}.'
  117. s3_objects = response.get('Contents', [])
  118. for s3_object in s3_objects:
  119. key = s3_object.get('Key', '')
  120. assert pathlib.Path(key).suffix == EXPECTED_S3_OBJECT_EXTENSION, \
  121. f'Invalid data format is found in the S3 bucket {bucket_name}'
  122. def run_glue_crawler(self, crawler_name: str) -> None:
  123. """
  124. Run the Glue crawler and wait for it to finish.
  125. :param crawler_name: Name of the Glue crawler
  126. """
  127. client = self._aws_util.client('glue')
  128. try:
  129. client.start_crawler(
  130. Name=crawler_name
  131. )
  132. except client.exceptions.CrawlerRunningException:
  133. # The crawler has already been started.
  134. return
  135. try:
  136. GlueCrawlerReadyWaiter(client).wait(crawler_name=crawler_name)
  137. except WaiterError as e:
  138. assert False, f'Failed to run the Glue crawler: {str(e)}.'
  139. def run_named_queries(self, work_group: str) -> None:
  140. """
  141. Run the named queries under the specific Athena work group.
  142. :param work_group: Name of the Athena work group.
  143. """
  144. client = self._aws_util.client('athena')
  145. # List all the named queries.
  146. response = client.list_named_queries(
  147. WorkGroup=work_group
  148. )
  149. named_query_ids = response.get('NamedQueryIds', [])
  150. # Run each of the queries.
  151. for named_query_id in named_query_ids:
  152. get_named_query_response = client.get_named_query(
  153. NamedQueryId=named_query_id
  154. )
  155. named_query = get_named_query_response.get('NamedQuery', {})
  156. start_query_execution_response = client.start_query_execution(
  157. QueryString=named_query.get('QueryString', ''),
  158. QueryExecutionContext={
  159. 'Database': named_query.get('Database', '')
  160. },
  161. WorkGroup=work_group
  162. )
  163. # Wait for the query to finish.
  164. state = 'RUNNING'
  165. while state == 'QUEUED' or state == 'RUNNING':
  166. get_query_execution_response = client.get_query_execution(
  167. QueryExecutionId=start_query_execution_response.get('QueryExecutionId', '')
  168. )
  169. state = get_query_execution_response.get('QueryExecution', {}).get('Status', {}).get('State', '')
  170. assert state == 'SUCCEEDED', f'Failed to run the named query {named_query.get("Name", {})}'
  171. def empty_s3_bucket(self, bucket_name: str) -> None:
  172. """
  173. Empty the S3 bucket following:
  174. https://boto3.amazonaws.com/v1/documentation/api/latest/guide/migrations3.html
  175. :param bucket_name: Name of the S3 bucket.
  176. """
  177. s3 = self._aws_util.resource('s3')
  178. bucket = s3.Bucket(bucket_name)
  179. for key in bucket.objects.all():
  180. key.delete()
  181. def get_analytics_bucket_name(self, stack_name: str) -> str:
  182. """
  183. Get the name of the deployed S3 bucket.
  184. :param stack_name: Name of the CloudFormation stack.
  185. :return: Name of the deployed S3 bucket.
  186. """
  187. client = self._aws_util.client('cloudformation')
  188. response = client.describe_stack_resources(
  189. StackName=stack_name
  190. )
  191. resources = response.get('StackResources', [])
  192. for resource in resources:
  193. if resource.get('ResourceType') == 'AWS::S3::Bucket':
  194. return resource.get('PhysicalResourceId', '')
  195. return ''
  196. @pytest.fixture(scope='function')
  197. def aws_metrics_utils(
  198. request: pytest.fixture,
  199. aws_utils: pytest.fixture):
  200. """
  201. Fixture for the AWS metrics util functions.
  202. :param request: _pytest.fixtures.SubRequest class that handles getting
  203. a pytest fixture from a pytest function/fixture.
  204. :param aws_utils: aws_utils fixture.
  205. """
  206. aws_utils_obj = AWSMetricsUtils(aws_utils)
  207. return aws_utils_obj