123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- """
- Copyright (c) Contributors to the Open 3D Engine Project.
- For complete copyright and license terms please see the LICENSE at the root of this distribution.
- SPDX-License-Identifier: Apache-2.0 OR MIT
- """
- import logging
- import pathlib
- import pytest
- import typing
- from datetime import datetime
- from botocore.exceptions import WaiterError
- from .aws_metrics_waiters import KinesisAnalyticsApplicationUpdatedWaiter, \
- CloudWatchMetricsDeliveredWaiter, DataLakeMetricsDeliveredWaiter, GlueCrawlerReadyWaiter
# Let only CRITICAL records through from the AWS SDK loggers.
# Logger names form a hierarchy on dots, so 'boto3' and 'botocore' are NOT children of
# 'boto' and must be configured by name; 'boto' itself is kept for backward compatibility.
logging.getLogger('boto').setLevel(logging.CRITICAL)
logging.getLogger('boto3').setLevel(logging.CRITICAL)
logging.getLogger('botocore').setLevel(logging.CRITICAL)

# Expected directory and file extension for the S3 objects.
EXPECTED_S3_DIRECTORY = 'firehose_events/'
EXPECTED_S3_OBJECT_EXTENSION = '.parquet'
class AWSMetricsUtils:
    """
    Provide utility functions for the AWSMetrics gem to interact with the deployed resources.

    Every method obtains its AWS client/resource from the wrapped ``aws_utils`` object.
    Verification failures are reported by raising AssertionError explicitly (rather than with
    ``assert`` statements) so that the checks still run when Python is started with ``-O``.
    """

    def __init__(self, aws_utils: typing.Any):
        """
        :param aws_utils: Object exposing ``client(service_name)`` and ``resource(service_name)``;
            the aws_metrics_utils fixture passes the aws_utils fixture value here.
        """
        self._aws_util = aws_utils

    def start_kinesis_data_analytics_application(self, application_name: str) -> None:
        """
        Start the Kinesis Data Analytics application for real-time analytics.

        Returns immediately (without waiting) when the service reports that the application
        is already in use, i.e. it has been started before.

        :param application_name: Name of the Kinesis Data Analytics application.
        :raises AssertionError: If the application has no usable input or fails to reach RUNNING.
        """
        input_id = self.get_kinesis_analytics_application_input_id(application_name)
        if not input_id:
            raise AssertionError('invalid Kinesis Data Analytics application input.')

        client = self._aws_util.client('kinesisanalytics')
        try:
            client.start_application(
                ApplicationName=application_name,
                InputConfigurations=[
                    {
                        'Id': input_id,
                        'InputStartingPositionConfiguration': {
                            'InputStartingPosition': 'NOW'
                        }
                    },
                ]
            )
        except client.exceptions.ResourceInUseException:
            # The application has been started.
            return

        try:
            KinesisAnalyticsApplicationUpdatedWaiter(client, 'RUNNING').wait(application_name=application_name)
        except WaiterError as e:
            raise AssertionError(f'Failed to start the Kinesis Data Analytics application: {e}.') from e

    def get_kinesis_analytics_application_input_id(self, application_name: str) -> str:
        """
        Get the input ID for the Kinesis Data Analytics application.

        :param application_name: Name of the Kinesis Data Analytics application.
        :return: Input ID for the Kinesis Data Analytics application, or an empty string when
            the application does not have exactly one input description.
        """
        client = self._aws_util.client('kinesisanalytics')
        response = client.describe_application(
            ApplicationName=application_name
        )
        if not response:
            return ''

        input_descriptions = response.get('ApplicationDetail', {}).get('InputDescriptions', [])
        # Only an application with a single, unambiguous input is supported.
        if len(input_descriptions) != 1:
            return ''

        return input_descriptions[0].get('InputId', '')

    def stop_kinesis_data_analytics_application(self, application_name: str) -> None:
        """
        Stop the Kinesis Data Analytics application and wait for it to become READY.

        :param application_name: Name of the Kinesis Data Analytics application.
        :raises AssertionError: If the application fails to reach the READY state.
        """
        client = self._aws_util.client('kinesisanalytics')
        client.stop_application(
            ApplicationName=application_name
        )

        try:
            KinesisAnalyticsApplicationUpdatedWaiter(client, 'READY').wait(application_name=application_name)
        except WaiterError as e:
            raise AssertionError(f'Failed to stop the Kinesis Data Analytics application: {e}.') from e

    def verify_cloud_watch_delivery(self, namespace: str, metrics_name: str,
                                    dimensions: typing.List[dict], start_time: datetime) -> None:
        """
        Verify that the expected metrics is delivered to CloudWatch.

        :param namespace: Namespace of the metrics.
        :param metrics_name: Name of the metrics.
        :param dimensions: Dimensions of the metrics.
        :param start_time: Start time for generating the metrics.
        :raises AssertionError: If the waiter gives up before the metrics show up.
        """
        client = self._aws_util.client('cloudwatch')

        try:
            CloudWatchMetricsDeliveredWaiter(client).wait(
                namespace=namespace,
                metrics_name=metrics_name,
                dimensions=dimensions,
                start_time=start_time
            )
        except WaiterError as e:
            raise AssertionError(f'Failed to deliver metrics to CloudWatch: {e}.') from e

    def verify_s3_delivery(self, analytics_bucket_name: str) -> None:
        """
        Verify that metrics are delivered to S3 for batch analytics successfully.

        :param analytics_bucket_name: Name of the deployed S3 bucket.
        :raises AssertionError: If nothing is delivered under the expected directory or a
            delivered object does not have the expected file extension.
        """
        client = self._aws_util.client('s3')

        try:
            DataLakeMetricsDeliveredWaiter(client).wait(
                bucket_name=analytics_bucket_name, prefix=EXPECTED_S3_DIRECTORY)
        except WaiterError as e:
            raise AssertionError(f'Failed to find the S3 directory for storing metrics data: {e}.') from e

        # Check whether the data is converted to the expected data format.
        response = client.list_objects_v2(
            Bucket=analytics_bucket_name,
            Prefix=EXPECTED_S3_DIRECTORY
        )
        if response.get('KeyCount', 0) == 0:
            raise AssertionError(f'Failed to deliver metrics to the S3 bucket {analytics_bucket_name}.')

        for s3_object in response.get('Contents', []):
            key = s3_object.get('Key', '')
            if pathlib.Path(key).suffix != EXPECTED_S3_OBJECT_EXTENSION:
                raise AssertionError(f'Invalid data format is found in the S3 bucket {analytics_bucket_name}')

    def run_glue_crawler(self, crawler_name: str) -> None:
        """
        Run the Glue crawler and wait for it to finish.

        Returns immediately (without waiting) when the crawler is already running.

        :param crawler_name: Name of the Glue crawler
        :raises AssertionError: If the crawler does not become ready again.
        """
        client = self._aws_util.client('glue')
        try:
            client.start_crawler(
                Name=crawler_name
            )
        except client.exceptions.CrawlerRunningException:
            # The crawler has already been started.
            return

        try:
            GlueCrawlerReadyWaiter(client).wait(crawler_name=crawler_name)
        except WaiterError as e:
            raise AssertionError(f'Failed to run the Glue crawler: {e}.') from e

    def run_named_queries(self, work_group: str, *,
                          poll_interval_seconds: float = 1.0,
                          timeout_seconds: float = 1800.0) -> None:
        """
        Run the named queries under the specific Athena work group.

        The queries are executed one after another; each one is polled until it leaves the
        QUEUED/RUNNING states.

        :param work_group: Name of the Athena work group.
        :param poll_interval_seconds: Delay between two status polls of a running query.
        :param timeout_seconds: Maximum time to wait for a single query to finish.
        :raises AssertionError: If a query does not end in the SUCCEEDED state or does not
            finish within timeout_seconds.
        """
        # Imported locally so this method does not depend on the module's import block.
        import time

        client = self._aws_util.client('athena')

        # List all the named queries, following NextToken so later pages are not skipped.
        named_query_ids = []
        list_arguments = {'WorkGroup': work_group}
        while True:
            response = client.list_named_queries(**list_arguments)
            named_query_ids.extend(response.get('NamedQueryIds', []))
            next_token = response.get('NextToken')
            if not next_token:
                break
            list_arguments['NextToken'] = next_token

        # Run each of the queries.
        for named_query_id in named_query_ids:
            named_query = client.get_named_query(
                NamedQueryId=named_query_id
            ).get('NamedQuery', {})
            query_name = named_query.get('Name', '')

            query_execution_id = client.start_query_execution(
                QueryString=named_query.get('QueryString', ''),
                QueryExecutionContext={
                    'Database': named_query.get('Database', '')
                },
                WorkGroup=work_group
            ).get('QueryExecutionId', '')

            # Wait for the query to finish. The status is always polled at least once.
            deadline = time.monotonic() + timeout_seconds
            while True:
                status = client.get_query_execution(
                    QueryExecutionId=query_execution_id
                ).get('QueryExecution', {}).get('Status', {})
                state = status.get('State', '')
                if state not in ('QUEUED', 'RUNNING'):
                    break
                if time.monotonic() >= deadline:
                    raise AssertionError(
                        f'Timed out waiting for the named query {query_name} to finish.')
                # Back off between polls instead of hammering the service in a tight loop.
                time.sleep(poll_interval_seconds)

            if state != 'SUCCEEDED':
                raise AssertionError(f'Failed to run the named query {query_name}')

    def empty_bucket(self, bucket_name: str) -> None:
        """
        Empty the S3 bucket following:
        https://boto3.amazonaws.com/v1/documentation/api/latest/guide/migrations3.html

        :param bucket_name: Name of the S3 bucket.
        """
        s3 = self._aws_util.resource('s3')
        bucket = s3.Bucket(bucket_name)

        # Objects are deleted one at a time so that a failing delete surfaces as an exception.
        for s3_object in bucket.objects.all():
            s3_object.delete()

    def delete_table(self, database_name: str, table_name: str) -> None:
        """
        Delete an existing Glue table.

        Errors raised by the Glue client (including a missing table) propagate to the caller;
        use try_delete_table to ignore a missing table.

        :param database_name: Name of the Glue database.
        :param table_name: Name of the table to delete.
        """
        client = self._aws_util.client('glue')
        client.delete_table(
            DatabaseName=database_name,
            Name=table_name
        )

    def try_delete_table(self, database_name: str, table_name: str) -> None:
        """
        Delete an existing Glue table. If the table does not exist, report it and carry on.

        :param database_name: Name of the Glue database.
        :param table_name: Name of the table to delete.
        """
        client = self._aws_util.client('glue')
        try:
            client.delete_table(
                DatabaseName=database_name,
                Name=table_name
            )
        except client.exceptions.EntityNotFoundException:
            print(f"Table {table_name} does not exist in database {database_name}")
@pytest.fixture(scope='function')
def aws_metrics_utils(
        request: pytest.fixture,
        aws_utils: pytest.fixture):
    """
    Function-scoped fixture that hands the requesting test an AWSMetricsUtils helper.

    :param request: _pytest.fixtures.SubRequest class that handles getting
        a pytest fixture from a pytest function/fixture. It is not used by this fixture's body.
    :param aws_utils: aws_utils fixture, wrapped by the returned helper.
    :return: A new AWSMetricsUtils instance built around aws_utils.
    """
    return AWSMetricsUtils(aws_utils)
|