  1. #!/usr/bin/env python3
  2. """Integration tests for archivebox snapshot command."""
  3. import os
  4. import subprocess
  5. import sqlite3
  6. import json
  7. import pytest
  8. from .fixtures import process, disable_extractors_dict
  9. def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_extractors_dict):
  10. """Test that snapshot stores the exact URL in the database."""
  11. os.chdir(tmp_path)
  12. subprocess.run(
  13. ['archivebox', 'snapshot', 'https://example.com'],
  14. capture_output=True,
  15. env=disable_extractors_dict,
  16. )
  17. conn = sqlite3.connect('index.sqlite3')
  18. c = conn.cursor()
  19. result = c.execute("SELECT url FROM core_snapshot WHERE url = ?",
  20. ('https://example.com',)).fetchone()
  21. conn.close()
  22. assert result is not None
  23. assert result[0] == 'https://example.com'
  24. def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
  25. """Test that multiple URLs each get their own snapshot record."""
  26. os.chdir(tmp_path)
  27. subprocess.run(
  28. ['archivebox', 'snapshot',
  29. 'https://example.com',
  30. 'https://iana.org'],
  31. capture_output=True,
  32. env=disable_extractors_dict,
  33. )
  34. conn = sqlite3.connect('index.sqlite3')
  35. c = conn.cursor()
  36. urls = c.execute("SELECT url FROM core_snapshot ORDER BY url").fetchall()
  37. conn.close()
  38. urls = [u[0] for u in urls]
  39. assert 'https://example.com' in urls
  40. assert 'https://iana.org' in urls
  41. assert len(urls) >= 2
  42. def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disable_extractors_dict):
  43. """Test that --tag creates tag record and links it to the snapshot."""
  44. os.chdir(tmp_path)
  45. subprocess.run(
  46. ['archivebox', 'snapshot', '--tag=mytesttag',
  47. 'https://example.com'],
  48. capture_output=True,
  49. env=disable_extractors_dict,
  50. )
  51. conn = sqlite3.connect('index.sqlite3')
  52. c = conn.cursor()
  53. # Verify tag was created
  54. tag = c.execute("SELECT id, name FROM core_tag WHERE name = ?", ('mytesttag',)).fetchone()
  55. assert tag is not None, "Tag 'mytesttag' should exist in core_tag"
  56. tag_id = tag[0]
  57. # Verify snapshot exists
  58. snapshot = c.execute("SELECT id FROM core_snapshot WHERE url = ?",
  59. ('https://example.com',)).fetchone()
  60. assert snapshot is not None
  61. snapshot_id = snapshot[0]
  62. # Verify tag is linked to snapshot via join table
  63. link = c.execute("""
  64. SELECT * FROM core_snapshot_tags
  65. WHERE snapshot_id = ? AND tag_id = ?
  66. """, (snapshot_id, tag_id)).fetchone()
  67. conn.close()
  68. assert link is not None, "Tag should be linked to snapshot via core_snapshot_tags"
  69. def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_extractors_dict):
  70. """Test that JSONL output contains required fields with correct types."""
  71. os.chdir(tmp_path)
  72. # Pass URL as argument instead of stdin for more reliable behavior
  73. result = subprocess.run(
  74. ['archivebox', 'snapshot', 'https://example.com'],
  75. capture_output=True,
  76. text=True,
  77. env=disable_extractors_dict,
  78. )
  79. # Parse JSONL output lines
  80. snapshot_records = []
  81. for line in result.stdout.strip().split('\n'):
  82. if line:
  83. try:
  84. record = json.loads(line)
  85. if record.get('type') == 'Snapshot':
  86. snapshot_records.append(record)
  87. except json.JSONDecodeError:
  88. continue
  89. assert len(snapshot_records) >= 1, "Should output at least one Snapshot JSONL record"
  90. record = snapshot_records[0]
  91. assert record.get('type') == 'Snapshot'
  92. assert 'id' in record, "Snapshot record should have 'id' field"
  93. assert 'url' in record, "Snapshot record should have 'url' field"
  94. assert record['url'] == 'https://example.com'
  95. def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors_dict):
  96. """Test that title is stored when provided via tag option."""
  97. os.chdir(tmp_path)
  98. # Use command line args instead of stdin
  99. subprocess.run(
  100. ['archivebox', 'snapshot', '--tag=customtag', 'https://example.com'],
  101. capture_output=True,
  102. text=True,
  103. env=disable_extractors_dict,
  104. )
  105. conn = sqlite3.connect('index.sqlite3')
  106. c = conn.cursor()
  107. # Verify tag was created with correct name
  108. tag = c.execute("SELECT name FROM core_tag WHERE name = ?",
  109. ('customtag',)).fetchone()
  110. conn.close()
  111. assert tag is not None
  112. assert tag[0] == 'customtag'
  113. def test_snapshot_with_depth_creates_crawl_object(tmp_path, process, disable_extractors_dict):
  114. """Test that --depth > 0 creates a Crawl object with correct max_depth."""
  115. os.chdir(tmp_path)
  116. subprocess.run(
  117. ['archivebox', 'snapshot', '--depth=1',
  118. 'https://example.com'],
  119. capture_output=True,
  120. env=disable_extractors_dict,
  121. )
  122. conn = sqlite3.connect('index.sqlite3')
  123. c = conn.cursor()
  124. crawl = c.execute("SELECT max_depth FROM crawls_crawl ORDER BY created_at DESC LIMIT 1").fetchone()
  125. conn.close()
  126. assert crawl is not None, "Crawl object should be created when depth > 0"
  127. assert crawl[0] == 1, "Crawl max_depth should match --depth value"
  128. def test_snapshot_deduplicates_urls(tmp_path, process, disable_extractors_dict):
  129. """Test that adding the same URL twice doesn't create duplicate snapshots."""
  130. os.chdir(tmp_path)
  131. # Add same URL twice
  132. subprocess.run(
  133. ['archivebox', 'snapshot', 'https://example.com'],
  134. capture_output=True,
  135. env=disable_extractors_dict,
  136. )
  137. subprocess.run(
  138. ['archivebox', 'snapshot', 'https://example.com'],
  139. capture_output=True,
  140. env=disable_extractors_dict,
  141. )
  142. conn = sqlite3.connect('index.sqlite3')
  143. c = conn.cursor()
  144. count = c.execute("SELECT COUNT(*) FROM core_snapshot WHERE url = ?",
  145. ('https://example.com',)).fetchone()[0]
  146. conn.close()
  147. assert count == 1, "Same URL should not create duplicate snapshots"
  148. if __name__ == '__main__':
  149. pytest.main([__file__, '-v'])