Bläddra i källkod

Merge pull request #15463 from aws-lumberyard-dev/ProjectExportCLI

Implementing Project Export CLI MVP, including ablity to execute user's python scripts and process any commands. Unit tests are also included
Alex Peterson 2 år sedan
förälder
incheckning
0c6724e128

+ 19 - 6
scripts/o3de.py

@@ -13,7 +13,6 @@ import sys
 
 logger = logging.getLogger('o3de')
 
-
 def add_args(parser: argparse.ArgumentParser) -> None:
     """
     add_args is called to add expected parser arguments and subparsers arguments to each command such that it can be
@@ -35,7 +34,7 @@ def add_args(parser: argparse.ArgumentParser) -> None:
     sys.path.insert(0, str(o3de_package_dir))
     from o3de import engine_properties, engine_template, gem_properties, \
         global_project, register, print_registration, get_registration, \
-        enable_gem, disable_gem, project_properties, sha256, download
+        enable_gem, disable_gem, project_properties, sha256, download, export_project
     # Remove the temporarily added path
     sys.path = sys.path[1:]
 
@@ -75,6 +74,8 @@ def add_args(parser: argparse.ArgumentParser) -> None:
     # download
     download.add_args(subparsers)
 
+    # export_project
+    export_project.add_args(subparsers)
 
 if __name__ == "__main__":
     # parse the command line args
@@ -83,16 +84,28 @@ if __name__ == "__main__":
     # add args to the parser
     add_args(the_parser)
 
-    # parse args
-    the_args = the_parser.parse_args()
-
     # if empty print help
     if len(sys.argv) == 1:
         the_parser.print_help(sys.stderr)
         sys.exit(1)
 
+    # parse args
+    # argparse stores unknown arguments separately as a tuple,
+    # not packed in the same NameSpace as known arguments
+    known_args, unknown_args = the_parser.parse_known_args()
+    if hasattr(known_args, 'accepts_partial_args'):
+        ret = known_args.func(known_args, unknown_args) if hasattr(known_args, 'func') else 1
+    
+    elif unknown_args:
+        # since we expect every command which doesn't accept partial args to process only known args,
+        # if we face unknown args in such cases, we should throw an error.
+        # parse_args() calls parse_known_args() and will issue an error 
+        # https://hg.python.org/cpython/file/bb9fc884a838/Lib/argparse.py#l1725
+        the_parser.parse_args()
+    else:
+        ret = known_args.func(known_args) if hasattr(known_args, 'func') else 1
+
     # run
-    ret = the_args.func(the_args) if hasattr(the_args, 'func') else 1
     logger.info('Success!' if ret == 0 else 'Completed with issues: result {}'.format(ret))
 
     # return

+ 130 - 0
scripts/o3de/o3de/export_project.py

@@ -0,0 +1,130 @@
+#
+# Copyright (c) Contributors to the Open 3D Engine Project.
+# For complete copyright and license terms please see the LICENSE at the root of this distribution.
+#
+# SPDX-License-Identifier: Apache-2.0 OR MIT
+#
+#
+import argparse
+import logging
+import os
+import pathlib
+import sys
+
+from o3de import manifest, utils
+
+class O3DEScriptExportContext(object):
+    """
+    The context object is used to store parameter values and variables throughout the lifetime of an export script's execution.
+    It can also be passed onto nested scripts the export script may execute, which can in turn update the context as necessary.
+    """
+    
+    def __init__(self, export_script_path: pathlib.Path,
+                       project_path: pathlib.Path,
+                       engine_path: pathlib.Path,
+                       args: list = []) -> None:
+        self._export_script_path = export_script_path
+        self._project_path = project_path
+        self._engine_path = engine_path
+        self._args = args
+        
+    @property
+    def export_script_path(self) -> pathlib.Path:
+        """The absolute path to the export script being run."""
+        return self._export_script_path
+    
+    @property
+    def project_path(self) -> pathlib.Path:
+        """The absolute path to the project being exported."""
+        return self._project_path
+    
+    @property
+    def engine_path(self) -> pathlib.Path:
+        """The absolute path to the engine that the project is built with."""
+        return self._engine_path
+    
+    @property
+    def args(self) -> list:
+        """A list of the CLI arguments that were unparsed, and passed through for further processing, if necessary."""
+        return self._args
+
+# Helper API
+def process_command(args: list,
+                    cwd: pathlib.Path = None,
+                    env: os._Environ = None) -> int:
+    """
+    Wrapper for subprocess.Popen, which handles polling the process for logs, reacting to failure, and cleaning up the process.
+    :param args: A list of space separated strings which build up the entire command to run. Similar to the command list of subprocess.Popen
+    :param cwd: (Optional) The desired current working directory of the command. Useful for commands which require a differing starting environment.
+    :param env: (Optional) Environment to use when processing this command.
+    :return the exit code of the program that is run or 1 if no arguments were supplied
+    """
+    if len(args) == 0:
+        logging.error("function `process_command` must be supplied a non-empty list of arguments")
+        return 1
+    return utils.CLICommand(args, cwd, logging.getLogger(), env=env).run()
+
+
+def execute_python_script(target_script_path: pathlib.Path or str, o3de_context: O3DEScriptExportContext = None) -> int:
+    """
+    Execute a new python script, using new or existing O3DEScriptExportContexts to streamline communication between multiple scripts
+    :param target_script_path: The path to the python script to run.
+    :param o3de_context: An O3DEScriptExportContext object that contains necessary data to run the target script. The target script can also write to this context to pass back to its caller.
+    :return: return code upon success or failure
+    """
+    # Prepare import paths for script ease of use
+    # Allow for imports from calling script and the target script's local directory
+    utils.prepend_to_system_path(pathlib.Path(__file__))
+    utils.prepend_to_system_path(target_script_path)
+
+    logging.info(f"Begin loading script '{target_script_path}'...")
+    
+    return utils.load_and_execute_script(target_script_path, o3de_context = o3de_context, o3de_logger=logging.getLogger())
+
+
+def _export_script(export_script_path: pathlib.Path, project_path: pathlib.Path, passthru_args: list) -> int:
+    if not export_script_path.is_file() or export_script_path.suffix != '.py':
+        logging.error(f"Export script path unrecognized: '{export_script_path}'. Please provide a file path to an existing python script with '.py' extension.")
+        return 1
+
+    computed_project_path = utils.get_project_path_from_file(export_script_path, project_path)
+
+    if not computed_project_path:
+        if project_path:
+            logging.error(f"Project path '{project_path}' is invalid: does not contain a project.json file.")
+        else:
+            logging.error(f"Unable to find project folder associated with file '{export_script_path}'. Please specify using --project-path, or ensure the file is inside a project folder.")
+        return 1
+    
+    o3de_context = O3DEScriptExportContext(export_script_path= export_script_path,
+                                        project_path = computed_project_path,
+                                        engine_path = manifest.get_project_engine_path(computed_project_path),
+                                        args = passthru_args)
+
+    return execute_python_script(export_script_path, o3de_context)
+
+# Export Script entry point
+def _run_export_script(args: argparse, passthru_args: list) -> int:
+    logging.basicConfig(format=utils.LOG_FORMAT)
+    logging.getLogger().setLevel(args.log_level)
+    
+    return _export_script(args.export_script, args.project_path, passthru_args)
+
+
+# Argument handling
+def add_parser_args(parser) -> None:
+    parser.add_argument('-es', '--export-script', type=pathlib.Path, required=True, help="An external Python script to run")
+    parser.add_argument('-pp', '--project-path', type=pathlib.Path, required=False,
+                        help="Project to export. If not supplied, it will be inferred by the export script.")
+    
+    parser.add_argument('-ll', '--log-level', default='ERROR',
+                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
+                        help="Set the log level")
+    
+    parser.set_defaults(func=_run_export_script, accepts_partial_args=True)
+    
+
+def add_args(subparsers) -> None:
+    export_subparser = subparsers.add_parser('export-project')
+    add_parser_args(export_subparser)
+

+ 195 - 5
scripts/o3de/o3de/utils.py

@@ -9,19 +9,24 @@
 This file contains utility functions
 """
 import argparse
