  1. """archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""
  2. import os
  3. import shutil
  4. import sys
  5. import subprocess
  6. import textwrap
  7. from pathlib import Path
  8. from typing import List, Dict, Any, Optional, Tuple
  9. import pytest
# =============================================================================
# CLI Helpers (defined before fixtures that use them)
# =============================================================================
  13. def run_archivebox_cmd(
  14. args: List[str],
  15. data_dir: Path,
  16. stdin: Optional[str] = None,
  17. timeout: int = 60,
  18. env: Optional[Dict[str, str]] = None,
  19. ) -> Tuple[str, str, int]:
  20. """
  21. Run archivebox command via subprocess, return (stdout, stderr, returncode).
  22. Args:
  23. args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
  24. data_dir: The DATA_DIR to use
  25. stdin: Optional string to pipe to stdin
  26. timeout: Command timeout in seconds
  27. env: Additional environment variables
  28. Returns:
  29. Tuple of (stdout, stderr, returncode)
  30. """
  31. cmd = [sys.executable, '-m', 'archivebox'] + args
  32. base_env = os.environ.copy()
  33. base_env['DATA_DIR'] = str(data_dir)
  34. base_env['USE_COLOR'] = 'False'
  35. base_env['SHOW_PROGRESS'] = 'False'
  36. # Disable slow extractors for faster tests
  37. base_env['SAVE_ARCHIVEDOTORG'] = 'False'
  38. base_env['SAVE_TITLE'] = 'False'
  39. base_env['SAVE_FAVICON'] = 'False'
  40. base_env['SAVE_WGET'] = 'False'
  41. base_env['SAVE_WARC'] = 'False'
  42. base_env['SAVE_PDF'] = 'False'
  43. base_env['SAVE_SCREENSHOT'] = 'False'
  44. base_env['SAVE_DOM'] = 'False'
  45. base_env['SAVE_SINGLEFILE'] = 'False'
  46. base_env['SAVE_READABILITY'] = 'False'
  47. base_env['SAVE_MERCURY'] = 'False'
  48. base_env['SAVE_GIT'] = 'False'
  49. base_env['SAVE_YTDLP'] = 'False'
  50. base_env['SAVE_HEADERS'] = 'False'
  51. base_env['SAVE_HTMLTOTEXT'] = 'False'
  52. if env:
  53. base_env.update(env)
  54. result = subprocess.run(
  55. cmd,
  56. input=stdin,
  57. capture_output=True,
  58. text=True,
  59. cwd=data_dir,
  60. env=base_env,
  61. timeout=timeout,
  62. )
  63. return result.stdout, result.stderr, result.returncode
# =============================================================================
# Fixtures
# =============================================================================
  67. @pytest.fixture
  68. def isolated_data_dir(tmp_path):
  69. """
  70. Create isolated DATA_DIR for each test.
  71. Uses tmp_path for complete isolation.
  72. """
  73. data_dir = tmp_path / 'archivebox_data'
  74. data_dir.mkdir()
  75. return data_dir
  76. @pytest.fixture
  77. def initialized_archive(isolated_data_dir):
  78. """
  79. Initialize ArchiveBox archive in isolated directory.
  80. Runs `archivebox init` via subprocess to set up database and directories.
  81. """
  82. stdout, stderr, returncode = run_archivebox_cmd(
  83. ['init', '--quick'],
  84. data_dir=isolated_data_dir,
  85. timeout=60,
  86. )
  87. assert returncode == 0, f"archivebox init failed: {stderr}"
  88. return isolated_data_dir
# =============================================================================
# CWD-based CLI Helpers (no DATA_DIR env)
# =============================================================================
  92. def run_archivebox_cmd_cwd(
  93. args: List[str],
  94. cwd: Path,
  95. stdin: Optional[str] = None,
  96. timeout: int = 60,
  97. env: Optional[Dict[str, str]] = None,
  98. ) -> Tuple[str, str, int]:
  99. """
  100. Run archivebox command via subprocess using cwd as DATA_DIR (no DATA_DIR env).
  101. Returns (stdout, stderr, returncode).
  102. """
  103. cmd = [sys.executable, '-m', 'archivebox'] + args
  104. base_env = os.environ.copy()
  105. base_env.pop('DATA_DIR', None)
  106. base_env['USE_COLOR'] = 'False'
  107. base_env['SHOW_PROGRESS'] = 'False'
  108. if env:
  109. base_env.update(env)
  110. result = subprocess.run(
  111. cmd,
  112. input=stdin,
  113. capture_output=True,
  114. text=True,
  115. cwd=cwd,
  116. env=base_env,
  117. timeout=timeout,
  118. )
  119. return result.stdout, result.stderr, result.returncode
  120. def run_python_cwd(
  121. script: str,
  122. cwd: Path,
  123. timeout: int = 60,
  124. ) -> Tuple[str, str, int]:
  125. base_env = os.environ.copy()
  126. base_env.pop('DATA_DIR', None)
  127. result = subprocess.run(
  128. [sys.executable, '-'],
  129. input=script,
  130. capture_output=True,
  131. text=True,
  132. cwd=cwd,
  133. env=base_env,
  134. timeout=timeout,
  135. )
  136. return result.stdout, result.stderr, result.returncode
  137. def _get_machine_type() -> str:
  138. import platform
  139. os_name = platform.system().lower()
  140. arch = platform.machine().lower()
  141. in_docker = os.environ.get('IN_DOCKER', '').lower() in ('1', 'true', 'yes')
  142. suffix = '-docker' if in_docker else ''
  143. return f'{arch}-{os_name}{suffix}'
  144. def _find_cached_chromium(lib_dir: Path) -> Optional[Path]:
  145. candidates = [
  146. lib_dir / 'puppeteer',
  147. lib_dir / 'npm' / 'node_modules' / 'puppeteer' / '.local-chromium',
  148. ]
  149. for base in candidates:
  150. if not base.exists():
  151. continue
  152. for path in base.rglob('Chromium.app/Contents/MacOS/Chromium'):
  153. return path
  154. for path in base.rglob('chrome-linux/chrome'):
  155. return path
  156. for path in base.rglob('chrome-linux64/chrome'):
  157. return path
  158. return None
  159. def _find_system_browser() -> Optional[Path]:
  160. candidates = [
  161. Path('/Applications/Chromium.app/Contents/MacOS/Chromium'),
  162. Path('/usr/bin/chromium'),
  163. Path('/usr/bin/chromium-browser'),
  164. ]
  165. for candidate in candidates:
  166. if candidate.exists():
  167. return candidate
  168. return None
  169. def _ensure_puppeteer(shared_lib: Path) -> None:
  170. npm_prefix = shared_lib / 'npm'
  171. node_modules = npm_prefix / 'node_modules'
  172. puppeteer_dir = node_modules / 'puppeteer'
  173. if puppeteer_dir.exists():
  174. return
  175. npm_prefix.mkdir(parents=True, exist_ok=True)
  176. env = os.environ.copy()
  177. env['PUPPETEER_SKIP_DOWNLOAD'] = '1'
  178. subprocess.run(
  179. ['npm', 'install', 'puppeteer'],
  180. cwd=str(npm_prefix),
  181. env=env,
  182. check=True,
  183. capture_output=True,
  184. text=True,
  185. timeout=600,
  186. )
  187. @pytest.fixture(scope="class")
  188. def real_archive_with_example(tmp_path_factory, request):
  189. """
  190. Initialize archive and add https://example.com using chrome+responses only.
  191. Uses cwd for DATA_DIR and symlinks lib dir to a shared cache.
  192. """
  193. tmp_path = tmp_path_factory.mktemp("archivebox_data")
  194. if getattr(request, "cls", None) is not None:
  195. request.cls.data_dir = tmp_path
  196. stdout, stderr, returncode = run_archivebox_cmd_cwd(
  197. ['init', '--quick'],
  198. cwd=tmp_path,
  199. timeout=120,
  200. )
  201. assert returncode == 0, f"archivebox init failed: {stderr}"
  202. stdout, stderr, returncode = run_archivebox_cmd_cwd(
  203. [
  204. 'config',
  205. '--set',
  206. 'LISTEN_HOST=archivebox.localhost:8000',
  207. 'PUBLIC_INDEX=True',
  208. 'PUBLIC_SNAPSHOTS=True',
  209. 'PUBLIC_ADD_VIEW=True',
  210. ],
  211. cwd=tmp_path,
  212. )
  213. assert returncode == 0, f"archivebox config failed: {stderr}"
  214. machine_type = _get_machine_type()
  215. shared_root = Path(__file__).resolve().parents[3] / 'tmp' / 'test_lib_cache'
  216. shared_lib = shared_root / machine_type
  217. shared_lib.mkdir(parents=True, exist_ok=True)
  218. lib_target = tmp_path / 'lib' / machine_type
  219. if lib_target.exists() and not lib_target.is_symlink():
  220. shutil.rmtree(lib_target)
  221. if not lib_target.exists():
  222. lib_target.parent.mkdir(parents=True, exist_ok=True)
  223. lib_target.symlink_to(shared_lib, target_is_directory=True)
  224. _ensure_puppeteer(shared_lib)
  225. cached_chromium = _find_cached_chromium(shared_lib)
  226. if cached_chromium:
  227. browser_binary = cached_chromium
  228. else:
  229. browser_binary = _find_system_browser()
  230. if browser_binary:
  231. chromium_link = shared_lib / 'chromium-bin'
  232. if not chromium_link.exists():
  233. chromium_link.symlink_to(browser_binary)
  234. browser_binary = chromium_link
  235. if browser_binary:
  236. stdout, stderr, returncode = run_archivebox_cmd_cwd(
  237. [f'config', '--set', f'CHROME_BINARY={browser_binary}'],
  238. cwd=tmp_path,
  239. )
  240. assert returncode == 0, f"archivebox config CHROME_BINARY failed: {stderr}"
  241. script = textwrap.dedent(f"""\
  242. import os
  243. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.core.settings')
  244. import django
  245. django.setup()
  246. from django.utils import timezone
  247. from archivebox.machine.models import Binary, Machine
  248. machine = Machine.current()
  249. Binary.objects.filter(machine=machine, name='chromium').update(
  250. status='installed',
  251. abspath='{browser_binary}',
  252. binprovider='env',
  253. retry_at=timezone.now(),
  254. )
  255. Binary.objects.update_or_create(
  256. machine=machine,
  257. name='chromium',
  258. defaults={{
  259. 'status': 'installed',
  260. 'abspath': '{browser_binary}',
  261. 'binprovider': 'env',
  262. 'retry_at': timezone.now(),
  263. }},
  264. )
  265. print('OK')
  266. """
  267. )
  268. stdout, stderr, returncode = run_python_cwd(script, cwd=tmp_path, timeout=60)
  269. assert returncode == 0, f"Register chromium binary failed: {stderr}"
  270. add_env = {
  271. 'CHROME_ENABLED': 'True',
  272. 'RESPONSES_ENABLED': 'True',
  273. 'DOM_ENABLED': 'False',
  274. 'SHOW_PROGRESS': 'False',
  275. 'USE_COLOR': 'False',
  276. 'CHROME_HEADLESS': 'True',
  277. 'CHROME_PAGELOAD_TIMEOUT': '45',
  278. 'CHROME_TIMEOUT': '60',
  279. 'RESPONSES_TIMEOUT': '30',
  280. }
  281. if browser_binary:
  282. add_env['CHROME_BINARY'] = str(browser_binary)
  283. if cached_chromium:
  284. add_env['PUPPETEER_CACHE_DIR'] = str(shared_lib / 'puppeteer')
  285. stdout, stderr, returncode = run_archivebox_cmd_cwd(
  286. ['add', '--depth=0', '--plugins=chrome,responses', 'https://example.com'],
  287. cwd=tmp_path,
  288. timeout=600,
  289. env=add_env,
  290. )
  291. assert returncode == 0, f"archivebox add failed: {stderr}"
  292. return tmp_path
# =============================================================================
# Output Assertions
# =============================================================================
  296. def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
  297. """Parse JSONL output into list of dicts via Process parser."""
  298. from archivebox.machine.models import Process
  299. return Process.parse_records_from_text(stdout or '')
  300. def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
  301. """Assert output contains at least min_count records of type."""
  302. records = parse_jsonl_output(stdout)
  303. matching = [r for r in records if r.get('type') == record_type]
  304. assert len(matching) >= min_count, \
  305. f"Expected >= {min_count} {record_type}, got {len(matching)}"
  306. return matching
  307. def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]):
  308. """Assert that input records appear in output (pass-through behavior)."""
  309. output_records = parse_jsonl_output(stdout)
  310. output_ids = {r.get('id') for r in output_records if r.get('id')}
  311. for input_rec in input_records:
  312. input_id = input_rec.get('id')
  313. if input_id:
  314. assert input_id in output_ids, \
  315. f"Input record {input_id} not found in output (pass-through failed)"
  316. def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]):
  317. """Assert record has all required fields with non-None values."""
  318. for field in required_fields:
  319. assert field in record, f"Record missing field: {field}"
  320. assert record[field] is not None, f"Record field is None: {field}"
# =============================================================================
# Test Data Factories
# =============================================================================
  324. def create_test_url(domain: str = 'example.com', path: str = None) -> str:
  325. """Generate unique test URL."""
  326. import uuid
  327. path = path or uuid.uuid4().hex[:8]
  328. return f'https://{domain}/{path}'
  329. def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
  330. """Create Crawl JSONL record for testing."""
  331. urls = urls or [create_test_url()]
  332. return {
  333. 'type': 'Crawl',
  334. 'urls': '\n'.join(urls),
  335. 'max_depth': kwargs.get('max_depth', 0),
  336. 'tags_str': kwargs.get('tags_str', ''),
  337. 'status': kwargs.get('status', 'queued'),
  338. **{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')},
  339. }
  340. def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
  341. """Create Snapshot JSONL record for testing."""
  342. return {
  343. 'type': 'Snapshot',
  344. 'url': url or create_test_url(),
  345. 'tags_str': kwargs.get('tags_str', ''),
  346. 'status': kwargs.get('status', 'queued'),
  347. **{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')},
  348. }