system.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. __package__ = 'archivebox'
  2. import os
  3. import shutil
  4. from json import dump
  5. from pathlib import Path
  6. from typing import Optional, Union, Set, Tuple
  7. from subprocess import run as subprocess_run
  8. from crontab import CronTab
  9. from atomicwrites import atomic_write as lib_atomic_write
  10. from .util import enforce_types, ExtendedEncoder
  11. from .config import OUTPUT_PERMISSIONS
  12. def run(*args, input=None, capture_output=True, text=False, **kwargs):
  13. """Patched of subprocess.run to fix blocking io making timeout=innefective"""
  14. if input is not None:
  15. if 'stdin' in kwargs:
  16. raise ValueError('stdin and input arguments may not both be used.')
  17. if capture_output:
  18. if ('stdout' in kwargs) or ('stderr' in kwargs):
  19. raise ValueError('stdout and stderr arguments may not be used '
  20. 'with capture_output.')
  21. return subprocess_run(*args, input=input, capture_output=capture_output, text=text, **kwargs)
  22. @enforce_types
  23. def atomic_write(path: Union[Path, str], contents: Union[dict, str, bytes], overwrite: bool=True) -> None:
  24. """Safe atomic write to filesystem by writing to temp file + atomic rename"""
  25. mode = 'wb+' if isinstance(contents, bytes) else 'w'
  26. # print('\n> Atomic Write:', mode, path, len(contents), f'overwrite={overwrite}')
  27. try:
  28. with lib_atomic_write(path, mode=mode, overwrite=overwrite) as f:
  29. if isinstance(contents, dict):
  30. dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
  31. elif isinstance(contents, (bytes, str)):
  32. f.write(contents)
  33. except OSError as e:
  34. print(f"[X] OSError: Failed to write {path} with fcntl.F_FULLFSYNC. ({e})")
  35. print(" For data integrity, ArchiveBox requires a filesystem that supports atomic writes.")
  36. print(" Filesystems and network drives that don't implement FSYNC are incompatible and require workarounds.")
  37. raise SystemExit(1)
  38. os.chmod(path, int(OUTPUT_PERMISSIONS, base=8))
  39. @enforce_types
  40. def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS) -> None:
  41. """chmod -R <permissions> <cwd>/<path>"""
  42. root = Path(cwd) / path
  43. if not root.exists():
  44. raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
  45. if not root.is_dir():
  46. os.chmod(root, int(OUTPUT_PERMISSIONS, base=8))
  47. else:
  48. for subpath in Path(path).glob('**/*'):
  49. os.chmod(subpath, int(OUTPUT_PERMISSIONS, base=8))
  50. @enforce_types
  51. def copy_and_overwrite(from_path: Union[str, Path], to_path: Union[str, Path]):
  52. """copy a given file or directory to a given path, overwriting the destination"""
  53. if Path(from_path).is_dir():
  54. shutil.rmtree(to_path, ignore_errors=True)
  55. shutil.copytree(from_path, to_path)
  56. else:
  57. with open(from_path, 'rb') as src:
  58. contents = src.read()
  59. atomic_write(to_path, contents)
  60. @enforce_types
  61. def get_dir_size(path: Union[str, Path], recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
  62. """get the total disk size of a given directory, optionally summing up
  63. recursively and limiting to a given filter list
  64. """
  65. num_bytes, num_dirs, num_files = 0, 0, 0
  66. for entry in os.scandir(path):
  67. if (pattern is not None) and (pattern not in entry.path):
  68. continue
  69. if entry.is_dir(follow_symlinks=False):
  70. if not recursive:
  71. continue
  72. num_dirs += 1
  73. bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
  74. num_bytes += bytes_inside
  75. num_dirs += dirs_inside
  76. num_files += files_inside
  77. else:
  78. num_bytes += entry.stat(follow_symlinks=False).st_size
  79. num_files += 1
  80. return num_bytes, num_dirs, num_files
  81. CRON_COMMENT = 'archivebox_schedule'
  82. @enforce_types
  83. def dedupe_cron_jobs(cron: CronTab) -> CronTab:
  84. deduped: Set[Tuple[str, str]] = set()
  85. for job in list(cron):
  86. unique_tuple = (str(job.slices), job.command)
  87. if unique_tuple not in deduped:
  88. deduped.add(unique_tuple)
  89. cron.remove(job)
  90. for schedule, command in deduped:
  91. job = cron.new(command=command, comment=CRON_COMMENT)
  92. job.setall(schedule)
  93. job.enable()
  94. return cron
  95. class suppress_output(object):
  96. '''
  97. A context manager for doing a "deep suppression" of stdout and stderr in
  98. Python, i.e. will suppress all print, even if the print originates in a
  99. compiled C/Fortran sub-function.
  100. This will not suppress raised exceptions, since exceptions are printed
  101. to stderr just before a script exits, and after the context manager has
  102. exited (at least, I think that is why it lets exceptions through).
  103. with suppress_stdout_stderr():
  104. rogue_function()
  105. '''
  106. def __init__(self, stdout=True, stderr=True):
  107. # Open a pair of null files
  108. # Save the actual stdout (1) and stderr (2) file descriptors.
  109. self.stdout, self.stderr = stdout, stderr
  110. if stdout:
  111. self.null_stdout = os.open(os.devnull, os.O_RDWR)
  112. self.real_stdout = os.dup(1)
  113. if stderr:
  114. self.null_stderr = os.open(os.devnull, os.O_RDWR)
  115. self.real_stderr = os.dup(2)
  116. def __enter__(self):
  117. # Assign the null pointers to stdout and stderr.
  118. if self.stdout:
  119. os.dup2(self.null_stdout, 1)
  120. if self.stderr:
  121. os.dup2(self.null_stderr, 2)
  122. def __exit__(self, *_):
  123. # Re-assign the real stdout/stderr back to (1) and (2)
  124. if self.stdout:
  125. os.dup2(self.real_stdout, 1)
  126. os.close(self.null_stdout)
  127. if self.stderr:
  128. os.dup2(self.real_stderr, 2)
  129. os.close(self.null_stderr)