-import sys
-import uuid
+import importlib.util
+import logging
 import os
 import pathlib
+import psutil
+import re
 import shutil
+import subprocess
+import sys
 import urllib.request
-import logging
+import uuid
 import zipfile
-import re
 from packaging.version import Version
 from packaging.specifiers import SpecifierSet
 
-from o3de import gitproviderinterface, github_utils
+from o3de import gitproviderinterface, github_utils, validation as valid
+from subprocess import Popen, PIPE
+from typing import List, Tuple
 
 LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
 
@@ -60,6 +65,139 @@ class VerbosityAction(argparse.Action):
         elif count == 1:
             log.setLevel(logging.INFO)
 
+class CLICommand(object):
+    """
+    CLICommand is an interface for storing CLI commands as list of string arguments to run later in a script.
+    A current working directory, pre-existing OS environment, and desired logger can also be specified.
+    To execute a command, use the run() function.
+    This class is responsible for starting a new process, polling it for updates and logging, and safely terminating it.
+    """
+    def __init__(self, 
+                args: list,
+                cwd: pathlib.Path,
+                logger: logging.Logger,
+                env: os._Environ=None) -> None:
+        self.args = args
+        self.cwd = cwd
+        self.env = env
+        self.logger = logger
+        self._stdout_lines = []
+        self._stderr_lines = []
+    
+    @property
+    def stdout_lines(self) -> List[str]:
+        """The result of stdout, separated by newlines."""
+        return self._stdout_lines
+
+    @property
+    def stdout(self) -> str:
+        """The result of stdout, as a single string."""
+        return "\n".join(self._stdout_lines)
+
+    @property
+    def stderr_lines(self) -> List[str]:
+        """The result of stderr, separated by newlines."""
+        return self._stderr_lines
+
+    @property
+    def stderr(self) -> str:
+        """The result of stderr, as a single string."""
+        return "\n".join(self._stderr_lines)
+
+    
+    def _poll_process(self, process) -> None:
+        # while process is not done, read any log lines coming from subprocess
+        while process.poll() is None:
+            line = process.stdout.readline()
+            if not line: break
+
+            log_line = line.decode('utf-8', 'ignore')
+            self._stdout_lines.append(log_line)
+            self.logger.info(log_line)
+    
+    def _cleanup_process(self, process) -> str:
+        # flush remaining log lines
+        log_lines = process.stdout.read().decode('utf-8', 'ignore')
+        self._stdout_lines += log_lines.split('\n')
+        self.logger.info(log_lines)
+        stderr = process.stderr.read()
+
+        safe_kill_processes(process, process_logger = self.logger)
+
+        return stderr
+    
+    def run(self) -> int:
+        """
+        Takes the arguments specified during CLICommand initialization, and opens a new subprocess to handle it.
+        This function automatically manages polling the process for logs, error reporting, and safely cleaning up the process afterwards.
+        :return return code on success or failure 
+        """
+        ret = 1
+        try:
+            with Popen(self.args, cwd=self.cwd, env=self.env, stdout=PIPE, stderr=PIPE) as process:
+                self.logger.info(f"Running process '{self.args[0]}' with PID({process.pid}): {self.args}")
+                
+                self._poll_process(process)
+                stderr = self._cleanup_process(process)
+
+                ret = process.returncode
+
+                # print out errors if there are any      
+                if stderr:
+                    # bool(ret) --> if the process returns a FAILURE code (>0)
+                    logger_func = self.logger.error if bool(ret) else self.logger.warning
+                    err_txt = stderr.decode('utf-8', 'ignore')
+                    logger_func(err_txt)
+                    self._stderr_lines = err_txt.split("\n")
+        except Exception as err:
+            self.logger.error(err)
+            raise err
+        return ret
+
+
+# Per Python documentation, only strings should be inserted into sys.path
+# https://docs.python.org/3/library/sys.html#sys.path
+def prepend_to_system_path(file_path: pathlib.Path or str) -> None:
+    """
+    Prepend the running script's imported system module paths. Useful for loading scripts in a foreign directory
+    :param path: The file path of the desired script to load
+    :return: None
+    """
+    if isinstance(file_path, str):
+        file_path = pathlib.Path(file_path)
+
+    folder_path = file_path if file_path.is_dir() else file_path.parent
+    if str(folder_path) not in sys.path:
+        sys.path.insert(0, str(folder_path))
+
+def load_and_execute_script(script_path: pathlib.Path or str, **context_variables) -> int:
+    """
+    For a given python script, use importlib to load the script spec and module to execute it later
+    :param script_path: The path to the python script to run
+    :param context_variables: A series of keyword arguments which specify the context for the script before it is run.
+    :return: return code indicating succes or failure of script
+    """
+    
+    if isinstance(script_path, str):
+        script_path = pathlib.Path(script_path)
+    
+    # load the target script as a module, set the context, and then execute
+    script_name = script_path.name
+    spec = importlib.util.spec_from_file_location(script_name, script_path)
+    script_module = importlib.util.module_from_spec(spec)
+    sys.modules[script_name] = script_module
+
+    # inject the script module with relevant context variables
+    for key, value in context_variables.items():
+        setattr(script_module, key, value)
+
+    try:
+        spec.loader.exec_module(script_module)
+    except Exception as err:
+        logger.error(f"Failed to run script '{script_path}'. Here is the stacktrace: ", exc_info=True)
+        return 1
+
+    return 0
 
 def add_verbosity_arg(parser: argparse.ArgumentParser) -> None:
     """
@@ -348,6 +486,21 @@ def find_ancestor_dir_containing_file(target_file_name: pathlib.PurePath, start_
     ancestor_file = find_ancestor_file(target_file_name, start_path, max_scan_up_range)
     return ancestor_file.parent if ancestor_file else None
 
+def get_project_path_from_file(target_file_path: pathlib.Path, supplied_project_path: pathlib.Path = None) -> pathlib.Path or None:
+    """
+    Based on a file supplied by the user, and optionally a differing project path, determine and validate a proper project path, if any.
+    :param target_file_path: A user supplied file path
+    :param supplied_project_path: (Optional) If the target file is different from the project path, the user may also specify this path as well.
+    :return: A valid project path, or None
+    """
+    project_path = supplied_project_path
+    if not project_path:
+        project_path = find_ancestor_dir_containing_file(pathlib.PurePath('project.json'), target_file_path)
+    
+    if not project_path or not valid.valid_o3de_project_json(project_path / 'project.json'):
+        return None
+
+    return project_path
 
 def get_gem_names_set(gems: list, include_optional:bool = True) -> set:
     """
