test_extract.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. #!/usr/bin/env python3
  2. """Integration tests for archivebox extract command."""
  3. import os
  4. import subprocess
  5. import sqlite3
  6. import json
  7. import pytest
  8. from .fixtures import process, disable_extractors_dict
  9. def test_extract_runs_on_snapshot_id(tmp_path, process, disable_extractors_dict):
  10. """Test that extract command accepts a snapshot ID."""
  11. os.chdir(tmp_path)
  12. # First create a snapshot
  13. subprocess.run(
  14. ['archivebox', 'add', '--index-only', 'https://example.com'],
  15. capture_output=True,
  16. env=disable_extractors_dict,
  17. )
  18. # Get the snapshot ID
  19. conn = sqlite3.connect('index.sqlite3')
  20. c = conn.cursor()
  21. snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
  22. conn.close()
  23. # Run extract on the snapshot
  24. result = subprocess.run(
  25. ['archivebox', 'extract', '--no-wait', str(snapshot_id)],
  26. capture_output=True,
  27. text=True,
  28. env=disable_extractors_dict,
  29. )
  30. # Should not error about invalid snapshot ID
  31. assert 'not found' not in result.stderr.lower()
  32. def test_extract_with_enabled_extractor_creates_archiveresult(tmp_path, process, disable_extractors_dict):
  33. """Test that extract creates ArchiveResult when extractor is enabled."""
  34. os.chdir(tmp_path)
  35. # First create a snapshot
  36. subprocess.run(
  37. ['archivebox', 'add', '--index-only', 'https://example.com'],
  38. capture_output=True,
  39. env=disable_extractors_dict,
  40. )
  41. # Get the snapshot ID
  42. conn = sqlite3.connect('index.sqlite3')
  43. c = conn.cursor()
  44. snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
  45. conn.close()
  46. # Run extract with title extractor enabled
  47. env = disable_extractors_dict.copy()
  48. env['SAVE_TITLE'] = 'true'
  49. subprocess.run(
  50. ['archivebox', 'extract', '--no-wait', str(snapshot_id)],
  51. capture_output=True,
  52. text=True,
  53. env=env,
  54. )
  55. # Check for archiveresults (may be queued, not completed with --no-wait)
  56. conn = sqlite3.connect('index.sqlite3')
  57. c = conn.cursor()
  58. count = c.execute("SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ?",
  59. (snapshot_id,)).fetchone()[0]
  60. conn.close()
  61. # May or may not have results depending on timing
  62. assert count >= 0
  63. def test_extract_plugin_option_accepted(tmp_path, process, disable_extractors_dict):
  64. """Test that --plugin option is accepted."""
  65. os.chdir(tmp_path)
  66. # First create a snapshot
  67. subprocess.run(
  68. ['archivebox', 'add', '--index-only', 'https://example.com'],
  69. capture_output=True,
  70. env=disable_extractors_dict,
  71. )
  72. # Get the snapshot ID
  73. conn = sqlite3.connect('index.sqlite3')
  74. c = conn.cursor()
  75. snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
  76. conn.close()
  77. result = subprocess.run(
  78. ['archivebox', 'extract', '--plugin=title', '--no-wait', str(snapshot_id)],
  79. capture_output=True,
  80. text=True,
  81. env=disable_extractors_dict,
  82. )
  83. assert 'unrecognized arguments: --plugin' not in result.stderr
  84. def test_extract_stdin_snapshot_id(tmp_path, process, disable_extractors_dict):
  85. """Test that extract reads snapshot IDs from stdin."""
  86. os.chdir(tmp_path)
  87. # First create a snapshot
  88. subprocess.run(
  89. ['archivebox', 'add', '--index-only', 'https://example.com'],
  90. capture_output=True,
  91. env=disable_extractors_dict,
  92. )
  93. # Get the snapshot ID
  94. conn = sqlite3.connect('index.sqlite3')
  95. c = conn.cursor()
  96. snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
  97. conn.close()
  98. result = subprocess.run(
  99. ['archivebox', 'extract', '--no-wait'],
  100. input=f'{snapshot_id}\n',
  101. capture_output=True,
  102. text=True,
  103. env=disable_extractors_dict,
  104. )
  105. # Should not show "not found" error
  106. assert 'not found' not in result.stderr.lower() or result.returncode == 0
  107. def test_extract_stdin_jsonl_input(tmp_path, process, disable_extractors_dict):
  108. """Test that extract reads JSONL records from stdin."""
  109. os.chdir(tmp_path)
  110. # First create a snapshot
  111. subprocess.run(
  112. ['archivebox', 'add', '--index-only', 'https://example.com'],
  113. capture_output=True,
  114. env=disable_extractors_dict,
  115. )
  116. # Get the snapshot ID
  117. conn = sqlite3.connect('index.sqlite3')
  118. c = conn.cursor()
  119. snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
  120. conn.close()
  121. jsonl_input = json.dumps({"type": "Snapshot", "id": str(snapshot_id)}) + '\n'
  122. result = subprocess.run(
  123. ['archivebox', 'extract', '--no-wait'],
  124. input=jsonl_input,
  125. capture_output=True,
  126. text=True,
  127. env=disable_extractors_dict,
  128. )
  129. # Should not show "not found" error
  130. assert 'not found' not in result.stderr.lower() or result.returncode == 0
  131. def test_extract_pipeline_from_snapshot(tmp_path, process, disable_extractors_dict):
  132. """Test piping snapshot output to extract."""
  133. os.chdir(tmp_path)
  134. # Create snapshot and pipe to extract
  135. snapshot_proc = subprocess.Popen(
  136. ['archivebox', 'snapshot', 'https://example.com'],
  137. stdout=subprocess.PIPE,
  138. stderr=subprocess.PIPE,
  139. env=disable_extractors_dict,
  140. )
  141. subprocess.run(
  142. ['archivebox', 'extract', '--no-wait'],
  143. stdin=snapshot_proc.stdout,
  144. capture_output=True,
  145. text=True,
  146. env=disable_extractors_dict,
  147. )
  148. snapshot_proc.wait()
  149. # Check database for snapshot
  150. conn = sqlite3.connect('index.sqlite3')
  151. c = conn.cursor()
  152. snapshot = c.execute("SELECT id, url FROM core_snapshot WHERE url = ?",
  153. ('https://example.com',)).fetchone()
  154. conn.close()
  155. assert snapshot is not None, "Snapshot should be created by pipeline"
  156. def test_extract_multiple_snapshots(tmp_path, process, disable_extractors_dict):
  157. """Test extracting from multiple snapshots."""
  158. os.chdir(tmp_path)
  159. # Create multiple snapshots one at a time to avoid deduplication issues
  160. subprocess.run(
  161. ['archivebox', 'add', '--index-only', 'https://example.com'],
  162. capture_output=True,
  163. env=disable_extractors_dict,
  164. )
  165. subprocess.run(
  166. ['archivebox', 'add', '--index-only', 'https://iana.org'],
  167. capture_output=True,
  168. env=disable_extractors_dict,
  169. )
  170. # Get all snapshot IDs
  171. conn = sqlite3.connect('index.sqlite3')
  172. c = conn.cursor()
  173. snapshot_ids = c.execute("SELECT id FROM core_snapshot").fetchall()
  174. conn.close()
  175. assert len(snapshot_ids) >= 2, "Should have at least 2 snapshots"
  176. # Extract from all snapshots
  177. ids_input = '\n'.join(str(s[0]) for s in snapshot_ids) + '\n'
  178. result = subprocess.run(
  179. ['archivebox', 'extract', '--no-wait'],
  180. input=ids_input,
  181. capture_output=True,
  182. text=True,
  183. env=disable_extractors_dict,
  184. )
  185. # Should not error
  186. conn = sqlite3.connect('index.sqlite3')
  187. c = conn.cursor()
  188. count = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
  189. conn.close()
  190. assert count >= 2, "Both snapshots should still exist after extraction"
  191. class TestExtractCLI:
  192. """Test the CLI interface for extract command."""
  193. def test_cli_help(self, tmp_path, process):
  194. """Test that --help works for extract command."""
  195. os.chdir(tmp_path)
  196. result = subprocess.run(
  197. ['archivebox', 'extract', '--help'],
  198. capture_output=True,
  199. text=True,
  200. )
  201. assert result.returncode == 0
  202. assert '--plugin' in result.stdout or '-p' in result.stdout
  203. assert '--wait' in result.stdout or '--no-wait' in result.stdout
  204. def test_cli_no_snapshots_shows_warning(self, tmp_path, process):
  205. """Test that running without snapshots shows a warning."""
  206. os.chdir(tmp_path)
  207. result = subprocess.run(
  208. ['archivebox', 'extract', '--no-wait'],
  209. input='',
  210. capture_output=True,
  211. text=True,
  212. )
  213. # Should show warning about no snapshots or exit normally (empty input)
  214. assert result.returncode == 0 or 'No' in result.stderr
  215. if __name__ == '__main__':
  216. pytest.main([__file__, '-v'])