test_cli_update.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python3
  2. """
  3. Comprehensive tests for archivebox update command.
  4. Verify update drains old dirs, reconciles DB, and queues snapshots.
  5. """
  6. import os
  7. import subprocess
  8. import sqlite3
  9. from .fixtures import *
  10. def test_update_runs_successfully_on_empty_archive(tmp_path, process):
  11. """Test that update runs without error on empty archive."""
  12. os.chdir(tmp_path)
  13. result = subprocess.run(
  14. ['archivebox', 'update'],
  15. capture_output=True,
  16. text=True,
  17. timeout=30,
  18. )
  19. # Should complete successfully even with no snapshots
  20. assert result.returncode == 0
  21. def test_update_reconciles_existing_snapshots(tmp_path, process, disable_extractors_dict):
  22. """Test that update command reconciles existing snapshots."""
  23. os.chdir(tmp_path)
  24. # Add a snapshot (index-only for faster test)
  25. subprocess.run(
  26. ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
  27. capture_output=True,
  28. env=disable_extractors_dict,
  29. )
  30. # Run update - should reconcile and queue
  31. result = subprocess.run(
  32. ['archivebox', 'update'],
  33. capture_output=True,
  34. env=disable_extractors_dict,
  35. timeout=30,
  36. )
  37. assert result.returncode == 0
  38. def test_update_specific_snapshot_by_filter(tmp_path, process, disable_extractors_dict):
  39. """Test updating specific snapshot using filter."""
  40. os.chdir(tmp_path)
  41. # Add multiple snapshots
  42. subprocess.run(
  43. ['archivebox', 'add', '--depth=0', 'https://example.com'],
  44. capture_output=True,
  45. env=disable_extractors_dict,
  46. timeout=90,
  47. )
  48. subprocess.run(
  49. ['archivebox', 'add', '--depth=0', 'https://example.org'],
  50. capture_output=True,
  51. env=disable_extractors_dict,
  52. timeout=90,
  53. )
  54. # Update with filter pattern (uses filter_patterns argument)
  55. result = subprocess.run(
  56. ['archivebox', 'update', '--filter-type=substring', 'example.com'],
  57. capture_output=True,
  58. env=disable_extractors_dict,
  59. timeout=30,
  60. )
  61. # Should complete successfully
  62. assert result.returncode == 0
  63. def test_update_preserves_snapshot_count(tmp_path, process, disable_extractors_dict):
  64. """Test that update doesn't change snapshot count."""
  65. os.chdir(tmp_path)
  66. # Add snapshots
  67. subprocess.run(
  68. ['archivebox', 'add', '--depth=0', 'https://example.com'],
  69. capture_output=True,
  70. env=disable_extractors_dict,
  71. timeout=90,
  72. )
  73. # Count before update
  74. conn = sqlite3.connect("index.sqlite3")
  75. c = conn.cursor()
  76. count_before = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
  77. conn.close()
  78. assert count_before == 1
  79. # Run update (should reconcile + queue, not create new snapshots)
  80. subprocess.run(
  81. ['archivebox', 'update'],
  82. capture_output=True,
  83. env=disable_extractors_dict,
  84. timeout=30,
  85. )
  86. # Count after update
  87. conn = sqlite3.connect("index.sqlite3")
  88. c = conn.cursor()
  89. count_after = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
  90. conn.close()
  91. # Snapshot count should remain the same
  92. assert count_after == count_before
  93. def test_update_queues_snapshots_for_archiving(tmp_path, process, disable_extractors_dict):
  94. """Test that update queues snapshots for archiving."""
  95. os.chdir(tmp_path)
  96. subprocess.run(
  97. ['archivebox', 'add', '--depth=0', 'https://example.com'],
  98. capture_output=True,
  99. env=disable_extractors_dict,
  100. timeout=90,
  101. )
  102. # Run update
  103. result = subprocess.run(
  104. ['archivebox', 'update'],
  105. capture_output=True,
  106. env=disable_extractors_dict,
  107. timeout=30,
  108. )
  109. assert result.returncode == 0
  110. # Check that snapshot is queued
  111. conn = sqlite3.connect("index.sqlite3")
  112. c = conn.cursor()
  113. status = c.execute("SELECT status FROM core_snapshot").fetchone()[0]
  114. conn.close()
  115. assert status == 'queued'