@@ -554,3 +707,40 @@ def replace_dict_keys_with_value_key(input:dict, value_key:str, replaced_key_nam
             input[value[value_key]] = entries
         else:
             input[value[value_key]] = value
+
+def safe_kill_processes(*processes: List[Popen], process_logger: logging.Logger = None) -> None:
+    """
+    Kills a given process without raising an error
+    :param processes: An iterable of processes to kill
+    :param process_logger: (Optional) logger to use
+    """
+    def on_terminate(proc) -> None:
+        try:
+            process_logger.info(f"process '{proc.args[0]}' with PID({proc.pid}) terminated with exit code {proc.returncode}")
+        except psutil.AccessDenied:
+            process_logger.warning("Termination failed, Access Denied with stacktrace:", exc_info=True)
+        except psutil.NoSuchProcess:
+            process_logger.warning("Termination request ignored, process was already terminated during iteration with stacktrace:", exc_info=True)
+
+    if not process_logger:
+        process_logger = logger
+    
+    for proc in processes:
+        try:
+            process_logger.info(f"Terminating process '{proc.args[0]}' with PID({proc.pid})")
+            proc.kill()
+        except psutil.AccessDenied:
+            process_logger.warning("Termination failed, Access Denied with stacktrace:", exc_info=True)
+        except psutil.NoSuchProcess:
+            process_logger.warning("Termination request ignored, process was already terminated during iteration with stacktrace:", exc_info=True)
+        except Exception:  # purposefully broad
+            process_logger.error("Unexpected exception ignored while terminating process, with stacktrace:", exc_info=True)
+    try:
+        psutil.wait_procs(processes, timeout=30, callback=on_terminate)
+    except psutil.AccessDenied:
+        process_logger.warning("Termination failed, Access Denied with stacktrace:", exc_info=True)
+    except psutil.NoSuchProcess:
+        process_logger.warning("Termination request ignored, process was already terminated during iteration with stacktrace:", exc_info=True)
+    except Exception:  # purposefully broad
+        process_logger.error("Unexpected exception while waiting for processes to terminate, with stacktrace:", exc_info=True)
+

+ 15 - 0
scripts/o3de/tests/CMakeLists.txt

@@ -101,3 +101,18 @@ ly_add_pytest(
     TEST_SUITE smoke
     EXCLUDE_TEST_RUN_TARGET_FROM_IDE
 )
+
+ly_add_pytest(
+    NAME o3de_export_project
+    PATH ${CMAKE_CURRENT_LIST_DIR}/test_export_project.py
+    TEST_SUITE smoke
+    EXCLUDE_TEST_RUN_TARGET_FROM_IDE
+)
+
+ly_add_pytest(
+    NAME o3de_utils
+    PATH ${CMAKE_CURRENT_LIST_DIR}/test_utils.py
+    TEST_SUITE smoke
+    EXCLUDE_TEST_RUN_TARGET_FROM_IDE
+)
+

+ 146 - 0
scripts/o3de/tests/test_export_project.py

@@ -0,0 +1,146 @@
+#
+# Copyright (c) Contributors to the Open 3D Engine Project.
+# For complete copyright and license terms please see the LICENSE at the root of this distribution.
+#
+# SPDX-License-Identifier: Apache-2.0 OR MIT
+#
+#
+
+import pytest
+import pathlib
+import unittest.mock as mock
+from unittest.mock import patch
+from o3de.export_project import _export_script, process_command
+
+TEST_PROJECT_JSON_PAYLOAD = '''
+{
+    "project_name": "TestProject",
+    "project_id": "{24114e69-306d-4de6-b3b4-4cb1a3eca58e}",
+    "version" : "0.0.0",
+    "compatible_engines" : [
+        "o3de-sdk==2205.01"
+    ],
+    "engine_api_dependencies" : [
+        "framework==1.2.3"
+    ],
+    "origin": "The primary repo for TestProject goes here: i.e. http://www.mydomain.com",
+    "license": "What license TestProject uses goes here: i.e. https://opensource.org/licenses/MIT",
+    "display_name": "TestProject",
+    "summary": "A short description of TestProject.",
+    "canonical_tags": [
+        "Project"
+    ],
+    "user_tags": [
+        "TestProject"
+    ],
+    "icon_path": "preview.png",
+    "engine": "o3de-install",
+    "restricted_name": "projects",
+    "external_subdirectories": [
+        "D:/TestGem"
+    ]
+}
+'''
+
+# Note: the underlying command logic is found in CLICommand class object. That is tested in test_utils.py
[email protected]("args, expected_result",[
+    pytest.param(["cmake", "--version"], 0),
+    pytest.param(["cmake"], 0),
+    pytest.param(["cmake", "-B"], 1),
+    pytest.param([], 1),
+])
+def test_process_command(args, expected_result):
+
+    cli_command = mock.Mock()
+    cli_command.run.return_value = expected_result
+
+    with patch("o3de.utils.CLICommand", return_value=cli_command) as cli:
+        result = process_command(args)
+        assert result == expected_result
+
+
+
+# The following functions will do integration tests of _export_script and execute_python_script, thereby testing all of script execution
+TEST_PYTHON_SCRIPT = """
+import pathlib
+folder = pathlib.Path(__file__).parent
+with open(folder / "test_output.txt", 'w') as test_file:
+    test_file.write(f"This is a test for the following: {o3de_context.args[0]}")
+    """
+
+def check_for_o3de_context_arg(output_file_text, args):
+    if len(args) > 0:
+        assert output_file_text == f"This is a test for the following: {args[0]}"
+
+TEST_ERR_PYTHON_SCRIPT = """
+import pathlib
+raise RuntimeError("Test export RuntimeError")
+print("hi there")
+    """
+
[email protected]("input_script, args, should_pass_project_folder, project_folder_subpath, script_folder_subpath, output_filename, is_expecting_error, output_evaluation_func, expected_result", [
+    # TEST_PYTHON_SCRIPT
+    # successful cases
+    pytest.param(TEST_PYTHON_SCRIPT, ['456'], False, pathlib.PurePath("test_project"), pathlib.PurePath("test_project/ExportScripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 0),
+    pytest.param(TEST_PYTHON_SCRIPT, ['456'], True, pathlib.PurePath("test_project"), pathlib.PurePath("test_project/ExportScripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 0),
+    pytest.param(TEST_PYTHON_SCRIPT, ['456'], True, pathlib.PurePath("test_project"), pathlib.PurePath("export_scripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 0),
+    pytest.param(TEST_PYTHON_SCRIPT, [456], True, pathlib.PurePath("test_project"), pathlib.PurePath("export_scripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 0),
+    # failure cases
+    pytest.param(TEST_PYTHON_SCRIPT, [], True, pathlib.PurePath("test_project"), pathlib.PurePath("export_scripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 1),
+    pytest.param(TEST_PYTHON_SCRIPT, [456], False, pathlib.PurePath("test_project"), pathlib.PurePath("export_scripts"), pathlib.PurePath("test_output.txt"), False, check_for_o3de_context_arg, 1),
+    # TEST_ERR_PYTHON_SCRIPT
+    pytest.param(TEST_ERR_PYTHON_SCRIPT, [], True, pathlib.PurePath("test_project"), pathlib.PurePath("export_scripts"), None, True, None, 1)
+])
+def test_export_script(tmp_path,
+                       input_script, 
+                       args, 
+                       should_pass_project_folder, 
+                       project_folder_subpath, 
+                       script_folder_subpath, 
+                       output_filename, 
+                       is_expecting_error, 
+                       output_evaluation_func,
+                       expected_result):
+    import sys
+
+    project_folder = tmp_path / project_folder_subpath
+    project_folder.mkdir()
+
+    script_folder = tmp_path / script_folder_subpath
+    script_folder.mkdir()
+
+
+    project_json = project_folder / "project.json"
+    project_json.write_text(TEST_PROJECT_JSON_PAYLOAD)
+
+
+    test_script = script_folder / "test.py"
+    test_script.write_text(input_script)
+
+    if output_filename:
+        test_output = script_folder / output_filename
+
+        assert not test_output.is_file()
+    
+    result = _export_script(test_script, project_folder if should_pass_project_folder else None, args)
+
+    assert result == expected_result
+
+
+    # only check for these if we're simulating a successful case
+    if result == 0 and not is_expecting_error:
+        assert str(script_folder) in sys.path
+
+        if output_filename:
+            assert test_output.is_file()
+
+            with test_output.open('r') as t_out:
+                output_text = t_out.read()
+        
+            if output_evaluation_func:
+                output_evaluation_func(output_text, args)
+
+        o3de_cli_folder = pathlib.Path(__file__).parent.parent / "o3de"
+
+        assert o3de_cli_folder in [pathlib.Path(sysPath) for sysPath in sys.path]
+

+ 133 - 1
scripts/o3de/tests/test_utils.py

@@ -7,10 +7,15 @@
 #
 
 import pytest
+import pathlib
+import psutil
+# import subprocess
+import logging
+import unittest.mock as mock
+from unittest.mock import patch
 
 from o3de import utils
 
-
 @pytest.mark.parametrize(
     "value, expected_result", [
         pytest.param('Game1', True),
@@ -66,3 +71,130 @@ def test_remove_gem_duplicates(in_list, out_list):
 def test_get_gem_names_set(gems, include_optional, expected_result):
     result = utils.get_gem_names_set(gems=gems, include_optional=include_optional)
     assert result == set(expected_result)
+
[email protected]("args, expected_return_code", [
+    pytest.param(['cmake', '--version'], 0),
+    pytest.param(['cmake'], 0),
+    pytest.param(['cmake', '-B'], 1)
+])
+def test_cli_command(args, expected_return_code):
+    process = mock.Mock()
+    process.returncode = expected_return_code
+    process.poll.return_value = expected_return_code
+    with patch("subprocess.Popen", return_value = process) as subproc_popen_patch:
+        new_command = utils.CLICommand(args, None, logging.getLogger())
+        result = new_command.run()
+        assert result == expected_return_code
+
[email protected]("test_path", [
+    pytest.param(__file__),
+    pytest.param(pathlib.Path(__file__)),
+    pytest.param(pathlib.Path(__file__).parent)
+])
+def test_prepend_to_system_path(test_path):
+    with patch("sys.path") as path_patch:
+        utils.prepend_to_system_path(test_path)
+        assert path_patch.insert.called
+        result_path = pathlib.Path(path_patch.insert.call_args.args[1])
+        assert result_path.is_dir()
+
+        if isinstance(test_path, str):
+            test_path = pathlib.Path(test_path)
+
+        if test_path.is_dir():
+            assert test_path == result_path    
+        else:
+            assert test_path.parent == result_path
+
+
[email protected]("input_file, supplied_project_path, ancestor_path, can_validate_project_path, expected_path",[
+    # successful cases
+    pytest.param(pathlib.Path(__file__), None, pathlib.Path(__file__).parent, True,  pathlib.Path(__file__).parent),
+    pytest.param(pathlib.Path(__file__), pathlib.Path(__file__).parent, None, True,  pathlib.Path(__file__).parent),
+    pytest.param(pathlib.Path(__file__), pathlib.Path(__file__).parent, pathlib.PurePath("Somewhere/Else"), True,  pathlib.Path(__file__).parent),
+    pytest.param(pathlib.Path(__file__), pathlib.PurePath("Somewhere/Else"), None, True,  pathlib.PurePath("Somewhere/Else")),
+    
+    # failure cases
+    pytest.param(pathlib.Path(__file__), None, pathlib.Path(__file__).parent, False,  None),
+    pytest.param(pathlib.Path(__file__), None, pathlib.PurePath("Somewhere/Else"), False,  None),
+    pytest.param(pathlib.Path(__file__), pathlib.Path(__file__).parent, None, False,  None),
+    pytest.param(pathlib.Path(__file__), pathlib.Path(__file__).parent, pathlib.PurePath("Somewhere/Else"), False,  None),
+    pytest.param(pathlib.Path(__file__), None, None, True,  None),
+    pytest.param(pathlib.Path(__file__), None, None, False,  None),
+])
+def test_get_project_path_from_file(input_file, supplied_project_path, ancestor_path, can_validate_project_path, expected_path):
+
+    with patch("o3de.validation.valid_o3de_project_json", return_value = can_validate_project_path), \
+        patch("o3de.utils.find_ancestor_dir_containing_file", return_value = ancestor_path):
+
+        result_path = utils.get_project_path_from_file(input_file, supplied_project_path)
+        assert result_path == expected_path
+
[email protected]("input_script_path, context_vars_dict, raisedException, expected_result", [
+    # successful cases
+    pytest.param(__file__, {}, None, 0),
+    pytest.param(pathlib.Path(__file__), {}, None, 0),
+    pytest.param(__file__, {"test":"value", "key":12}, None, 0),
+    pytest.param(pathlib.Path(__file__), {"test":"value", "key":12}, None, 0),
+    
+    # failure cases
+    pytest.param(__file__, {}, RuntimeError, 1),
+    pytest.param(pathlib.Path(__file__), {}, RuntimeError, 1),
+    pytest.param(__file__, {"test":"value", "key":12}, RuntimeError, 1),
+    pytest.param(pathlib.Path(__file__), {"test":"value", "key":12}, RuntimeError, 1),
+])
+def test_load_and_execute_script(input_script_path, context_vars_dict, raisedException, expected_result):
+    def mock_error():
+        if raisedException:
+            raise raisedException
+
+    mock_spec = mock.Mock()
+    mock_spec.loader.exec_module.return_value = None
+    if raisedException:
+        mock_spec.loader.exec_module.side_effect = mock_error
+
+    mock_module = mock.Mock()
+
+    with patch("importlib.util.spec_from_file_location", return_value = mock_spec) as spec_from_file_patch,\
+        patch("importlib.util.module_from_spec", return_value = mock_module) as module_from_spec_patch,\
+        patch("sys.modules") as sys_modules_patch:
+
+        result = utils.load_and_execute_script(input_script_path, **context_vars_dict)
+
+        for key, value in context_vars_dict.items():
+            assert hasattr(mock_module, key)
+            assert getattr(mock_module, key) == value
+
+        assert result == expected_result
+
+
+
[email protected]("process_obj, raisedException",[
+    # the successful case
+    pytest.param({"args":['cmake', '--version'], "pid":0}, None),
+    # these raise exceptions, but safe_kill_processes should intercept and log instead
+    pytest.param({"args":['cmake', '--version'], "pid":0}, psutil.AccessDenied),
+    pytest.param({"args":['cmake', '--version'], "pid":0}, psutil.NoSuchProcess),
+    pytest.param({"args":['cmake', '--version'], "pid":0}, RuntimeError)
+])
+def test_safe_kill_processes(process_obj, raisedException):
+    exceptionCaught = False
+    def mock_kill():
+        nonlocal exceptionCaught
+        if raisedException:
+            exceptionCaught = True
+            raise raisedException
+
+    process = mock.Mock()
+    process.configure_mock(**process_obj)
+    process.kill.side_effect = mock_kill
+
+    with patch("subprocess.Popen", return_value = process) as subproc_popen_patch:
+
+        utils.safe_kill_processes(subproc_popen_patch(process_obj["args"]))
+
+
+        assert (subproc_popen_patch.called)
+        assert (process.kill.called)
+        assert (not raisedException or exceptionCaught)
+