# test_snapshot.py
  1. #!/usr/bin/env python3
  2. """Integration tests for archivebox snapshot command."""
  3. import os
  4. import subprocess
  5. import sqlite3
  6. from archivebox.machine.models import Process
  7. from datetime import datetime
  8. from pathlib import Path
  9. from urllib.parse import urlparse
  10. import uuid
  11. import pytest
  12. from .fixtures import process, disable_extractors_dict
  13. def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_extractors_dict):
  14. """Test that snapshot stores the exact URL in the database."""
  15. os.chdir(tmp_path)
  16. subprocess.run(
  17. ['archivebox', 'snapshot', 'create', 'https://example.com'],
  18. capture_output=True,
  19. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  20. )
  21. conn = sqlite3.connect('index.sqlite3')
  22. c = conn.cursor()
  23. snapshot_row = c.execute(
  24. "SELECT id, created_at, url, crawl_id FROM core_snapshot WHERE url = ?",
  25. ('https://example.com',)
  26. ).fetchone()
  27. assert snapshot_row is not None
  28. crawl_row = c.execute(
  29. "SELECT id, created_at, urls, created_by_id FROM crawls_crawl WHERE id = ?",
  30. (snapshot_row[3],)
  31. ).fetchone()
  32. assert crawl_row is not None
  33. user_row = c.execute(
  34. "SELECT username FROM auth_user WHERE id = ?",
  35. (crawl_row[3],)
  36. ).fetchone()
  37. assert user_row is not None
  38. conn.close()
  39. snapshot_id_raw, snapshot_created_at, snapshot_url, crawl_id = snapshot_row
  40. snapshot_id = str(uuid.UUID(snapshot_id_raw))
  41. crawl_id, crawl_created_at, crawl_urls, crawl_created_by_id = crawl_row
  42. username = user_row[0]
  43. crawl_date_str = datetime.fromisoformat(crawl_created_at).strftime('%Y%m%d')
  44. snapshot_date_str = datetime.fromisoformat(snapshot_created_at).strftime('%Y%m%d')
  45. domain = urlparse(snapshot_url).hostname or 'unknown'
  46. # Verify crawl symlink exists and is relative
  47. target_path = tmp_path / 'users' / username / 'snapshots' / snapshot_date_str / domain / snapshot_id
  48. symlinks = [
  49. p for p in tmp_path.rglob(str(snapshot_id))
  50. if p.is_symlink()
  51. ]
  52. assert symlinks, "Snapshot symlink should exist under crawl dir"
  53. link_path = symlinks[0]
  54. assert link_path.is_symlink(), "Snapshot symlink should exist under crawl dir"
  55. link_target = os.readlink(link_path)
  56. assert not os.path.isabs(link_target), "Symlink should be relative"
  57. assert link_path.resolve() == target_path.resolve()
  58. def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
  59. """Test that multiple URLs each get their own snapshot record."""
  60. os.chdir(tmp_path)
  61. subprocess.run(
  62. ['archivebox', 'snapshot', 'create',
  63. 'https://example.com',
  64. 'https://iana.org'],
  65. capture_output=True,
  66. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  67. )
  68. conn = sqlite3.connect('index.sqlite3')
  69. c = conn.cursor()
  70. urls = c.execute("SELECT url FROM core_snapshot ORDER BY url").fetchall()
  71. conn.close()
  72. urls = [u[0] for u in urls]
  73. assert 'https://example.com' in urls
  74. assert 'https://iana.org' in urls
  75. assert len(urls) >= 2
  76. def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disable_extractors_dict):
  77. """Test that --tag creates tag record and links it to the snapshot."""
  78. os.chdir(tmp_path)
  79. subprocess.run(
  80. ['archivebox', 'snapshot', 'create', '--tag=mytesttag',
  81. 'https://example.com'],
  82. capture_output=True,
  83. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  84. )
  85. conn = sqlite3.connect('index.sqlite3')
  86. c = conn.cursor()
  87. # Verify tag was created
  88. tag = c.execute("SELECT id, name FROM core_tag WHERE name = ?", ('mytesttag',)).fetchone()
  89. assert tag is not None, "Tag 'mytesttag' should exist in core_tag"
  90. tag_id = tag[0]
  91. # Verify snapshot exists
  92. snapshot = c.execute("SELECT id FROM core_snapshot WHERE url = ?",
  93. ('https://example.com',)).fetchone()
  94. assert snapshot is not None
  95. snapshot_id = snapshot[0]
  96. # Verify tag is linked to snapshot via join table
  97. link = c.execute("""
  98. SELECT * FROM core_snapshot_tags
  99. WHERE snapshot_id = ? AND tag_id = ?
  100. """, (snapshot_id, tag_id)).fetchone()
  101. conn.close()
  102. assert link is not None, "Tag should be linked to snapshot via core_snapshot_tags"
  103. def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_extractors_dict):
  104. """Test that JSONL output contains required fields with correct types."""
  105. os.chdir(tmp_path)
  106. # Pass URL as argument instead of stdin for more reliable behavior
  107. result = subprocess.run(
  108. ['archivebox', 'snapshot', 'create', 'https://example.com'],
  109. capture_output=True,
  110. text=True,
  111. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  112. )
  113. # Parse JSONL output lines
  114. records = Process.parse_records_from_text(result.stdout)
  115. snapshot_records = [r for r in records if r.get('type') == 'Snapshot']
  116. assert len(snapshot_records) >= 1, "Should output at least one Snapshot JSONL record"
  117. record = snapshot_records[0]
  118. assert record.get('type') == 'Snapshot'
  119. assert 'id' in record, "Snapshot record should have 'id' field"
  120. assert 'url' in record, "Snapshot record should have 'url' field"
  121. assert record['url'] == 'https://example.com'
  122. def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors_dict):
  123. """Test that title is stored when provided via tag option."""
  124. os.chdir(tmp_path)
  125. # Use command line args instead of stdin
  126. subprocess.run(
  127. ['archivebox', 'snapshot', 'create', '--tag=customtag', 'https://example.com'],
  128. capture_output=True,
  129. text=True,
  130. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  131. )
  132. conn = sqlite3.connect('index.sqlite3')
  133. c = conn.cursor()
  134. # Verify tag was created with correct name
  135. tag = c.execute("SELECT name FROM core_tag WHERE name = ?",
  136. ('customtag',)).fetchone()
  137. conn.close()
  138. assert tag is not None
  139. assert tag[0] == 'customtag'
  140. def test_snapshot_with_depth_sets_snapshot_depth(tmp_path, process, disable_extractors_dict):
  141. """Test that --depth sets snapshot depth when creating snapshots."""
  142. os.chdir(tmp_path)
  143. subprocess.run(
  144. ['archivebox', 'snapshot', 'create', '--depth=1',
  145. 'https://example.com'],
  146. capture_output=True,
  147. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  148. )
  149. conn = sqlite3.connect('index.sqlite3')
  150. c = conn.cursor()
  151. snapshot = c.execute("SELECT depth FROM core_snapshot ORDER BY created_at DESC LIMIT 1").fetchone()
  152. conn.close()
  153. assert snapshot is not None, "Snapshot should be created when depth is provided"
  154. assert snapshot[0] == 1, "Snapshot depth should match --depth value"
  155. def test_snapshot_allows_duplicate_urls_across_crawls(tmp_path, process, disable_extractors_dict):
  156. """Snapshot create auto-creates a crawl per run; same URL can appear multiple times."""
  157. os.chdir(tmp_path)
  158. # Add same URL twice
  159. subprocess.run(
  160. ['archivebox', 'snapshot', 'create', 'https://example.com'],
  161. capture_output=True,
  162. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  163. )
  164. subprocess.run(
  165. ['archivebox', 'snapshot', 'create', 'https://example.com'],
  166. capture_output=True,
  167. env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
  168. )
  169. conn = sqlite3.connect('index.sqlite3')
  170. c = conn.cursor()
  171. count = c.execute("SELECT COUNT(*) FROM core_snapshot WHERE url = ?",
  172. ('https://example.com',)).fetchone()[0]
  173. conn.close()
  174. assert count == 2, "Same URL should create separate snapshots across different crawls"
# Allow running this test module directly (outside the pytest CLI).
if __name__ == '__main__':
    pytest.main([__file__, '-v'])