# tests_piping.py
  1. #!/usr/bin/env python3
  2. """
  3. Tests for CLI piping workflow: crawl | snapshot | extract
  4. This module tests the JSONL-based piping between CLI commands as described in:
  5. https://github.com/ArchiveBox/ArchiveBox/issues/1363
  6. Workflows tested:
  7. archivebox crawl URL -> Crawl JSONL
  8. archivebox snapshot -> Snapshot JSONL (accepts Crawl or URL input)
  9. archivebox extract -> ArchiveResult JSONL (accepts Snapshot input)
  10. Pipeline:
  11. archivebox crawl URL | archivebox snapshot | archivebox extract
  12. Each command should:
  13. - Accept URLs, IDs, or JSONL as input (args or stdin)
  14. - Output JSONL to stdout when piped (not TTY)
  15. - Output human-readable to stderr when TTY
  16. """
  17. __package__ = 'archivebox.cli'
  18. import os
  19. import sys
  20. import json
  21. import shutil
  22. import tempfile
  23. import unittest
  24. from io import StringIO
  25. from pathlib import Path
  26. from unittest.mock import patch, MagicMock
  27. # Test configuration - disable slow extractors
  28. TEST_CONFIG = {
  29. 'USE_COLOR': 'False',
  30. 'SHOW_PROGRESS': 'False',
  31. 'SAVE_ARCHIVEDOTORG': 'False',
  32. 'SAVE_TITLE': 'True', # Fast extractor
  33. 'SAVE_FAVICON': 'False',
  34. 'SAVE_WGET': 'False',
  35. 'SAVE_WARC': 'False',
  36. 'SAVE_PDF': 'False',
  37. 'SAVE_SCREENSHOT': 'False',
  38. 'SAVE_DOM': 'False',
  39. 'SAVE_SINGLEFILE': 'False',
  40. 'SAVE_READABILITY': 'False',
  41. 'SAVE_MERCURY': 'False',
  42. 'SAVE_GIT': 'False',
  43. 'SAVE_YTDLP': 'False',
  44. 'SAVE_HEADERS': 'False',
  45. 'USE_CURL': 'False',
  46. 'USE_WGET': 'False',
  47. 'USE_GIT': 'False',
  48. 'USE_CHROME': 'False',
  49. 'USE_YOUTUBEDL': 'False',
  50. 'USE_NODE': 'False',
  51. }
  52. os.environ.update(TEST_CONFIG)
  53. # =============================================================================
  54. # JSONL Utility Tests
  55. # =============================================================================
  56. class TestJSONLParsing(unittest.TestCase):
  57. """Test JSONL input parsing utilities."""
  58. def test_parse_plain_url(self):
  59. """Plain URLs should be parsed as Snapshot records."""
  60. from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
  61. result = parse_line('https://example.com')
  62. self.assertIsNotNone(result)
  63. self.assertEqual(result['type'], TYPE_SNAPSHOT)
  64. self.assertEqual(result['url'], 'https://example.com')
  65. def test_parse_jsonl_snapshot(self):
  66. """JSONL Snapshot records should preserve all fields."""
  67. from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
  68. line = '{"type": "Snapshot", "url": "https://example.com", "tags": "test,demo"}'
  69. result = parse_line(line)
  70. self.assertIsNotNone(result)
  71. self.assertEqual(result['type'], TYPE_SNAPSHOT)
  72. self.assertEqual(result['url'], 'https://example.com')
  73. self.assertEqual(result['tags'], 'test,demo')
  74. def test_parse_jsonl_crawl(self):
  75. """JSONL Crawl records should be parsed correctly."""
  76. from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
  77. line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
  78. result = parse_line(line)
  79. self.assertIsNotNone(result)
  80. self.assertEqual(result['type'], TYPE_CRAWL)
  81. self.assertEqual(result['id'], 'abc123')
  82. self.assertEqual(result['urls'], 'https://example.com')
  83. self.assertEqual(result['max_depth'], 1)
  84. def test_parse_jsonl_with_id(self):
  85. """JSONL with id field should be recognized."""
  86. from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
  87. line = '{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}'
  88. result = parse_line(line)
  89. self.assertIsNotNone(result)
  90. self.assertEqual(result['id'], 'abc123')
  91. self.assertEqual(result['url'], 'https://example.com')
  92. def test_parse_uuid_as_snapshot_id(self):
  93. """Bare UUIDs should be parsed as snapshot IDs."""
  94. from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
  95. uuid = '01234567-89ab-cdef-0123-456789abcdef'
  96. result = parse_line(uuid)
  97. self.assertIsNotNone(result)
  98. self.assertEqual(result['type'], TYPE_SNAPSHOT)
  99. self.assertEqual(result['id'], uuid)
  100. def test_parse_empty_line(self):
  101. """Empty lines should return None."""
  102. from archivebox.misc.jsonl import parse_line
  103. self.assertIsNone(parse_line(''))
  104. self.assertIsNone(parse_line(' '))
  105. self.assertIsNone(parse_line('\n'))
  106. def test_parse_comment_line(self):
  107. """Comment lines should return None."""
  108. from archivebox.misc.jsonl import parse_line
  109. self.assertIsNone(parse_line('# This is a comment'))
  110. self.assertIsNone(parse_line(' # Indented comment'))
  111. def test_parse_invalid_url(self):
  112. """Invalid URLs should return None."""
  113. from archivebox.misc.jsonl import parse_line
  114. self.assertIsNone(parse_line('not-a-url'))
  115. self.assertIsNone(parse_line('ftp://example.com')) # Only http/https/file
  116. def test_parse_file_url(self):
  117. """file:// URLs should be parsed."""
  118. from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
  119. result = parse_line('file:///path/to/file.txt')
  120. self.assertIsNotNone(result)
  121. self.assertEqual(result['type'], TYPE_SNAPSHOT)
  122. self.assertEqual(result['url'], 'file:///path/to/file.txt')
  123. class TestJSONLOutput(unittest.TestCase):
  124. """Test JSONL output formatting."""
  125. def test_crawl_to_jsonl(self):
  126. """Crawl model should serialize to JSONL correctly."""
  127. from archivebox.misc.jsonl import TYPE_CRAWL
  128. # Create a mock crawl with to_jsonl method configured
  129. mock_crawl = MagicMock()
  130. mock_crawl.to_jsonl.return_value = {
  131. 'type': TYPE_CRAWL,
  132. 'schema_version': '0.9.0',
  133. 'id': 'test-crawl-uuid',
  134. 'urls': 'https://example.com',
  135. 'status': 'queued',
  136. 'max_depth': 0,
  137. 'tags_str': 'tag1,tag2',
  138. 'label': '',
  139. 'created_at': None,
  140. }
  141. result = mock_crawl.to_jsonl()
  142. self.assertEqual(result['type'], TYPE_CRAWL)
  143. self.assertEqual(result['id'], 'test-crawl-uuid')
  144. self.assertEqual(result['urls'], 'https://example.com')
  145. self.assertEqual(result['status'], 'queued')
  146. # Note: Snapshot and ArchiveResult serialization is tested in integration tests
  147. # (TestPipingWorkflowIntegration) using real model instances, not mocks.
  148. class TestReadArgsOrStdin(unittest.TestCase):
  149. """Test reading from args or stdin."""
  150. def test_read_from_args(self):
  151. """Should read URLs from command line args."""
  152. from archivebox.misc.jsonl import read_args_or_stdin
  153. args = ('https://example1.com', 'https://example2.com')
  154. records = list(read_args_or_stdin(args))
  155. self.assertEqual(len(records), 2)
  156. self.assertEqual(records[0]['url'], 'https://example1.com')
  157. self.assertEqual(records[1]['url'], 'https://example2.com')
  158. def test_read_from_stdin(self):
  159. """Should read URLs from stdin when no args provided."""
  160. from archivebox.misc.jsonl import read_args_or_stdin
  161. stdin_content = 'https://example1.com\nhttps://example2.com\n'
  162. stream = StringIO(stdin_content)
  163. # Mock isatty to return False (simulating piped input)
  164. stream.isatty = lambda: False
  165. records = list(read_args_or_stdin((), stream=stream))
  166. self.assertEqual(len(records), 2)
  167. self.assertEqual(records[0]['url'], 'https://example1.com')
  168. self.assertEqual(records[1]['url'], 'https://example2.com')
  169. def test_read_jsonl_from_stdin(self):
  170. """Should read JSONL from stdin."""
  171. from archivebox.misc.jsonl import read_args_or_stdin
  172. stdin_content = '{"type": "Snapshot", "url": "https://example.com", "tags": "test"}\n'
  173. stream = StringIO(stdin_content)
  174. stream.isatty = lambda: False
  175. records = list(read_args_or_stdin((), stream=stream))
  176. self.assertEqual(len(records), 1)
  177. self.assertEqual(records[0]['url'], 'https://example.com')
  178. self.assertEqual(records[0]['tags'], 'test')
  179. def test_read_crawl_jsonl_from_stdin(self):
  180. """Should read Crawl JSONL from stdin."""
  181. from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
  182. stdin_content = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com\\nhttps://foo.com"}\n'
  183. stream = StringIO(stdin_content)
  184. stream.isatty = lambda: False
  185. records = list(read_args_or_stdin((), stream=stream))
  186. self.assertEqual(len(records), 1)
  187. self.assertEqual(records[0]['type'], TYPE_CRAWL)
  188. self.assertEqual(records[0]['id'], 'abc123')
  189. def test_skip_tty_stdin(self):
  190. """Should not read from TTY stdin (would block)."""
  191. from archivebox.misc.jsonl import read_args_or_stdin
  192. stream = StringIO('https://example.com')
  193. stream.isatty = lambda: True # Simulate TTY
  194. records = list(read_args_or_stdin((), stream=stream))
  195. self.assertEqual(len(records), 0)
  196. # =============================================================================
  197. # Unit Tests for Individual Commands
  198. # =============================================================================
  199. class TestCrawlCommand(unittest.TestCase):
  200. """Unit tests for archivebox crawl command."""
  201. def setUp(self):
  202. """Set up test environment."""
  203. self.test_dir = tempfile.mkdtemp()
  204. os.environ['DATA_DIR'] = self.test_dir
  205. def tearDown(self):
  206. """Clean up test environment."""
  207. shutil.rmtree(self.test_dir, ignore_errors=True)
  208. def test_crawl_accepts_url(self):
  209. """crawl should accept URLs as input."""
  210. from archivebox.misc.jsonl import read_args_or_stdin
  211. args = ('https://example.com',)
  212. records = list(read_args_or_stdin(args))
  213. self.assertEqual(len(records), 1)
  214. self.assertEqual(records[0]['url'], 'https://example.com')
  215. def test_crawl_output_format(self):
  216. """crawl should output Crawl JSONL records."""
  217. from archivebox.misc.jsonl import TYPE_CRAWL
  218. # Mock crawl output
  219. crawl_output = {
  220. 'type': TYPE_CRAWL,
  221. 'schema_version': '0.9.0',
  222. 'id': 'test-crawl-id',
  223. 'urls': 'https://example.com',
  224. 'status': 'queued',
  225. 'max_depth': 0,
  226. }
  227. self.assertEqual(crawl_output['type'], TYPE_CRAWL)
  228. self.assertIn('id', crawl_output)
  229. self.assertIn('urls', crawl_output)
  230. class TestSnapshotCommand(unittest.TestCase):
  231. """Unit tests for archivebox snapshot command."""
  232. def setUp(self):
  233. """Set up test environment."""
  234. self.test_dir = tempfile.mkdtemp()
  235. os.environ['DATA_DIR'] = self.test_dir
  236. def tearDown(self):
  237. """Clean up test environment."""
  238. shutil.rmtree(self.test_dir, ignore_errors=True)
  239. def test_snapshot_accepts_url(self):
  240. """snapshot should accept URLs as input."""
  241. from archivebox.misc.jsonl import read_args_or_stdin
  242. args = ('https://example.com',)
  243. records = list(read_args_or_stdin(args))
  244. self.assertEqual(len(records), 1)
  245. self.assertEqual(records[0]['url'], 'https://example.com')
  246. def test_snapshot_accepts_crawl_jsonl(self):
  247. """snapshot should accept Crawl JSONL as input."""
  248. from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
  249. stdin = StringIO('{"type": "Crawl", "id": "abc123", "urls": "https://example.com"}\n')
  250. stdin.isatty = lambda: False
  251. records = list(read_args_or_stdin((), stream=stdin))
  252. self.assertEqual(len(records), 1)
  253. self.assertEqual(records[0]['type'], TYPE_CRAWL)
  254. self.assertEqual(records[0]['id'], 'abc123')
  255. self.assertEqual(records[0]['urls'], 'https://example.com')
  256. def test_snapshot_accepts_jsonl_with_metadata(self):
  257. """snapshot should accept JSONL with tags and other metadata."""
  258. from archivebox.misc.jsonl import read_args_or_stdin
  259. stdin = StringIO('{"type": "Snapshot", "url": "https://example.com", "tags": "tag1,tag2", "title": "Test"}\n')
  260. stdin.isatty = lambda: False
  261. records = list(read_args_or_stdin((), stream=stdin))
  262. self.assertEqual(len(records), 1)
  263. self.assertEqual(records[0]['url'], 'https://example.com')
  264. self.assertEqual(records[0]['tags'], 'tag1,tag2')
  265. self.assertEqual(records[0]['title'], 'Test')
  266. # Note: Snapshot output format is tested in integration tests
  267. # (TestPipingWorkflowIntegration.test_snapshot_creates_and_outputs_jsonl)
  268. # using real Snapshot instances.
  269. class TestExtractCommand(unittest.TestCase):
  270. """Unit tests for archivebox extract command."""
  271. def setUp(self):
  272. """Set up test environment."""
  273. self.test_dir = tempfile.mkdtemp()
  274. os.environ['DATA_DIR'] = self.test_dir
  275. def tearDown(self):
  276. """Clean up test environment."""
  277. shutil.rmtree(self.test_dir, ignore_errors=True)
  278. def test_extract_accepts_snapshot_id(self):
  279. """extract should accept snapshot IDs as input."""
  280. from archivebox.misc.jsonl import read_args_or_stdin
  281. uuid = '01234567-89ab-cdef-0123-456789abcdef'
  282. args = (uuid,)
  283. records = list(read_args_or_stdin(args))
  284. self.assertEqual(len(records), 1)
  285. self.assertEqual(records[0]['id'], uuid)
  286. def test_extract_accepts_jsonl_snapshot(self):
  287. """extract should accept JSONL Snapshot records."""
  288. from archivebox.misc.jsonl import read_args_or_stdin, TYPE_SNAPSHOT
  289. stdin = StringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n')
  290. stdin.isatty = lambda: False
  291. records = list(read_args_or_stdin((), stream=stdin))
  292. self.assertEqual(len(records), 1)
  293. self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
  294. self.assertEqual(records[0]['id'], 'abc123')
  295. def test_extract_gathers_snapshot_ids(self):
  296. """extract should gather snapshot IDs from various input formats."""
  297. from archivebox.misc.jsonl import TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
  298. records = [
  299. {'type': TYPE_SNAPSHOT, 'id': 'snap-1'},
  300. {'type': TYPE_SNAPSHOT, 'id': 'snap-2', 'url': 'https://example.com'},
  301. {'type': TYPE_ARCHIVERESULT, 'snapshot_id': 'snap-3'},
  302. {'id': 'snap-4'}, # Bare id
  303. ]
  304. snapshot_ids = set()
  305. for record in records:
  306. record_type = record.get('type')
  307. if record_type == TYPE_SNAPSHOT:
  308. snapshot_id = record.get('id')
  309. if snapshot_id:
  310. snapshot_ids.add(snapshot_id)
  311. elif record_type == TYPE_ARCHIVERESULT:
  312. snapshot_id = record.get('snapshot_id')
  313. if snapshot_id:
  314. snapshot_ids.add(snapshot_id)
  315. elif 'id' in record:
  316. snapshot_ids.add(record['id'])
  317. self.assertEqual(len(snapshot_ids), 4)
  318. self.assertIn('snap-1', snapshot_ids)
  319. self.assertIn('snap-2', snapshot_ids)
  320. self.assertIn('snap-3', snapshot_ids)
  321. self.assertIn('snap-4', snapshot_ids)
  322. # =============================================================================
  323. # URL Collection Tests
  324. # =============================================================================
  325. class TestURLCollection(unittest.TestCase):
  326. """Test collecting urls.jsonl from extractor output."""
  327. def setUp(self):
  328. """Create test directory structure."""
  329. self.test_dir = Path(tempfile.mkdtemp())
  330. # Create fake extractor output directories with urls.jsonl
  331. (self.test_dir / 'wget').mkdir()
  332. (self.test_dir / 'wget' / 'urls.jsonl').write_text(
  333. '{"url": "https://wget-link-1.com"}\n'
  334. '{"url": "https://wget-link-2.com"}\n'
  335. )
  336. (self.test_dir / 'parse_html_urls').mkdir()
  337. (self.test_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
  338. '{"url": "https://html-link-1.com"}\n'
  339. '{"url": "https://html-link-2.com", "title": "HTML Link 2"}\n'
  340. )
  341. (self.test_dir / 'screenshot').mkdir()
  342. # No urls.jsonl in screenshot dir - not a parser
  343. def tearDown(self):
  344. """Clean up test directory."""
  345. shutil.rmtree(self.test_dir, ignore_errors=True)
  346. def test_collect_urls_from_plugins(self):
  347. """Should collect urls.jsonl from all parser plugin subdirectories."""
  348. from archivebox.hooks import collect_urls_from_plugins
  349. urls = collect_urls_from_plugins(self.test_dir)
  350. self.assertEqual(len(urls), 4)
  351. # Check that plugin is set
  352. plugins = {u['plugin'] for u in urls}
  353. self.assertIn('wget', plugins)
  354. self.assertIn('parse_html_urls', plugins)
  355. self.assertNotIn('screenshot', plugins) # No urls.jsonl
  356. def test_collect_urls_preserves_metadata(self):
  357. """Should preserve metadata from urls.jsonl entries."""
  358. from archivebox.hooks import collect_urls_from_plugins
  359. urls = collect_urls_from_plugins(self.test_dir)
  360. # Find the entry with title
  361. titled = [u for u in urls if u.get('title') == 'HTML Link 2']
  362. self.assertEqual(len(titled), 1)
  363. self.assertEqual(titled[0]['url'], 'https://html-link-2.com')
  364. def test_collect_urls_empty_dir(self):
  365. """Should handle empty or non-existent directories."""
  366. from archivebox.hooks import collect_urls_from_plugins
  367. empty_dir = self.test_dir / 'nonexistent'
  368. urls = collect_urls_from_plugins(empty_dir)
  369. self.assertEqual(len(urls), 0)
  370. # =============================================================================
  371. # Integration Tests
  372. # =============================================================================
  373. class TestPipingWorkflowIntegration(unittest.TestCase):
  374. """
  375. Integration tests for the complete piping workflow.
  376. These tests require Django to be set up and use the actual database.
  377. """
  378. @classmethod
  379. def setUpClass(cls):
  380. """Set up Django and test database."""
  381. cls.test_dir = tempfile.mkdtemp()
  382. os.environ['DATA_DIR'] = cls.test_dir
  383. # Initialize Django
  384. from archivebox.config.django import setup_django
  385. setup_django()
  386. # Initialize the archive
  387. from archivebox.cli.archivebox_init import init
  388. init()
  389. @classmethod
  390. def tearDownClass(cls):
  391. """Clean up test database."""
  392. shutil.rmtree(cls.test_dir, ignore_errors=True)
  393. def test_crawl_creates_and_outputs_jsonl(self):
  394. """
  395. Test: archivebox crawl URL1 URL2 URL3
  396. Should create a single Crawl with all URLs and output JSONL when piped.
  397. """
  398. from archivebox.crawls.models import Crawl
  399. from archivebox.misc.jsonl import TYPE_CRAWL
  400. from archivebox.base_models.models import get_or_create_system_user_pk
  401. created_by_id = get_or_create_system_user_pk()
  402. # Create crawl with multiple URLs (as newline-separated string)
  403. urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
  404. crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
  405. self.assertIsNotNone(crawl)
  406. self.assertIsNotNone(crawl.id)
  407. self.assertEqual(crawl.urls, urls)
  408. self.assertEqual(crawl.status, 'queued')
  409. # Verify URLs list
  410. urls_list = crawl.get_urls_list()
  411. self.assertEqual(len(urls_list), 2)
  412. self.assertIn('https://test-crawl-1.example.com', urls_list)
  413. self.assertIn('https://test-crawl-2.example.com', urls_list)
  414. # Verify output format
  415. output = crawl.to_jsonl()
  416. self.assertEqual(output['type'], TYPE_CRAWL)
  417. self.assertIn('id', output)
  418. self.assertEqual(output['urls'], urls)
  419. self.assertIn('schema_version', output)
  420. def test_snapshot_accepts_crawl_jsonl(self):
  421. """
  422. Test: archivebox crawl URL | archivebox snapshot
  423. Snapshot should accept Crawl JSONL and create Snapshots for each URL.
  424. """
  425. from archivebox.crawls.models import Crawl
  426. from archivebox.core.models import Snapshot
  427. from archivebox.misc.jsonl import (
  428. read_args_or_stdin,
  429. TYPE_CRAWL, TYPE_SNAPSHOT
  430. )
  431. from archivebox.base_models.models import get_or_create_system_user_pk
  432. created_by_id = get_or_create_system_user_pk()
  433. # Step 1: Create crawl (simulating 'archivebox crawl')
  434. urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
  435. crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
  436. crawl_output = crawl.to_jsonl()
  437. # Step 2: Parse crawl output as snapshot input
  438. stdin = StringIO(json.dumps(crawl_output) + '\n')
  439. stdin.isatty = lambda: False
  440. records = list(read_args_or_stdin((), stream=stdin))
  441. self.assertEqual(len(records), 1)
  442. self.assertEqual(records[0]['type'], TYPE_CRAWL)
  443. # Step 3: Create snapshots from crawl URLs
  444. created_snapshots = []
  445. for url in crawl.get_urls_list():
  446. snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
  447. if snapshot:
  448. created_snapshots.append(snapshot)
  449. self.assertEqual(len(created_snapshots), 2)
  450. # Verify snapshot output
  451. for snapshot in created_snapshots:
  452. output = snapshot.to_jsonl()
  453. self.assertEqual(output['type'], TYPE_SNAPSHOT)
  454. self.assertIn(output['url'], [
  455. 'https://crawl-to-snap-1.example.com',
  456. 'https://crawl-to-snap-2.example.com'
  457. ])
  458. def test_snapshot_creates_and_outputs_jsonl(self):
  459. """
  460. Test: archivebox snapshot URL
  461. Should create a Snapshot and output JSONL when piped.
  462. """
  463. from archivebox.core.models import Snapshot
  464. from archivebox.misc.jsonl import (
  465. read_args_or_stdin, write_record,
  466. TYPE_SNAPSHOT
  467. )
  468. from archivebox.base_models.models import get_or_create_system_user_pk
  469. created_by_id = get_or_create_system_user_pk()
  470. # Simulate input
  471. url = 'https://test-snapshot-1.example.com'
  472. records = list(read_args_or_stdin((url,)))
  473. self.assertEqual(len(records), 1)
  474. self.assertEqual(records[0]['url'], url)
  475. # Create snapshot
  476. overrides = {'created_by_id': created_by_id}
  477. snapshot = Snapshot.from_jsonl(records[0], overrides=overrides)
  478. self.assertIsNotNone(snapshot.id)
  479. self.assertEqual(snapshot.url, url)
  480. # Verify output format
  481. output = snapshot.to_jsonl()
  482. self.assertEqual(output['type'], TYPE_SNAPSHOT)
  483. self.assertIn('id', output)
  484. self.assertEqual(output['url'], url)
  485. def test_extract_accepts_snapshot_from_previous_command(self):
  486. """
  487. Test: archivebox snapshot URL | archivebox extract
  488. Extract should accept JSONL output from snapshot command.
  489. """
  490. from archivebox.core.models import Snapshot, ArchiveResult
  491. from archivebox.misc.jsonl import (
  492. read_args_or_stdin,
  493. TYPE_SNAPSHOT
  494. )
  495. from archivebox.base_models.models import get_or_create_system_user_pk
  496. created_by_id = get_or_create_system_user_pk()
  497. # Step 1: Create snapshot (simulating 'archivebox snapshot')
  498. url = 'https://test-extract-1.example.com'
  499. overrides = {'created_by_id': created_by_id}
  500. snapshot = Snapshot.from_jsonl({'url': url}, overrides=overrides)
  501. snapshot_output = snapshot.to_jsonl()
  502. # Step 2: Parse snapshot output as extract input
  503. stdin = StringIO(json.dumps(snapshot_output) + '\n')
  504. stdin.isatty = lambda: False
  505. records = list(read_args_or_stdin((), stream=stdin))
  506. self.assertEqual(len(records), 1)
  507. self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
  508. self.assertEqual(records[0]['id'], str(snapshot.id))
  509. # Step 3: Gather snapshot IDs (as extract does)
  510. snapshot_ids = set()
  511. for record in records:
  512. if record.get('type') == TYPE_SNAPSHOT and record.get('id'):
  513. snapshot_ids.add(record['id'])
  514. self.assertIn(str(snapshot.id), snapshot_ids)
  515. def test_full_pipeline_crawl_snapshot_extract(self):
  516. """
  517. Test: archivebox crawl URL | archivebox snapshot | archivebox extract
  518. This is equivalent to: archivebox add --depth=0 URL
  519. """
  520. from archivebox.crawls.models import Crawl
  521. from archivebox.core.models import Snapshot
  522. from archivebox.misc.jsonl import (
  523. read_args_or_stdin,
  524. TYPE_CRAWL, TYPE_SNAPSHOT
  525. )
  526. from archivebox.base_models.models import get_or_create_system_user_pk
  527. created_by_id = get_or_create_system_user_pk()
  528. # === archivebox crawl https://example.com ===
  529. url = 'https://test-pipeline-full.example.com'
  530. crawl = Crawl.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
  531. crawl_jsonl = json.dumps(crawl.to_jsonl())
  532. # === | archivebox snapshot ===
  533. stdin = StringIO(crawl_jsonl + '\n')
  534. stdin.isatty = lambda: False
  535. records = list(read_args_or_stdin((), stream=stdin))
  536. self.assertEqual(len(records), 1)
  537. self.assertEqual(records[0]['type'], TYPE_CRAWL)
  538. # Create snapshots from crawl
  539. created_snapshots = []
  540. for record in records:
  541. if record.get('type') == TYPE_CRAWL:
  542. crawl_id = record.get('id')
  543. if crawl_id:
  544. db_crawl = Crawl.objects.get(id=crawl_id)
  545. for crawl_url in db_crawl.get_urls_list():
  546. snapshot = Snapshot.from_jsonl({'url': crawl_url}, overrides={'created_by_id': created_by_id})
  547. if snapshot:
  548. created_snapshots.append(snapshot)
  549. self.assertEqual(len(created_snapshots), 1)
  550. self.assertEqual(created_snapshots[0].url, url)
  551. # === | archivebox extract ===
  552. snapshot_jsonl_lines = [json.dumps(s.to_jsonl()) for s in created_snapshots]
  553. stdin = StringIO('\n'.join(snapshot_jsonl_lines) + '\n')
  554. stdin.isatty = lambda: False
  555. records = list(read_args_or_stdin((), stream=stdin))
  556. self.assertEqual(len(records), 1)
  557. self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
  558. self.assertEqual(records[0]['id'], str(created_snapshots[0].id))
  559. class TestDepthWorkflows(unittest.TestCase):
  560. """Test various depth crawl workflows."""
  561. @classmethod
  562. def setUpClass(cls):
  563. """Set up Django and test database."""
  564. cls.test_dir = tempfile.mkdtemp()
  565. os.environ['DATA_DIR'] = cls.test_dir
  566. from archivebox.config.django import setup_django
  567. setup_django()
  568. from archivebox.cli.archivebox_init import init
  569. init()
  570. @classmethod
  571. def tearDownClass(cls):
  572. """Clean up test database."""
  573. shutil.rmtree(cls.test_dir, ignore_errors=True)
  574. def test_depth_0_workflow(self):
  575. """
  576. Test: archivebox crawl URL | archivebox snapshot | archivebox extract
  577. Depth 0: Only archive the specified URL, no recursive crawling.
  578. """
  579. from archivebox.crawls.models import Crawl
  580. from archivebox.core.models import Snapshot
  581. from archivebox.base_models.models import get_or_create_system_user_pk
  582. created_by_id = get_or_create_system_user_pk()
  583. # Create crawl with depth 0
  584. url = 'https://depth0-test.example.com'
  585. crawl = Crawl.from_jsonl({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
  586. self.assertEqual(crawl.max_depth, 0)
  587. # Create snapshot
  588. snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
  589. self.assertEqual(snapshot.url, url)
  590. def test_depth_metadata_in_crawl(self):
  591. """Test that depth metadata is stored in Crawl."""
  592. from archivebox.crawls.models import Crawl
  593. from archivebox.base_models.models import get_or_create_system_user_pk
  594. created_by_id = get_or_create_system_user_pk()
  595. # Create crawl with depth
  596. crawl = Crawl.from_jsonl(
  597. {'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
  598. overrides={'created_by_id': created_by_id}
  599. )
  600. self.assertEqual(crawl.max_depth, 2)
  601. # Verify in JSONL output
  602. output = crawl.to_jsonl()
  603. self.assertEqual(output['max_depth'], 2)
  604. class TestParserPluginWorkflows(unittest.TestCase):
  605. """Test workflows with specific parser plugins."""
  606. @classmethod
  607. def setUpClass(cls):
  608. """Set up Django and test database."""
  609. cls.test_dir = tempfile.mkdtemp()
  610. os.environ['DATA_DIR'] = cls.test_dir
  611. from archivebox.config.django import setup_django
  612. setup_django()
  613. from archivebox.cli.archivebox_init import init
  614. init()
  615. @classmethod
  616. def tearDownClass(cls):
  617. """Clean up test database."""
  618. shutil.rmtree(cls.test_dir, ignore_errors=True)
  619. def test_html_parser_workflow(self):
  620. """
  621. Test: archivebox crawl --plugin=parse_html_urls URL | archivebox snapshot | archivebox extract
  622. """
  623. from archivebox.hooks import collect_urls_from_plugins
  624. from archivebox.misc.jsonl import TYPE_SNAPSHOT
  625. # Create mock output directory
  626. snapshot_dir = Path(self.test_dir) / 'archive' / 'html-parser-test'
  627. snapshot_dir.mkdir(parents=True, exist_ok=True)
  628. (snapshot_dir / 'parse_html_urls').mkdir(exist_ok=True)
  629. (snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
  630. '{"url": "https://html-discovered.com", "title": "HTML Link"}\n'
  631. )
  632. # Collect URLs
  633. discovered = collect_urls_from_plugins(snapshot_dir)
  634. self.assertEqual(len(discovered), 1)
  635. self.assertEqual(discovered[0]['url'], 'https://html-discovered.com')
  636. self.assertEqual(discovered[0]['plugin'], 'parse_html_urls')
  637. def test_rss_parser_workflow(self):
  638. """
  639. Test: archivebox crawl --plugin=parse_rss_urls URL | archivebox snapshot | archivebox extract
  640. """
  641. from archivebox.hooks import collect_urls_from_plugins
  642. # Create mock output directory
  643. snapshot_dir = Path(self.test_dir) / 'archive' / 'rss-parser-test'
  644. snapshot_dir.mkdir(parents=True, exist_ok=True)
  645. (snapshot_dir / 'parse_rss_urls').mkdir(exist_ok=True)
  646. (snapshot_dir / 'parse_rss_urls' / 'urls.jsonl').write_text(
  647. '{"url": "https://rss-item-1.com", "title": "RSS Item 1"}\n'
  648. '{"url": "https://rss-item-2.com", "title": "RSS Item 2"}\n'
  649. )
  650. # Collect URLs
  651. discovered = collect_urls_from_plugins(snapshot_dir)
  652. self.assertEqual(len(discovered), 2)
  653. self.assertTrue(all(d['plugin'] == 'parse_rss_urls' for d in discovered))
  654. def test_multiple_parsers_dedupe(self):
  655. """
  656. Multiple parsers may discover the same URL - should be deduplicated.
  657. """
  658. from archivebox.hooks import collect_urls_from_plugins
  659. # Create mock output with duplicate URLs from different parsers
  660. snapshot_dir = Path(self.test_dir) / 'archive' / 'dedupe-test'
  661. snapshot_dir.mkdir(parents=True, exist_ok=True)
  662. (snapshot_dir / 'parse_html_urls').mkdir(exist_ok=True)
  663. (snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
  664. '{"url": "https://same-url.com"}\n'
  665. )
  666. (snapshot_dir / 'wget').mkdir(exist_ok=True)
  667. (snapshot_dir / 'wget' / 'urls.jsonl').write_text(
  668. '{"url": "https://same-url.com"}\n' # Same URL, different extractor
  669. )
  670. # Collect URLs
  671. all_discovered = collect_urls_from_plugins(snapshot_dir)
  672. # Both entries are returned (deduplication happens at the crawl command level)
  673. self.assertEqual(len(all_discovered), 2)
  674. # Verify both extractors found the same URL
  675. urls = {d['url'] for d in all_discovered}
  676. self.assertEqual(urls, {'https://same-url.com'})
  677. class TestEdgeCases(unittest.TestCase):
  678. """Test edge cases and error handling."""
  679. def test_empty_input(self):
  680. """Commands should handle empty input gracefully."""
  681. from archivebox.misc.jsonl import read_args_or_stdin
  682. # Empty args, TTY stdin (should not block)
  683. stdin = StringIO('')
  684. stdin.isatty = lambda: True
  685. records = list(read_args_or_stdin((), stream=stdin))
  686. self.assertEqual(len(records), 0)
  687. def test_malformed_jsonl(self):
  688. """Should skip malformed JSONL lines."""
  689. from archivebox.misc.jsonl import read_args_or_stdin
  690. stdin = StringIO(
  691. '{"url": "https://good.com"}\n'
  692. 'not valid json\n'
  693. '{"url": "https://also-good.com"}\n'
  694. )
  695. stdin.isatty = lambda: False
  696. records = list(read_args_or_stdin((), stream=stdin))
  697. self.assertEqual(len(records), 2)
  698. urls = {r['url'] for r in records}
  699. self.assertEqual(urls, {'https://good.com', 'https://also-good.com'})
  700. def test_mixed_input_formats(self):
  701. """Should handle mixed URLs and JSONL."""
  702. from archivebox.misc.jsonl import read_args_or_stdin
  703. stdin = StringIO(
  704. 'https://plain-url.com\n'
  705. '{"type": "Snapshot", "url": "https://jsonl-url.com", "tags": "test"}\n'
  706. '01234567-89ab-cdef-0123-456789abcdef\n' # UUID
  707. )
  708. stdin.isatty = lambda: False
  709. records = list(read_args_or_stdin((), stream=stdin))
  710. self.assertEqual(len(records), 3)
  711. # Plain URL
  712. self.assertEqual(records[0]['url'], 'https://plain-url.com')
  713. # JSONL with metadata
  714. self.assertEqual(records[1]['url'], 'https://jsonl-url.com')
  715. self.assertEqual(records[1]['tags'], 'test')
  716. # UUID
  717. self.assertEqual(records[2]['id'], '01234567-89ab-cdef-0123-456789abcdef')
  718. def test_crawl_with_multiple_urls(self):
  719. """Crawl should handle multiple URLs in a single crawl."""
  720. from archivebox.misc.jsonl import TYPE_CRAWL
  721. # Test crawl JSONL with multiple URLs
  722. crawl_output = {
  723. 'type': TYPE_CRAWL,
  724. 'id': 'test-multi-url-crawl',
  725. 'urls': 'https://url1.com\nhttps://url2.com\nhttps://url3.com',
  726. 'max_depth': 0,
  727. }
  728. # Parse the URLs
  729. urls = [u.strip() for u in crawl_output['urls'].split('\n') if u.strip()]
  730. self.assertEqual(len(urls), 3)
  731. self.assertEqual(urls[0], 'https://url1.com')
  732. self.assertEqual(urls[1], 'https://url2.com')
  733. self.assertEqual(urls[2], 'https://url3.com')
# Allow running this test module directly: `python tests_piping.py`
if __name__ == '__main__':
    unittest.main()