# archivebox/misc/system.py
__package__ = 'archivebox.misc'

import os
import shutil
import signal
import sys
from json import dump
from pathlib import Path
from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
from typing import Optional, Union, Set, Tuple

from atomicwrites import atomic_write as lib_atomic_write
from crontab import CronTab

from archivebox.config.common import STORAGE_CONFIG
from archivebox.misc.util import enforce_types, ExtendedEncoder
  13. def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
  14. """Patched of subprocess.run to kill forked child subprocesses and fix blocking io making timeout=innefective
  15. Mostly copied from https://github.com/python/cpython/blob/master/Lib/subprocess.py
  16. """
  17. cmd = [str(arg) for arg in cmd]
  18. if input is not None:
  19. if kwargs.get('stdin') is not None:
  20. raise ValueError('stdin and input arguments may not both be used.')
  21. kwargs['stdin'] = PIPE
  22. if capture_output:
  23. if ('stdout' in kwargs) or ('stderr' in kwargs):
  24. raise ValueError('stdout and stderr arguments may not be used with capture_output.')
  25. kwargs['stdout'] = PIPE
  26. kwargs['stderr'] = PIPE
  27. pgid = None
  28. try:
  29. if isinstance(cmd, (list, tuple)) and cmd[0].endswith('.py'):
  30. PYTHON_BINARY = sys.executable
  31. cmd = (PYTHON_BINARY, *cmd)
  32. with Popen(cmd, *args, start_new_session=start_new_session, text=text, **kwargs) as process:
  33. pgid = os.getpgid(process.pid)
  34. try:
  35. stdout, stderr = process.communicate(input, timeout=timeout)
  36. except TimeoutExpired as exc:
  37. process.kill()
  38. if _mswindows:
  39. # Windows accumulates the output in a single blocking
  40. # read() call run on child threads, with the timeout
  41. # being done in a join() on those threads. communicate()
  42. # _after_ kill() is required to collect that and add it
  43. # to the exception.
  44. exc.stdout, exc.stderr = process.communicate()
  45. else:
  46. # POSIX _communicate already populated the output so
  47. # far into the TimeoutExpired exception.
  48. process.wait()
  49. raise
  50. except: # Including KeyboardInterrupt, communicate handled that.
  51. process.kill()
  52. # We don't call process.wait() as .__exit__ does that for us.
  53. raise
  54. retcode = process.poll()
  55. if check and retcode:
  56. raise CalledProcessError(retcode, process.args,
  57. output=stdout, stderr=stderr)
  58. finally:
  59. # force kill any straggler subprocesses that were forked from the main proc
  60. try:
  61. os.killpg(pgid, signal.SIGINT)
  62. except Exception:
  63. pass
  64. return CompletedProcess(process.args, retcode, stdout, stderr)
  65. @enforce_types
  66. def atomic_write(path: Union[Path, str], contents: Union[dict, str, bytes], overwrite: bool=True) -> None:
  67. """Safe atomic write to filesystem by writing to temp file + atomic rename"""
  68. mode = 'wb+' if isinstance(contents, bytes) else 'w'
  69. encoding = None if isinstance(contents, bytes) else 'utf-8' # enforce utf-8 on all text writes
  70. # print('\n> Atomic Write:', mode, path, len(contents), f'overwrite={overwrite}')
  71. try:
  72. with lib_atomic_write(path, mode=mode, overwrite=overwrite, encoding=encoding) as f:
  73. if isinstance(contents, dict):
  74. dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
  75. elif isinstance(contents, (bytes, str)):
  76. f.write(contents)
  77. except OSError as e:
  78. if STORAGE_CONFIG.ENFORCE_ATOMIC_WRITES:
  79. print(f"[X] OSError: Failed to write {path} with fcntl.F_FULLFSYNC. ({e})")
  80. print(" You can store the archive/ subfolder on a hard drive or network share that doesn't support support syncronous writes,")
  81. print(" but the main folder containing the index.sqlite3 and ArchiveBox.conf files must be on a filesystem that supports FSYNC.")
  82. raise SystemExit(1)
  83. # retry the write without forcing FSYNC (aka atomic mode)
  84. with open(path, mode=mode, encoding=encoding) as f:
  85. if isinstance(contents, dict):
  86. dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
  87. elif isinstance(contents, (bytes, str)):
  88. f.write(contents)
  89. # set file permissions
  90. os.chmod(path, int(STORAGE_CONFIG.OUTPUT_PERMISSIONS, base=8))
  91. @enforce_types
  92. def chmod_file(path: str, cwd: str='') -> None:
  93. """chmod -R <permissions> <cwd>/<path>"""
  94. root = Path(cwd or os.getcwd()) / path
  95. if not os.access(root, os.R_OK):
  96. raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
  97. if not root.is_dir():
  98. # path is just a plain file
  99. os.chmod(root, int(STORAGE_CONFIG.OUTPUT_PERMISSIONS, base=8))
  100. else:
  101. for subpath in Path(path).glob('**/*'):
  102. if subpath.is_dir():
  103. # directories need execute permissions to be able to list contents
  104. os.chmod(subpath, int(STORAGE_CONFIG.DIR_OUTPUT_PERMISSIONS, base=8))
  105. else:
  106. os.chmod(subpath, int(STORAGE_CONFIG.OUTPUT_PERMISSIONS, base=8))
  107. @enforce_types
  108. def copy_and_overwrite(from_path: Union[str, Path], to_path: Union[str, Path]):
  109. """copy a given file or directory to a given path, overwriting the destination"""
  110. assert os.access(from_path, os.R_OK)
  111. if Path(from_path).is_dir():
  112. shutil.rmtree(to_path, ignore_errors=True)
  113. shutil.copytree(from_path, to_path)
  114. else:
  115. with open(from_path, 'rb') as src:
  116. contents = src.read()
  117. atomic_write(to_path, contents)
  118. @enforce_types
  119. def get_dir_size(path: Union[str, Path], recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
  120. """get the total disk size of a given directory, optionally summing up
  121. recursively and limiting to a given filter list
  122. """
  123. num_bytes, num_dirs, num_files = 0, 0, 0
  124. try:
  125. for entry in os.scandir(path):
  126. if (pattern is not None) and (pattern not in entry.path):
  127. continue
  128. if entry.is_dir(follow_symlinks=False):
  129. if not recursive:
  130. continue
  131. num_dirs += 1
  132. bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
  133. num_bytes += bytes_inside
  134. num_dirs += dirs_inside
  135. num_files += files_inside
  136. else:
  137. num_bytes += entry.stat(follow_symlinks=False).st_size
  138. num_files += 1
  139. except OSError:
  140. # e.g. FileNameTooLong or other error while trying to read dir
  141. pass
  142. return num_bytes, num_dirs, num_files
  143. CRON_COMMENT = 'archivebox_schedule'
  144. @enforce_types
  145. def dedupe_cron_jobs(cron: CronTab) -> CronTab:
  146. deduped: Set[Tuple[str, str]] = set()
  147. for job in list(cron):
  148. unique_tuple = (str(job.slices), str(job.command))
  149. if unique_tuple not in deduped:
  150. deduped.add(unique_tuple)
  151. cron.remove(job)
  152. for schedule, command in deduped:
  153. job = cron.new(command=command, comment=CRON_COMMENT)
  154. job.setall(schedule)
  155. job.enable()
  156. return cron
  157. class suppress_output(object):
  158. """
  159. A context manager for doing a "deep suppression" of stdout and stderr in
  160. Python, i.e. will suppress all print, even if the print originates in a
  161. compiled C/Fortran sub-function.
  162. This will not suppress raised exceptions, since exceptions are printed
  163. to stderr just before a script exits, and after the context manager has
  164. exited (at least, I think that is why it lets exceptions through).
  165. with suppress_stdout_stderr():
  166. rogue_function()
  167. """
  168. def __init__(self, stdout=True, stderr=True):
  169. # Open a pair of null files
  170. # Save the actual stdout (1) and stderr (2) file descriptors.
  171. self.stdout, self.stderr = stdout, stderr
  172. if stdout:
  173. self.null_stdout = os.open(os.devnull, os.O_RDWR)
  174. self.real_stdout = os.dup(1)
  175. if stderr:
  176. self.null_stderr = os.open(os.devnull, os.O_RDWR)
  177. self.real_stderr = os.dup(2)
  178. def __enter__(self):
  179. # Assign the null pointers to stdout and stderr.
  180. if self.stdout:
  181. os.dup2(self.null_stdout, 1)
  182. if self.stderr:
  183. os.dup2(self.null_stderr, 2)
  184. def __exit__(self, *_):
  185. # Re-assign the real stdout/stderr back to (1) and (2)
  186. if self.stdout:
  187. os.dup2(self.real_stdout, 1)
  188. os.close(self.null_stdout)
  189. if self.stderr:
  190. os.dup2(self.real_stderr, 2)
  191. os.close(self.null_stderr)