# test_real_world_add.py
  1. import os
  2. import sqlite3
  3. import subprocess
  4. from pathlib import Path
  5. def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
  6. candidates = {snapshot_id}
  7. if len(snapshot_id) == 32:
  8. hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
  9. candidates.add(hyphenated)
  10. elif len(snapshot_id) == 36 and '-' in snapshot_id:
  11. candidates.add(snapshot_id.replace('-', ''))
  12. for needle in candidates:
  13. for path in data_dir.rglob(needle):
  14. if path.is_dir():
  15. return path
  16. return None
  17. def _find_html_with_text(root: Path, needle: str) -> list[Path]:
  18. hits: list[Path] = []
  19. for path in root.rglob("*.htm*"):
  20. if not path.is_file():
  21. continue
  22. try:
  23. if needle in path.read_text(errors="ignore"):
  24. hits.append(path)
  25. except Exception:
  26. continue
  27. return hits
  28. def test_add_real_world_example_domain(tmp_path):
  29. os.chdir(tmp_path)
  30. tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
  31. tmp_short.mkdir(parents=True, exist_ok=True)
  32. env = os.environ.copy()
  33. env["TMP_DIR"] = str(tmp_short)
  34. env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
  35. init = subprocess.run(
  36. ["archivebox", "init"],
  37. capture_output=True,
  38. text=True,
  39. timeout=120,
  40. env=env,
  41. )
  42. assert init.returncode == 0, f"archivebox init failed: {init.stderr}"
  43. result = subprocess.run(
  44. ["archivebox", "add", "https://example.com"],
  45. capture_output=True,
  46. text=True,
  47. timeout=900,
  48. env=env,
  49. )
  50. assert result.returncode == 0, (
  51. "archivebox add failed.\n"
  52. f"stdout:\n{result.stdout}\n"
  53. f"stderr:\n{result.stderr}"
  54. )
  55. conn = sqlite3.connect(tmp_path / "index.sqlite3")
  56. c = conn.cursor()
  57. snapshot_row = c.execute(
  58. "SELECT id, url, title FROM core_snapshot WHERE url = ?",
  59. ("https://example.com",),
  60. ).fetchone()
  61. assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
  62. snapshot_id, snapshot_url, snapshot_title = snapshot_row
  63. assert snapshot_title and "Example Domain" in snapshot_title, (
  64. f"Expected title to contain Example Domain, got: {snapshot_title}"
  65. )
  66. failed_results = c.execute(
  67. "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
  68. (snapshot_id,),
  69. ).fetchone()[0]
  70. assert failed_results == 0, "Some archive results failed for example.com snapshot"
  71. binary_workers = c.execute(
  72. "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
  73. ).fetchone()[0]
  74. assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"
  75. failed_binary_workers = c.execute(
  76. "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
  77. "AND exit_code IS NOT NULL AND exit_code != 0"
  78. ).fetchone()[0]
  79. assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"
  80. queued_binaries = c.execute(
  81. "SELECT name FROM machine_binary WHERE status != 'installed'"
  82. ).fetchall()
  83. assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
  84. conn.close()
  85. snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
  86. assert snapshot_dir is not None, "Snapshot output directory not found"
  87. title_path = snapshot_dir / "title" / "title.txt"
  88. assert title_path.exists(), f"Missing title output: {title_path}"
  89. assert "Example Domain" in title_path.read_text(errors="ignore")
  90. html_sources = []
  91. for candidate in ("wget", "singlefile", "dom"):
  92. for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
  93. if candidate_dir.exists():
  94. html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
  95. assert len(html_sources) >= 2, (
  96. "Expected HTML outputs from multiple extractors to contain Example Domain "
  97. f"(found {len(html_sources)})."
  98. )
  99. text_hits = 0
  100. for path in (
  101. *snapshot_dir.glob("*_readability/content.txt"),
  102. snapshot_dir / "readability" / "content.txt",
  103. ):
  104. if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
  105. text_hits += 1
  106. for path in (
  107. *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
  108. snapshot_dir / "htmltotext" / "htmltotext.txt",
  109. ):
  110. if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
  111. text_hits += 1
  112. assert text_hits >= 2, (
  113. "Expected multiple text extractors to contain Example Domain "
  114. f"(readability/htmltotext hits={text_hits})."
  115. )