"""
Copyright (c) Contributors to the Open 3D Engine Project.
For complete copyright and license terms please see the LICENSE at the root of this distribution.

SPDX-License-Identifier: Apache-2.0 OR MIT
"""

import logging
import pathlib
import time
import typing
from datetime import datetime

import pytest
from botocore.exceptions import WaiterError

from .aws_metrics_waiters import KinesisAnalyticsApplicationUpdatedWaiter, \
    CloudWatchMetricsDeliveredWaiter, DataLakeMetricsDeliveredWaiter, GlueCrawlerReadyWaiter

logging.getLogger('boto').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

# Expected directory and file extension for the S3 objects.
EXPECTED_S3_DIRECTORY = 'firehose_events/'
EXPECTED_S3_OBJECT_EXTENSION = '.parquet'

# Default polling interval and upper bound used while waiting for an Athena query execution.
ATHENA_QUERY_POLL_INTERVAL_SECONDS = 1.0
ATHENA_QUERY_TIMEOUT_SECONDS = 300.0


class AWSMetricsUtils:
    """
    Provide utils functions for the AWSMetrics gem to interact with the deployed resources.
    """

    def __init__(self, aws_utils: pytest.fixture):
        self._aws_util = aws_utils

    def start_kinesis_data_analytics_application(self, application_name: str) -> None:
        """
        Start the Kinesis Data Analytics application for real-time analytics and wait until it is RUNNING.

        :param application_name: Name of the Kinesis Data Analytics application.
        :raises AssertionError: If the application input is invalid or the application never reaches RUNNING.
        """
        input_id = self.get_kinesis_analytics_application_input_id(application_name)
        assert input_id, 'invalid Kinesis Data Analytics application input.'

        client = self._aws_util.client('kinesisanalytics')
        try:
            client.start_application(
                ApplicationName=application_name,
                InputConfigurations=[
                    {
                        'Id': input_id,
                        'InputStartingPositionConfiguration': {
                            'InputStartingPosition': 'NOW'
                        }
                    },
                ]
            )
        except client.exceptions.ResourceInUseException:
            # The application has already been started. Still wait for RUNNING below so callers
            # never proceed while the application is only half-way through starting.
            pass

        try:
            KinesisAnalyticsApplicationUpdatedWaiter(client, 'RUNNING').wait(application_name=application_name)
        except WaiterError as e:
            # Raise explicitly: a bare `assert False` would be stripped under `python -O`.
            raise AssertionError(f'Failed to start the Kinesis Data Analytics application: {e}.') from e

    def get_kinesis_analytics_application_input_id(self, application_name: str) -> str:
        """
        Get the input ID for the Kinesis Data Analytics application.

        :param application_name: Name of the Kinesis Data Analytics application.
        :return: Input ID for the Kinesis Data Analytics application, or '' when the application
                 does not have exactly one input.
        """
        client = self._aws_util.client('kinesisanalytics')
        response = client.describe_application(
            ApplicationName=application_name
        )
        if not response:
            return ''

        input_descriptions = response.get('ApplicationDetail', {}).get('InputDescriptions', [])
        if len(input_descriptions) != 1:
            return ''

        return input_descriptions[0].get('InputId', '')

    def stop_kinesis_data_analytics_application(self, application_name: str) -> None:
        """
        Stop the Kinesis Data Analytics application and wait until it is READY again.

        :param application_name: Name of the Kinesis Data Analytics application.
        :raises AssertionError: If the application never returns to the READY state.
        """
        client = self._aws_util.client('kinesisanalytics')
        client.stop_application(
            ApplicationName=application_name
        )

        try:
            KinesisAnalyticsApplicationUpdatedWaiter(client, 'READY').wait(application_name=application_name)
        except WaiterError as e:
            raise AssertionError(f'Failed to stop the Kinesis Data Analytics application: {e}.') from e

    def verify_cloud_watch_delivery(self, namespace: str, metrics_name: str,
                                    dimensions: typing.List[dict], start_time: datetime) -> None:
        """
        Verify that the expected metrics is delivered to CloudWatch.

        :param namespace: Namespace of the metrics.
        :param metrics_name: Name of the metrics.
        :param dimensions: Dimensions of the metrics.
        :param start_time: Start time for generating the metrics.
        :raises AssertionError: If the metrics are not observed before the waiter gives up.
        """
        client = self._aws_util.client('cloudwatch')

        try:
            CloudWatchMetricsDeliveredWaiter(client).wait(
                namespace=namespace,
                metrics_name=metrics_name,
                dimensions=dimensions,
                start_time=start_time
            )
        except WaiterError as e:
            raise AssertionError(f'Failed to deliver metrics to CloudWatch: {e}.') from e

    def verify_s3_delivery(self, analytics_bucket_name: str) -> None:
        """
        Verify that metrics are delivered to S3 for batch analytics successfully.

        :param analytics_bucket_name: Name of the deployed S3 bucket.
        :raises AssertionError: If no metrics objects exist under the expected prefix, or any object
                                under that prefix does not have the expected extension.
        """
        client = self._aws_util.client('s3')
        bucket_name = analytics_bucket_name

        try:
            DataLakeMetricsDeliveredWaiter(client).wait(bucket_name=bucket_name, prefix=EXPECTED_S3_DIRECTORY)
        except WaiterError as e:
            raise AssertionError(f'Failed to find the S3 directory for storing metrics data: {e}.') from e

        # Check whether the data is converted to the expected data format. ListObjectsV2 returns at
        # most 1000 keys per call, so walk every page instead of only inspecting the first one.
        paginator = client.get_paginator('list_objects_v2')
        key_count = 0
        for page in paginator.paginate(Bucket=bucket_name, Prefix=EXPECTED_S3_DIRECTORY):
            key_count += page.get('KeyCount', 0)
            for s3_object in page.get('Contents', []):
                key = s3_object.get('Key', '')
                assert pathlib.Path(key).suffix == EXPECTED_S3_OBJECT_EXTENSION, \
                    f'Invalid data format is found in the S3 bucket {bucket_name}'

        assert key_count != 0, f'Failed to deliver metrics to the S3 bucket {bucket_name}.'

    def run_glue_crawler(self, crawler_name: str) -> None:
        """
        Run the Glue crawler and wait for it to finish.

        :param crawler_name: Name of the Glue crawler.
        :raises AssertionError: If the crawler does not become ready before the waiter gives up.
        """
        client = self._aws_util.client('glue')
        try:
            client.start_crawler(
                Name=crawler_name
            )
        except client.exceptions.CrawlerRunningException:
            # The crawler has already been started. Still wait for it to finish, as documented,
            # so callers do not query tables the crawler is still populating.
            pass

        try:
            GlueCrawlerReadyWaiter(client).wait(crawler_name=crawler_name)
        except WaiterError as e:
            raise AssertionError(f'Failed to run the Glue crawler: {e}.') from e

    def run_named_queries(self, work_group: str,
                          poll_interval: float = ATHENA_QUERY_POLL_INTERVAL_SECONDS,
                          timeout: float = ATHENA_QUERY_TIMEOUT_SECONDS) -> None:
        """
        Run the named queries under the specific Athena work group, one after another.

        :param work_group: Name of the Athena work group.
        :param poll_interval: Seconds to sleep between query status checks.
        :param timeout: Maximum seconds to wait for each individual query to finish.
        :raises AssertionError: If any query does not end in the SUCCEEDED state.
        """
        client = self._aws_util.client('athena')

        # List all the named queries. The API is paginated, so collect the IDs from every page.
        paginator = client.get_paginator('list_named_queries')
        named_query_ids = []
        for page in paginator.paginate(WorkGroup=work_group):
            named_query_ids.extend(page.get('NamedQueryIds', []))

        # Run each of the queries.
        for named_query_id in named_query_ids:
            get_named_query_response = client.get_named_query(
                NamedQueryId=named_query_id
            )
            named_query = get_named_query_response.get('NamedQuery', {})
            query_name = named_query.get('Name', '')

            start_query_execution_response = client.start_query_execution(
                QueryString=named_query.get('QueryString', ''),
                QueryExecutionContext={
                    'Database': named_query.get('Database', '')
                },
                WorkGroup=work_group
            )
            query_execution_id = start_query_execution_response.get('QueryExecutionId', '')

            state, reason = self._wait_for_athena_query(client, query_execution_id, poll_interval, timeout)
            assert state == 'SUCCEEDED', \
                f'Failed to run the named query {query_name}: state={state}, reason={reason}'

    @staticmethod
    def _wait_for_athena_query(client, query_execution_id: str,
                               poll_interval: float, timeout: float) -> typing.Tuple[str, str]:
        """
        Poll an Athena query execution until it leaves the QUEUED/RUNNING states or the timeout elapses.

        :param client: Boto3 Athena client.
        :param query_execution_id: ID returned by start_query_execution.
        :param poll_interval: Seconds to sleep between status checks.
        :param timeout: Maximum seconds to wait.
        :return: The last observed state and a human-readable reason (Athena's StateChangeReason,
                 or a timeout note when the deadline was hit while the query was still pending).
        """
        deadline = time.monotonic() + timeout
        while True:
            get_query_execution_response = client.get_query_execution(
                QueryExecutionId=query_execution_id
            )
            status = get_query_execution_response.get('QueryExecution', {}).get('Status', {})
            state = status.get('State', '')
            if state not in ('QUEUED', 'RUNNING'):
                return state, status.get('StateChangeReason', '')
            if time.monotonic() >= deadline:
                return state, f'timed out after {timeout} seconds'
            # Sleep between polls instead of busy-looping against the Athena API.
            time.sleep(poll_interval)

    def empty_bucket(self, bucket_name: str) -> None:
        """
        Delete every object in the S3 bucket.

        Uses the boto3 collection batch action, which deletes objects in batches
        instead of issuing one request per object.

        :param bucket_name: Name of the S3 bucket.
        """
        s3 = self._aws_util.resource('s3')
        bucket = s3.Bucket(bucket_name)
        bucket.objects.all().delete()

    def delete_table(self, database_name: str, table_name: str) -> None:
        """
        Delete an existing Glue table.

        :param database_name: Name of the Glue database.
        :param table_name: Name of the table to delete.
        """
        client = self._aws_util.client('glue')
        client.delete_table(
            DatabaseName=database_name,
            Name=table_name
        )

    def try_delete_table(self, database_name: str, table_name: str) -> None:
        """
        Delete an existing Glue table. If the table does not exist, log it and continue.

        :param database_name: Name of the Glue database.
        :param table_name: Name of the table to delete.
        """
        client = self._aws_util.client('glue')
        try:
            client.delete_table(
                DatabaseName=database_name,
                Name=table_name
            )
        except client.exceptions.EntityNotFoundException:
            logger.info('Table %s does not exist in database %s', table_name, database_name)


@pytest.fixture(scope='function')
def aws_metrics_utils(
        request: pytest.fixture,
        aws_utils: pytest.fixture):
    """
    Fixture for the AWS metrics util functions.

    :param request: _pytest.fixtures.SubRequest class that handles getting
        a pytest fixture from a pytest function/fixture.
    :param aws_utils: aws_utils fixture.
    :return: An AWSMetricsUtils instance bound to the aws_utils fixture.
    """
    aws_utils_obj = AWSMetricsUtils(aws_utils)
    return aws_utils_obj