# archivebox/system.py

__package__ = 'archivebox'

import os
import sys
import signal
import shutil
import getpass

from json import dump
from pathlib import Path
from typing import Optional, Union, Set, Tuple
from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired

from crontab import CronTab
from atomicwrites import atomic_write as lib_atomic_write

from .util import enforce_types, ExtendedEncoder
from .config.legacy import OUTPUT_PERMISSIONS, DIR_OUTPUT_PERMISSIONS, ENFORCE_ATOMIC_WRITES


def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
    """Patched version of subprocess.run that kills forked child subprocesses and fixes blocking IO making timeout= ineffective
    Mostly copied from https://github.com/python/cpython/blob/master/Lib/subprocess.py
    """

    cmd = [str(arg) for arg in cmd]

    if input is not None:
        if kwargs.get('stdin') is not None:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('stdout and stderr arguments may not be used with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    pgid = None
    try:
        if isinstance(cmd, (list, tuple)) and cmd[0].endswith('.py'):
            # run .py scripts with the same interpreter that is running this process
            PYTHON_BINARY = sys.executable
            cmd = (PYTHON_BINARY, *cmd)

        with Popen(cmd, *args, start_new_session=start_new_session, text=text, **kwargs) as process:
            pgid = os.getpgid(process.pid)
            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads. communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise

            retcode = process.poll()
            if check and retcode:
                raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
    finally:
        # force kill any straggler subprocesses that were forked from the main proc
        try:
            os.killpg(pgid, signal.SIGINT)
        except Exception:
            pass

    return CompletedProcess(process.args, retcode, stdout, stderr)
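
# Usage sketch (illustrative only, not part of the original module): run() is a
# drop-in replacement for subprocess.run() that also SIGINTs the whole process
# group on exit; the binary and timeout below are assumptions for the example.
#
#     result = run(['wget', '--version'], timeout=10)
#     print(result.returncode, result.stdout.decode()[:40])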


@enforce_types
def atomic_write(path: Union[Path, str], contents: Union[dict, str, bytes], overwrite: bool=True) -> None:
    """Safe atomic write to filesystem by writing to temp file + atomic rename"""

    mode = 'wb+' if isinstance(contents, bytes) else 'w'
    encoding = None if isinstance(contents, bytes) else 'utf-8'    # enforce utf-8 on all text writes

    # print('\n> Atomic Write:', mode, path, len(contents), f'overwrite={overwrite}')
    try:
        with lib_atomic_write(path, mode=mode, overwrite=overwrite, encoding=encoding) as f:
            if isinstance(contents, dict):
                dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
            elif isinstance(contents, (bytes, str)):
                f.write(contents)
    except OSError as e:
        if ENFORCE_ATOMIC_WRITES:
            print(f"[X] OSError: Failed to write {path} with fcntl.F_FULLFSYNC. ({e})")
            print("    You can store the archive/ subfolder on a hard drive or network share that doesn't support synchronous writes,")
            print("    but the main folder containing the index.sqlite3 and ArchiveBox.conf files must be on a filesystem that supports FSYNC.")
            raise SystemExit(1)

        # retry the write without forcing FSYNC (aka atomic mode)
        with open(path, mode=mode, encoding=encoding) as f:
            if isinstance(contents, dict):
                dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
            elif isinstance(contents, (bytes, str)):
                f.write(contents)

    # set file permissions
    os.chmod(path, int(OUTPUT_PERMISSIONS, base=8))
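
# Usage sketch (illustrative only): atomic_write() accepts str, bytes, or a dict
# (serialized to JSON via ExtendedEncoder); the file names below are hypothetical.
#
#     atomic_write('index.json', {'version': 1, 'links': []})
#     atomic_write('headers.txt', 'Content-Type: text/html\n')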


@enforce_types
def chmod_file(path: str, cwd: str='.') -> None:
    """chmod -R <permissions> <cwd>/<path>"""

    root = Path(cwd) / path
    if not root.exists():
        raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))

    if not root.is_dir():
        # path is just a plain file
        os.chmod(root, int(OUTPUT_PERMISSIONS, base=8))
    else:
        for subpath in root.glob('**/*'):   # glob relative to <cwd>/<path>, not the process CWD
            if subpath.is_dir():
                # directories need execute permissions to be able to list contents
                os.chmod(subpath, int(DIR_OUTPUT_PERMISSIONS, base=8))
            else:
                os.chmod(subpath, int(OUTPUT_PERMISSIONS, base=8))
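
# Usage sketch (illustrative only): chmod_file() applies OUTPUT_PERMISSIONS to files
# and DIR_OUTPUT_PERMISSIONS to directories under <cwd>/<path>; the snapshot folder
# name below is a hypothetical example.
#
#     chmod_file('archive/1610000000.0', cwd='/data')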


@enforce_types
def copy_and_overwrite(from_path: Union[str, Path], to_path: Union[str, Path]):
    """copy a given file or directory to a given path, overwriting the destination"""
    if Path(from_path).is_dir():
        shutil.rmtree(to_path, ignore_errors=True)
        shutil.copytree(from_path, to_path)
    else:
        with open(from_path, 'rb') as src:
            contents = src.read()
        atomic_write(to_path, contents)
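
# Usage sketch (illustrative only): works for both single files and whole directory
# trees; the destination paths below are hypothetical.
#
#     copy_and_overwrite('/data/archive/123', '/backup/archive/123')            # directory
#     copy_and_overwrite('/data/ArchiveBox.conf', '/backup/ArchiveBox.conf')    # file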


@enforce_types
def get_dir_size(path: Union[str, Path], recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
    """get the total disk size of a given directory, optionally summing up
       recursively and limiting to a given filter list
    """
    num_bytes, num_dirs, num_files = 0, 0, 0
    try:
        for entry in os.scandir(path):
            if (pattern is not None) and (pattern not in entry.path):
                continue
            if entry.is_dir(follow_symlinks=False):
                if not recursive:
                    continue
                num_dirs += 1
                bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
                num_bytes += bytes_inside
                num_dirs += dirs_inside
                num_files += files_inside
            else:
                num_bytes += entry.stat(follow_symlinks=False).st_size
                num_files += 1
    except OSError:
        # e.g. FileNameTooLong or other error while trying to read dir
        pass
    return num_bytes, num_dirs, num_files
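
# Usage sketch (illustrative only): returns (total_bytes, num_dirs, num_files).
# Note that pattern only filters the top-level entries; recursion below a matching
# directory is unfiltered. The path and pattern below are hypothetical examples.
#
#     size, dirs, files = get_dir_size('/data/archive', pattern='1610')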


CRON_COMMENT = 'archivebox_schedule'


@enforce_types
def dedupe_cron_jobs(cron: CronTab) -> CronTab:
    deduped: Set[Tuple[str, str]] = set()

    for job in list(cron):
        unique_tuple = (str(job.slices), str(job.command))
        if unique_tuple not in deduped:
            deduped.add(unique_tuple)
        cron.remove(job)

    for schedule, command in deduped:
        job = cron.new(command=command, comment=CRON_COMMENT)
        job.setall(schedule)
        job.enable()

    return cron
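
# Usage sketch (illustrative only, assumes python-crontab is installed and the
# current user has a crontab): dedupe_cron_jobs() drops exact (schedule, command)
# duplicates and re-adds the remaining jobs tagged with CRON_COMMENT.
#
#     cron = CronTab(user=True)
#     dedupe_cron_jobs(cron)
#     cron.write()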


class suppress_output(object):
    """
    A context manager for doing a "deep suppression" of stdout and stderr in
    Python, i.e. will suppress all print, even if the print originates in a
    compiled C/Fortran sub-function.

    This will not suppress raised exceptions, since exceptions are printed
    to stderr just before a script exits, and after the context manager has
    exited (at least, I think that is why it lets exceptions through).

        with suppress_output():
            rogue_function()
    """
    def __init__(self, stdout=True, stderr=True):
        # Open a pair of null files
        # Save the actual stdout (1) and stderr (2) file descriptors.
        self.stdout, self.stderr = stdout, stderr
        if stdout:
            self.null_stdout = os.open(os.devnull, os.O_RDWR)
            self.real_stdout = os.dup(1)
        if stderr:
            self.null_stderr = os.open(os.devnull, os.O_RDWR)
            self.real_stderr = os.dup(2)

    def __enter__(self):
        # Assign the null pointers to stdout and stderr.
        if self.stdout:
            os.dup2(self.null_stdout, 1)
        if self.stderr:
            os.dup2(self.null_stderr, 2)

    def __exit__(self, *_):
        # Re-assign the real stdout/stderr back to (1) and (2)
        if self.stdout:
            os.dup2(self.real_stdout, 1)
            os.close(self.null_stdout)
        if self.stderr:
            os.dup2(self.real_stderr, 2)
            os.close(self.null_stderr)
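
# Usage sketch (illustrative only): redirects file descriptors 1/2 to os.devnull for
# the duration of the block, so even output from C extensions is silenced;
# rogue_function is a hypothetical stand-in.
#
#     with suppress_output(stderr=False):
#         rogue_function()   # stdout silenced, stderr still visible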


def get_system_user() -> str:
    # some host OSes are unable to provide a username (k3s, Windows), making this complicated
    # uid 999 is especially problematic and breaks many attempts
    SYSTEM_USER = None
    FALLBACK_USER_PLACEHOLDER = f'user_{os.getuid()}'

    # Option 1
    try:
        import pwd
        SYSTEM_USER = SYSTEM_USER or pwd.getpwuid(os.geteuid()).pw_name
    except (ModuleNotFoundError, Exception):
        pass

    # Option 2
    try:
        SYSTEM_USER = SYSTEM_USER or getpass.getuser()
    except Exception:
        pass

    # Option 3
    try:
        SYSTEM_USER = SYSTEM_USER or os.getlogin()
    except Exception:
        pass

    return SYSTEM_USER or FALLBACK_USER_PLACEHOLDER
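
# Usage sketch (illustrative only): tries pwd, then getpass, then os.getlogin(), and
# falls back to a 'user_<uid>' placeholder when the host cannot map the uid to a name
# (e.g. inside minimal containers).
#
#     print(get_system_user())   # e.g. 'archivebox' or 'user_999'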