archive_org.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. __package__ = 'archivebox.extractors'
  2. from pathlib import Path
  3. from typing import Optional, List, Dict, Tuple
  4. from collections import defaultdict
  5. from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
  6. from ..system import run, chmod_file
  7. from ..util import (
  8. enforce_types,
  9. is_static_file,
  10. )
  11. from ..config import (
  12. TIMEOUT,
  13. CURL_ARGS,
  14. CHECK_SSL_VALIDITY,
  15. SAVE_ARCHIVE_DOT_ORG,
  16. CURL_BINARY,
  17. CURL_VERSION,
  18. CURL_USER_AGENT,
  19. )
  20. from ..logging_util import TimedProgress
  21. @enforce_types
  22. def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
  23. if is_static_file(link.url):
  24. return False
  25. out_dir = out_dir or Path(link.link_dir)
  26. if not overwrite and (out_dir / 'archive.org.txt').exists():
  27. # if open(path, 'r', encoding='utf-8').read().strip() != 'None':
  28. return False
  29. return SAVE_ARCHIVE_DOT_ORG
@enforce_types
def save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
    """submit site to archive.org for archiving via their service, save returned archive url"""
    out_dir = out_dir or Path(link.link_dir)
    # NOTE: `output` is overloaded: it starts as the result filename, becomes the
    # Exception on failure, and ends up as the archived URL on success.
    output: ArchiveOutput = 'archive.org.txt'
    archive_org_url = None
    # Requesting this URL asks the Wayback Machine to crawl and store link.url.
    submit_url = 'https://web.archive.org/save/{}'.format(link.url)
    cmd = [
        CURL_BINARY,
        *CURL_ARGS,
        # HEAD request only: the archived URL is returned in a response header,
        # so the body is not needed (see parse_archive_dot_org_response below).
        '--head',
        '--max-time', str(timeout),
        *(['--user-agent', '{}'.format(CURL_USER_AGENT)] if CURL_USER_AGENT else []),
        # --insecure skips cert validation when the user has disabled SSL checks
        *([] if CHECK_SSL_VALIDITY else ['--insecure']),
        submit_url,
    ]
    status = 'succeeded'
    timer = TimedProgress(timeout, prefix=' ')
    try:
        result = run(cmd, cwd=str(out_dir), timeout=timeout)
        content_location, errors = parse_archive_dot_org_response(result.stdout)
        if content_location:
            # First content-location/location header value is the archived snapshot URL
            archive_org_url = content_location[0]
        elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
            # Blocked by the site's robots.txt: treated as a soft success
            # (status stays 'succeeded'), deliberately not raised as an error.
            archive_org_url = None
            # raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
        elif errors:
            raise ArchiveError(', '.join(errors))
        else:
            raise ArchiveError('Failed to find "content-location" URL header in Archive.org response.')
    except Exception as err:
        status = 'failed'
        output = err
    finally:
        timer.end()
    # Only write the result file when no exception was captured above
    # (output is still the 'archive.org.txt' filename string here).
    if output and not isinstance(output, Exception):
        # instead of writing None when archive.org rejects the url write the
        # url to resubmit it to archive.org. This is so when the user visits
        # the URL in person, it will attempt to re-archive it, and it'll show the
        # nicer error message explaining why the url was rejected if it fails.
        archive_org_url = archive_org_url or submit_url
        with open(str(out_dir / output), 'w', encoding='utf-8') as f:
            f.write(archive_org_url)
        chmod_file('archive.org.txt', cwd=str(out_dir))
        # From here on, output is the archived (or resubmit) URL for the index.
        output = archive_org_url
    return ArchiveResult(
        cmd=cmd,
        pwd=str(out_dir),
        cmd_version=CURL_VERSION,
        output=output,
        status=status,
        **timer.stats,
    )
  83. @enforce_types
  84. def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
  85. # Parse archive.org response headers
  86. headers: Dict[str, List[str]] = defaultdict(list)
  87. # lowercase all the header names and store in dict
  88. for header in response.splitlines():
  89. if b':' not in header or not header.strip():
  90. continue
  91. name, val = header.decode().split(':', 1)
  92. headers[name.lower().strip()].append(val.strip())
  93. # Get successful archive url in "content-location" header or any errors
  94. content_location = headers.get('content-location', headers['location'])
  95. errors = headers['x-archive-wayback-runtime-error']
  96. return content_location, errors