system.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. __package__ = 'archivebox'
  2. import os
  3. import shutil
  4. import json as pyjson
  5. from typing import Optional, Union, Set, Tuple
  6. from crontab import CronTab
  7. from subprocess import (
  8. Popen,
  9. PIPE,
  10. DEVNULL,
  11. CompletedProcess,
  12. TimeoutExpired,
  13. CalledProcessError,
  14. )
  15. from .util import enforce_types, ExtendedEncoder
  16. from .config import OUTPUT_PERMISSIONS
  17. def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
  18. """Patched of subprocess.run to fix blocking io making timeout=innefective"""
  19. if input is not None:
  20. if 'stdin' in kwargs:
  21. raise ValueError('stdin and input arguments may not both be used.')
  22. kwargs['stdin'] = PIPE
  23. if capture_output:
  24. if ('stdout' in kwargs) or ('stderr' in kwargs):
  25. raise ValueError('stdout and stderr arguments may not be used '
  26. 'with capture_output.')
  27. kwargs['stdout'] = PIPE
  28. kwargs['stderr'] = PIPE
  29. with Popen(*popenargs, **kwargs) as process:
  30. try:
  31. stdout, stderr = process.communicate(input, timeout=timeout)
  32. except TimeoutExpired:
  33. process.kill()
  34. try:
  35. stdout, stderr = process.communicate(input, timeout=2)
  36. except:
  37. pass
  38. raise TimeoutExpired(popenargs[0][0], timeout)
  39. except BaseException:
  40. process.kill()
  41. # We don't call process.wait() as .__exit__ does that for us.
  42. raise
  43. retcode = process.poll()
  44. if check and retcode:
  45. raise CalledProcessError(retcode, process.args,
  46. output=stdout, stderr=stderr)
  47. return CompletedProcess(process.args, retcode, stdout, stderr)
  48. def atomic_write(contents: Union[dict, str, bytes], path: str) -> None:
  49. """Safe atomic write to filesystem by writing to temp file + atomic rename"""
  50. try:
  51. tmp_file = '{}.tmp'.format(path)
  52. if isinstance(contents, bytes):
  53. args = {'mode': 'wb+'}
  54. else:
  55. args = {'mode': 'w+', 'encoding': 'utf-8'}
  56. with open(tmp_file, **args) as f:
  57. if isinstance(contents, dict):
  58. pyjson.dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
  59. else:
  60. f.write(contents)
  61. os.fsync(f.fileno())
  62. os.rename(tmp_file, path)
  63. chmod_file(path)
  64. finally:
  65. if os.path.exists(tmp_file):
  66. os.remove(tmp_file)
  67. @enforce_types
  68. def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None:
  69. """chmod -R <permissions> <cwd>/<path>"""
  70. if not os.path.exists(os.path.join(cwd, path)):
  71. raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
  72. chmod_result = run(['chmod', '-R', permissions, path], cwd=cwd, stdout=DEVNULL, stderr=PIPE, timeout=timeout)
  73. if chmod_result.returncode == 1:
  74. print(' ', chmod_result.stderr.decode())
  75. raise Exception('Failed to chmod {}/{}'.format(cwd, path))
  76. @enforce_types
  77. def copy_and_overwrite(from_path: str, to_path: str):
  78. """copy a given file or directory to a given path, overwriting the destination"""
  79. if os.path.isdir(from_path):
  80. shutil.rmtree(to_path, ignore_errors=True)
  81. shutil.copytree(from_path, to_path)
  82. else:
  83. with open(from_path, 'rb') as src:
  84. atomic_write(src.read(), to_path)
  85. @enforce_types
  86. def get_dir_size(path: str, recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
  87. """get the total disk size of a given directory, optionally summing up
  88. recursively and limiting to a given filter list
  89. """
  90. num_bytes, num_dirs, num_files = 0, 0, 0
  91. for entry in os.scandir(path):
  92. if (pattern is not None) and (pattern not in entry.path):
  93. continue
  94. if entry.is_dir(follow_symlinks=False):
  95. if not recursive:
  96. continue
  97. num_dirs += 1
  98. bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
  99. num_bytes += bytes_inside
  100. num_dirs += dirs_inside
  101. num_files += files_inside
  102. else:
  103. num_bytes += entry.stat(follow_symlinks=False).st_size
  104. num_files += 1
  105. return num_bytes, num_dirs, num_files
  106. CRON_COMMENT = 'archivebox_schedule'
  107. @enforce_types
  108. def dedupe_cron_jobs(cron: CronTab) -> CronTab:
  109. deduped: Set[Tuple[str, str]] = set()
  110. for job in list(cron):
  111. unique_tuple = (str(job.slices), job.command)
  112. if unique_tuple not in deduped:
  113. deduped.add(unique_tuple)
  114. cron.remove(job)
  115. for schedule, command in deduped:
  116. job = cron.new(command=command, comment=CRON_COMMENT)
  117. job.setall(schedule)
  118. job.enable()
  119. return cron