archive_org.py

__package__ = 'archivebox.extractors'

from pathlib import Path
from typing import Optional, List, Dict, Tuple
from collections import defaultdict

from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, chmod_file
from ..util import (
    enforce_types,
    is_static_file,
    dedupe,
)
from ..config import (
    TIMEOUT,
    CURL_ARGS,
    CURL_EXTRA_ARGS,
    CHECK_SSL_VALIDITY,
    SAVE_ARCHIVE_DOT_ORG,
    CURL_BINARY,
    CURL_VERSION,
    CURL_USER_AGENT,
)
from ..logging_util import TimedProgress


def get_output_path():
    return 'archive.org.txt'
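
# Note: 'archive.org.txt' is this extractor's sole artifact: a one-line file in
# the snapshot directory holding the Wayback Machine URL for the page (or the
# resubmit URL if archive.org refused the capture, see save_archive_dot_org below).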


@enforce_types
def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
    if is_static_file(link.url):
        return False

    out_dir = out_dir or Path(link.link_dir)
    if not overwrite and (out_dir / get_output_path()).exists():
        # if open(path, 'r', encoding='utf-8').read().strip() != 'None':
        return False

    return SAVE_ARCHIVE_DOT_ORG
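
# Re-runs therefore skip this extractor once archive.org.txt exists, unless the
# caller passes overwrite=True; SAVE_ARCHIVE_DOT_ORG is the user-level on/off switch.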


@enforce_types
def save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
    """submit site to archive.org for archiving via their service, save returned archive url"""

    out_dir = out_dir or Path(link.link_dir)
    output: ArchiveOutput = get_output_path()
    archive_org_url = None
    submit_url = 'https://web.archive.org/save/{}'.format(link.url)

    # later options take precedence
    options = [
        *CURL_ARGS,
        *CURL_EXTRA_ARGS,
        '--head',
        '--max-time', str(timeout),
        *(['--user-agent', '{}'.format(CURL_USER_AGENT)] if CURL_USER_AGENT else []),
        *([] if CHECK_SSL_VALIDITY else ['--insecure']),
    ]
    cmd = [
        CURL_BINARY,
        *dedupe(options),
        submit_url,
    ]
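
    # With a stock curl config this assembles a command along the lines of
    # (flags are illustrative; the real set comes from CURL_ARGS/CURL_EXTRA_ARGS):
    #   curl --silent --location --compressed --head --max-time 60 \
    #        --user-agent '...' https://web.archive.org/save/https://example.com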
    status = 'succeeded'
    timer = TimedProgress(timeout, prefix=' ')
    try:
        result = run(cmd, cwd=str(out_dir), timeout=timeout)
        content_location, errors = parse_archive_dot_org_response(result.stdout)
        if content_location:
            archive_org_url = content_location[0]
        elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
            archive_org_url = None
            # raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
        elif errors:
            raise ArchiveError(', '.join(errors))
        else:
            raise ArchiveError('Failed to find "content-location" URL header in Archive.org response.')
    except Exception as err:
        status = 'failed'
        output = err
    finally:
        timer.end()

    if output and not isinstance(output, Exception):
        # Instead of writing None when archive.org rejects the url, write the
        # submit url so it can be resubmitted: when the user visits that URL in
        # person, archive.org will attempt to re-archive the page, and will show
        # a nicer error message explaining why the url was rejected if it fails.
        archive_org_url = archive_org_url or submit_url
        with open(str(out_dir / output), 'w', encoding='utf-8') as f:
            f.write(archive_org_url)
        chmod_file(str(out_dir / output), cwd=str(out_dir))
        output = archive_org_url

    return ArchiveResult(
        cmd=cmd,
        pwd=str(out_dir),
        cmd_version=CURL_VERSION,
        output=output,
        status=status,
        **timer.stats,
    )
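
# Minimal usage sketch (illustrative; in practice ArchiveBox's archiving pipeline
# calls this extractor for each snapshot rather than user code invoking it directly):
#
#   if should_save_archive_dot_org(link, overwrite=False):
#       result = save_archive_dot_org(link, timeout=TIMEOUT)
#       print(result.status, result.output)  # e.g. 'succeeded' + the Wayback URL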


@enforce_types
def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
    # Parse archive.org response headers
    headers: Dict[str, List[str]] = defaultdict(list)

    # lowercase all the header names and store in dict
    for header in response.splitlines():
        if b':' not in header or not header.strip():
            continue
        name, val = header.decode().split(':', 1)
        headers[name.lower().strip()].append(val.strip())

    # Get successful archive url in "content-location" header or any errors
    content_location = headers.get('content-location', headers['location'])
    errors = headers['x-archive-wayback-runtime-error']

    return content_location, errors
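
# Example of what parse_archive_dot_org_response() consumes and returns, assuming
# a typical redirect response to `curl --head` (header values are illustrative):
#
#   raw = b'HTTP/2 302\r\ncontent-location: /web/20240101000000/https://example.com\r\n\r\n'
#   parse_archive_dot_org_response(raw)
#   # -> (['/web/20240101000000/https://example.com'], [])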