3
0

file_system.py 18 KB


  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. File system related functions.
  6. """
  7. import errno
  8. import glob
  9. import logging
  10. import os
  11. import psutil
  12. import shutil
  13. import stat
  14. import sys
  15. import tarfile
  16. import time
  17. import zipfile
  18. import ly_test_tools.environment.process_utils as process_utils
  19. logger = logging.getLogger(__name__)
  20. ONE_KIB = 1024
  21. ONE_MIB = 1024 * ONE_KIB
  22. ONE_GIB = 1024 * ONE_MIB
  23. def check_free_space(dest, required_space, msg):
  24. """ Make sure the required space is available on destination, raising an IOError if there is not. """
  25. free_space = psutil.disk_usage(dest).free
  26. if free_space < required_space:
  27. raise IOError(
  28. errno.ENOSPC,
  29. f'{msg} {free_space / ONE_GIB:.2f} GiB '
  30. f'vs {required_space / ONE_GIB:.2f} GiB')
  31. def safe_makedirs(dest_path):
  32. """ This allows an OSError in the case the directory cannot be created, which is logged but does not propagate."""
  33. try:
  34. logger.info(f'Creating directory "{dest_path}"')
  35. os.makedirs(dest_path, exist_ok=True)
  36. except OSError as e:
  37. if e.errno == errno.EEXIST:
  38. pass
  39. elif e.errno == errno.EACCES and sys.platform == 'win32' and dest_path.endswith(':\\'):
  40. # In this case, windows will raise EACCES instead of EEXIST if you try to make a directory at the root.
  41. pass
  42. else:
  43. logger.info(f'Could not create directory: "{dest_path}".')
  44. raise
  45. def get_newest_file_in_dir(path, exts):
  46. """ Find the newest file in a directory, matching the extensions provided. """
  47. dir_iter = []
  48. for ext in exts:
  49. dir_iter.extend(glob.iglob(os.path.join(path, ext)))
  50. try:
  51. return max(dir_iter, key=os.path.getctime)
  52. except ValueError:
  53. # May not be any files in that directory.
  54. return None
  55. def remove_path_and_extension(src):
  56. """
  57. Given a src, will strip off the path and the extension. Used in unzip and untgz
  58. Example:
  59. C:\\packages\\lumberyard-XXXX.zip would become lumberyard-XXXX
  60. """
  61. src_name = os.path.basename(src)
  62. src_no_extension, _ = os.path.splitext(src_name)
  63. return src_no_extension
  64. def set_up_decompression(full_size, dest, src, force, allow_exists=False):
  65. """
  66. Used in unzip and untgz, will check whether the dest has enough space and creates the new build path.
  67. :param full_size: Size of zipped package
  68. :param dest: Target unzip location
  69. :param src: Location of the zipped package
  70. :param force: Boolean determining whether to overwrite the build if it already exists
  71. :param allow_exists: Boolean determining whether to log critical if the build already exists
  72. :return: A tuple containing the unzipped build path and a bool determining whether the build already exists.
  73. """
  74. exists = False
  75. # Check free space leaving at least a GiB free.
  76. check_free_space(dest, full_size + ONE_GIB, 'Not enough space to safely extract: ')
  77. dst_path = os.path.join(dest, remove_path_and_extension(src))
  78. # Cannot easily compare the zip contents to existing dir. Assumes builds of the same name are identical.
  79. if os.path.exists(dst_path) and not force:
  80. exists = True
  81. # Only log critical if the user wants early termination of the command if the build exists
  82. if allow_exists:
  83. level = logging.getLevelName('INFO')
  84. else:
  85. level = logging.getLevelName('CRITICAL')
  86. logger.log(level, f'Found existing {dst_path}. Will not overwrite.')
  87. return dst_path, exists
  88. return dst_path, exists
  89. def unzip(dest, src, force=False, allow_exists=False):
  90. """
  91. decompress src_path\\name.zip to the dest directory in a subdirectory called name.
  92. Will strip assets names for lumberyard builds.
  93. Example:
  94. dest = D:\\builds
  95. src = C:\\packages\\lumberyard-XXXX.zip
  96. Result:
  97. C:\\packages\\lumberyard-XXXX.zip decompressed to D:\\builds\\lumberyard-XXXX
  98. src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to.
  99. """
  100. with zipfile.ZipFile(src, 'r') as zip_file:
  101. full_size = sum(info.file_size for info in zip_file.infolist())
  102. dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
  103. if exists:
  104. return dst_path
  105. # Unzip and return final path.
  106. start_time = time.time()
  107. zip_file.extractall(dst_path)
  108. secs = time.time() - start_time
  109. if secs == 0:
  110. secs = 0.01
  111. logger.info(
  112. f'Extracted {full_size / ONE_GIB:.2f} GiB '
  113. f'from "{src}" to "{dst_path}" in '
  114. f'{secs / 60:2.2f} minutes, '
  115. f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.')
  116. return dst_path
  117. def untgz(dest, src, exact_tgz_size=False, force=False, allow_exists=False):
  118. """
  119. decompress src_path\\name.tgz to the dest directory in a subdirectoy called name.
  120. Will strip assets names for lumberyard builds.
  121. Example:
  122. dest = D:\\builds
  123. src = C:\\packages\\lumberyard-XXXX.tgz
  124. Result:
  125. C:\\packages\\lumberyard-XXXX.tgz decompressed to D:\\builds\\lumberyard-XXXX
  126. src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to.
  127. """
  128. with tarfile.open(src) as tar_file:
  129. # Determine exact size of tar if instructed, otherwise estimate.
  130. if exact_tgz_size:
  131. full_size = 0
  132. for tarinfo in tar_file:
  133. full_size += tarinfo.size
  134. else:
  135. full_size = os.stat(src).st_size * 4.5
  136. dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
  137. if exists:
  138. return dst_path
  139. # Extract it and return final path.
  140. start_time = time.time()
  141. def is_within_directory(directory, target):
  142. abs_directory = os.path.abspath(directory)
  143. abs_target = os.path.abspath(target)
  144. prefix = os.path.commonprefix([abs_directory, abs_target])
  145. return prefix == abs_directory
  146. def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
  147. for member in tar.getmembers():
  148. member_path = os.path.join(path, member.name)
  149. if not is_within_directory(path, member_path):
  150. raise Exception("Attempted Path Traversal in Tar File")
  151. tar.extractall(path, members, numeric_owner=numeric_owner)
  152. safe_extract(tar_file, dst_path)
  153. secs = time.time() - start_time
  154. if secs == 0:
  155. secs = 0.01
  156. logger.info(
  157. f'Extracted {full_size / ONE_GIB:.2f} MiB '
  158. f'from {src} to {dst_path} '
  159. f'in {secs / 60:2.2f} minutes, '
  160. f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.')
  161. return dst_path
  162. def change_permissions(path_list, perms):
  163. """ Changes the permissions of the files and folders defined in the file list """
  164. try:
  165. for root, dirs, files in os.walk(path_list):
  166. for dir_name in dirs:
  167. os.chmod(os.path.join(root, dir_name), perms)
  168. for file_name in files:
  169. os.chmod(os.path.join(root, file_name), perms)
  170. except OSError as e:
  171. logger.warning(f"Couldn't change permission : Error: {e.filename} - {e.strerror}.")
  172. return False
  173. else:
  174. return True
  175. def unlock_file(file_name):
  176. """
  177. Given a file name, unlocks the file for write access.
  178. :param file_name: Path to a file
  179. :return: True if unlock succeeded, else False
  180. """
  181. if not os.access(file_name, os.W_OK):
  182. file_stat = os.stat(file_name)
  183. os.chmod(file_name, file_stat.st_mode | stat.S_IWRITE)
  184. logger.info(f'Clearing write lock for file {file_name}.')
  185. return True
  186. else:
  187. logger.info(f'File {file_name} not write locked. Unlocking file not necessary.')
  188. return False
  189. def lock_file(file_name):
  190. """
  191. Given a file name, lock write access to the file.
  192. :param file_name: Path to a file
  193. :return: True if lock succeeded, else False
  194. """
  195. if os.access(file_name, os.W_OK):
  196. file_stat = os.stat(file_name)
  197. os.chmod(file_name, file_stat.st_mode & (~stat.S_IWRITE))
  198. logger.info(f'Write locking file {file_name}')
  199. return True
  200. else:
  201. logger.info(f'File {file_name} already locked. Locking file not necessary.')
  202. return False
  203. def remove_symlink(path):
  204. try:
  205. # Rmdir can delete a symlink without following the symlink to the original content
  206. os.rmdir(path)
  207. except OSError as e:
  208. if e.errno != errno.ENOTEMPTY:
  209. raise
  210. def remove_symlinks(path, remove_root=False):
  211. """ Removes all symlinks at the provided path and its subdirectories. """
  212. for root, dirs, files in os.walk(path):
  213. for name in dirs:
  214. remove_symlink(os.path.join(root, name))
  215. if remove_root:
  216. remove_symlink(path)
  217. def delete(file_list, del_files, del_dirs):
  218. """
  219. Given a list of directory paths, delete will remove all subdirectories and files based on which flag is set,
  220. del_files or del_dirs.
  221. :param file_list: A string or an array of artifact paths to delete
  222. :param del_files: True if delete should delete files
  223. :param del_dirs: True if delete should delete directories
  224. :return: True if delete was successful
  225. """
  226. if isinstance(file_list, str):
  227. file_list = [file_list]
  228. for file_to_delete in file_list:
  229. logger.info(f'Deleting "{file_to_delete}"')
  230. try:
  231. if del_dirs and os.path.isdir(file_to_delete):
  232. change_permissions(file_to_delete, 0o777)
  233. # Remove all symlinks before rmtree blows them away
  234. remove_symlinks(file_to_delete)
  235. shutil.rmtree(file_to_delete)
  236. elif del_files and os.path.isfile(file_to_delete):
  237. os.chmod(file_to_delete, 0o777)
  238. os.remove(file_to_delete)
  239. except OSError as e:
  240. logger.warning(f'Could not delete {e.filename} : Error: {e.strerror}.')
  241. return False
  242. return True
  243. def rename(src: str | bytes, dst: str | bytes) -> bool:
  244. """
  245. Given a file or directory path, will rename from src to dst.
  246. :param src: Full path to file to rename
  247. :param dst: Full path to renamed file
  248. Returns a boolean for success or failure of the operation.
  249. """
  250. logger.info(f"Renaming {src} to {dst}.")
  251. def _rename_helper(src: str | bytes, dst: str | bytes) -> bool:
  252. """ Helper: Change file permissions and renames the file."""
  253. try:
  254. os.chmod(src, 0o777)
  255. os.rename(src,dst)
  256. return True
  257. except OSError as e:
  258. logger.error(f"Could not rename {e.filename} Error: {e.strerror}.")
  259. return False
  260. if not os.path.exists(src):
  261. logger.error(f"No file located at: {src}")
  262. return False
  263. if os.path.exists(dst):
  264. logger.error(f"File already exists at: {dst}")
  265. return False
  266. return _rename_helper(src, dst)
  267. def create_backup(source, backup_dir, backup_name=None):
  268. """
  269. Creates a backup of a single source file by creating a copy of it with the same name + '.bak' in backup_dir
  270. e.g.: foo.txt is stored as backup_dir/foo.txt.bak
  271. If backup_name is provided, it will create a copy of the source file named "backup_name + .bak" instead.
  272. :param source: Full path to file to backup
  273. :param backup_dir: Path to the directory to store backup.
  274. :param backup_name: [Optional] Name of the backed up file to use instead or the source name.
  275. """
  276. if not backup_dir or not os.path.isdir(backup_dir):
  277. logger.error(f'Cannot create backup due to invalid backup directory {backup_dir}')
  278. return False
  279. if not os.path.exists(source):
  280. logger.warning(f'Source file {source} does not exist, aborting backup creation.')
  281. return False
  282. dest = None
  283. if backup_name is None:
  284. source_filename = os.path.basename(source)
  285. dest = os.path.join(backup_dir, f'{source_filename}.bak')
  286. else:
  287. dest = os.path.join(backup_dir, f'{backup_name}.bak')
  288. logger.info(f'Saving backup of {source} in {dest}')
  289. if os.path.exists(dest):
  290. logger.warning(f'Backup file already exists at {dest}, it will be overwritten.')
  291. try:
  292. shutil.copy2(source, dest)
  293. except Exception: # intentionally broad
  294. logger.warning('Could not create backup, exception occurred while copying.', exc_info=True)
  295. return False
  296. return True
  297. def restore_backup(original_file, backup_dir, backup_name=None):
  298. """
  299. Restores a backup file to its original location. Works with a single file only.
  300. :param original_file: Full path to file to overwrite.
  301. :param backup_dir: Path to the directory storing the backup.
  302. :param backup_name: [Optional] Provide if the backup file name is different from source. eg backup file = myFile_1.txt.bak original file = myfile.txt
  303. """
  304. if not backup_dir or not os.path.isdir(backup_dir):
  305. logger.error(f'Cannot restore backup due to invalid or nonexistent directory {backup_dir}.')
  306. return False
  307. backup = None
  308. if backup_name is None:
  309. source_filename = os.path.basename(original_file)
  310. backup = os.path.join(backup_dir, f'{source_filename}.bak')
  311. else:
  312. backup = os.path.join(backup_dir, f'{backup_name}.bak')
  313. if not os.path.exists(backup):
  314. logger.warning(f'Backup file {backup} does not exist, aborting backup restoration.')
  315. return False
  316. logger.info(f'Restoring backup of {original_file} from {backup}')
  317. try:
  318. shutil.copy2(backup, original_file)
  319. except Exception: # intentionally broad
  320. logger.warning('Could not restore backup, exception occurred while copying.', exc_info=True)
  321. return False
  322. return True
  323. def delete_oldest(path_glob, keep_num, del_files=True, del_dirs=False):
  324. """ Delete oldest builds, keeping a specific number """
  325. logger.info(
  326. f'Deleting dirs: {del_dirs} files: {del_files} "{path_glob}", keeping {keep_num}')
  327. paths = glob.iglob(path_glob)
  328. paths = sorted(paths, key=lambda fi: os.path.getctime(fi), reverse=True)
  329. return delete(paths[keep_num:], del_files, del_dirs)
  330. def make_junction(dst, src):
  331. """Create a directory junction on Windows or a hardlink on macOS."""
  332. if not os.path.isdir(src):
  333. raise IOError(f"{src} is not a directory")
  334. elif sys.platform == 'win32':
  335. process_utils.check_output(["mklink", "/J", dst, src], shell=True)
  336. elif sys.platform == 'darwin':
  337. process_utils.check_output(["ln", dst, src])
  338. else:
  339. raise IOError(f"Unsupported operating system: {sys.platform}")
  340. def split_path_where_exists(path):
  341. """
  342. Splits a path into 2 parts: the part that exists and the part that doesn't.
  343. :param path: the path to split
  344. :return: a tuple (exists_part, remainder) where exists_part is the part that exists and remainder is the part that
  345. doesn't. Either part may be None.
  346. """
  347. current = path
  348. remainder = None
  349. while True:
  350. if os.path.exists(current):
  351. return current, remainder
  352. next_, tail = os.path.split(current)
  353. tail = tail or next_
  354. remainder = tail if remainder is None else os.path.join(tail, remainder)
  355. if next_ == current:
  356. break
  357. current = next_
  358. return None, remainder
  359. def sanitize_file_name(file_name):
  360. """
  361. Replaces unsupported file name characters with a double underscore
  362. :param file_name: The target file name to sanitize
  363. :return: The sanitized name
  364. """
  365. return ''.join(
  366. '__' if c in ['\\', '/', ' ', ':', '*', '<', '>', '"', '|', '?'] + [chr(i) for i in range(32)] else c for c
  367. in file_name)
  368. def reduce_file_name_length(file_name, max_length):
  369. """
  370. Reduces the length of the string file_name to match the length parameter.
  371. :param file_name: string for the file name to reduce in length.
  372. :param max_length: the length to reduce file_name to.
  373. :return: file name string with a maximum length matching max_length.
  374. """
  375. reduce_amount = len(file_name) - max_length
  376. if len(file_name) > max_length:
  377. file_name = file_name[:-reduce_amount]
  378. return file_name
  379. def find_ancestor_file(target_file_name, start_path=os.getcwd()):
  380. """
  381. Find a file with the given name in the ancestor directories by walking up the starting path until the file is found.
  382. :param target_file_name: Name of the file to find.
  383. :param start_path: Optional path to start looking for the file.
  384. :return: Path to the file or None if not found.
  385. """
  386. current_path = os.path.abspath(start_path)
  387. candidate_path = os.path.join(current_path, target_file_name)
  388. # Limit the number of directories to traverse, to avoid infinite loop in path cycles
  389. for _ in range(15):
  390. if not os.path.exists(candidate_path):
  391. parent_path = os.path.dirname(current_path)
  392. if parent_path == current_path:
  393. # Only true when we are at the directory root, can't keep searching
  394. break
  395. candidate_path = os.path.join(parent_path, target_file_name)
  396. current_path = parent_path
  397. else:
  398. # Found the file we wanted
  399. break
  400. if not os.path.exists(candidate_path):
  401. logger.warning(f'The candidate path {candidate_path} does not exist.')
  402. return None
  403. return candidate_path