| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import os
- import sqlite3
- import subprocess
- from pathlib import Path
- def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
- candidates = {snapshot_id}
- if len(snapshot_id) == 32:
- hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
- candidates.add(hyphenated)
- elif len(snapshot_id) == 36 and '-' in snapshot_id:
- candidates.add(snapshot_id.replace('-', ''))
- for needle in candidates:
- for path in data_dir.rglob(needle):
- if path.is_dir():
- return path
- return None
- def _find_html_with_text(root: Path, needle: str) -> list[Path]:
- hits: list[Path] = []
- for path in root.rglob("*.htm*"):
- if not path.is_file():
- continue
- try:
- if needle in path.read_text(errors="ignore"):
- hits.append(path)
- except Exception:
- continue
- return hits
def test_add_real_world_example_domain(tmp_path):
    """End-to-end check that ``archivebox add`` archives https://example.com.

    Runs ``archivebox init`` and ``archivebox add`` with *tmp_path* as the
    working directory, then verifies:

    * the SQLite index: the snapshot exists with the expected title, no
      archive result failed, binary workers ran and exited cleanly, and no
      binary is left in a non-installed state;
    * the on-disk outputs: the title file plus HTML and plain-text extractor
      outputs that contain "Example Domain".

    The commands are launched with ``cwd=tmp_path`` instead of ``os.chdir`` so
    the working directory of the test process itself is left untouched.
    """
    # NOTE(review): presumably a short TMP_DIR keeps socket paths within OS
    # length limits -- confirm. The directory is not removed after the test.
    tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
    tmp_short.mkdir(parents=True, exist_ok=True)

    env = os.environ.copy()
    env["TMP_DIR"] = str(tmp_short)
    env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"

    init = subprocess.run(
        ["archivebox", "init"],
        capture_output=True,
        text=True,
        timeout=120,
        env=env,
        cwd=tmp_path,
    )
    assert init.returncode == 0, f"archivebox init failed: {init.stderr}"

    result = subprocess.run(
        ["archivebox", "add", "https://example.com"],
        capture_output=True,
        text=True,
        timeout=900,
        env=env,
        cwd=tmp_path,
    )
    assert result.returncode == 0, (
        "archivebox add failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )

    # --- Database checks -------------------------------------------------
    conn = sqlite3.connect(tmp_path / "index.sqlite3")
    try:
        c = conn.cursor()

        snapshot_row = c.execute(
            "SELECT id, url, title FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()
        assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
        snapshot_id, _snapshot_url, snapshot_title = snapshot_row
        assert snapshot_title and "Example Domain" in snapshot_title, (
            f"Expected title to contain Example Domain, got: {snapshot_title}"
        )

        failed_results = c.execute(
            "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
            (snapshot_id,),
        ).fetchone()[0]
        assert failed_results == 0, "Some archive results failed for example.com snapshot"

        binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
        ).fetchone()[0]
        assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"

        failed_binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
            "AND exit_code IS NOT NULL AND exit_code != 0"
        ).fetchone()[0]
        assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"

        queued_binaries = c.execute(
            "SELECT name FROM machine_binary WHERE status != 'installed'"
        ).fetchall()
        assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
    finally:
        # Close the connection even when one of the assertions above fails.
        conn.close()

    # --- Filesystem checks -----------------------------------------------
    snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
    assert snapshot_dir is not None, "Snapshot output directory not found"

    title_path = snapshot_dir / "title" / "title.txt"
    assert title_path.exists(), f"Missing title output: {title_path}"
    assert "Example Domain" in title_path.read_text(errors="ignore")

    # Each extractor's output may live in "<name>/" or in a "*_<name>/" directory.
    html_sources = []
    for candidate in ("wget", "singlefile", "dom"):
        for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
            if candidate_dir.exists():
                html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
    assert len(html_sources) >= 2, (
        "Expected HTML outputs from multiple extractors to contain Example Domain "
        f"(found {len(html_sources)})."
    )

    # Plain-text extractors: (directory name, output file name).
    text_outputs = (
        ("readability", "content.txt"),
        ("htmltotext", "htmltotext.txt"),
    )
    text_hits = 0
    for extractor, filename in text_outputs:
        for path in (
            *snapshot_dir.glob(f"*_{extractor}/{filename}"),
            snapshot_dir / extractor / filename,
        ):
            if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
                text_hits += 1
    assert text_hits >= 2, (
        "Expected multiple text extractors to contain Example Domain "
        f"(readability/htmltotext hits={text_hits})."
    )
|