test_migrations_04_to_09.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python3
  2. """
  3. Migration tests from 0.4.x to 0.9.x.
  4. 0.4.x was the first Django-powered version with a simpler schema:
  5. - No Tag model (tags stored as comma-separated string in Snapshot)
  6. - No ArchiveResult model (results stored in JSON files)
  7. """
  8. import shutil
  9. import sqlite3
  10. import tempfile
  11. import unittest
  12. from pathlib import Path
  13. from .test_migrations_helpers import (
  14. SCHEMA_0_4,
  15. seed_0_4_data,
  16. run_archivebox,
  17. create_data_dir_structure,
  18. verify_snapshot_count,
  19. verify_snapshot_urls,
  20. verify_tag_count,
  21. )
  22. class TestMigrationFrom04x(unittest.TestCase):
  23. """Test migration from 0.4.x schema to latest."""
  24. def setUp(self):
  25. """Create a temporary directory with 0.4.x schema and data."""
  26. self.work_dir = Path(tempfile.mkdtemp())
  27. self.db_path = self.work_dir / 'index.sqlite3'
  28. # Create directory structure
  29. create_data_dir_structure(self.work_dir)
  30. # Create database with 0.4.x schema
  31. conn = sqlite3.connect(str(self.db_path))
  32. conn.executescript(SCHEMA_0_4)
  33. conn.close()
  34. # Seed with test data
  35. self.original_data = seed_0_4_data(self.db_path)
  36. def tearDown(self):
  37. """Clean up temporary directory."""
  38. shutil.rmtree(self.work_dir, ignore_errors=True)
  39. def test_migration_preserves_snapshot_count(self):
  40. """Migration should preserve all snapshots from 0.4.x."""
  41. expected_count = len(self.original_data['snapshots'])
  42. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  43. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  44. ok, msg = verify_snapshot_count(self.db_path, expected_count)
  45. self.assertTrue(ok, msg)
  46. def test_migration_preserves_snapshot_urls(self):
  47. """Migration should preserve all snapshot URLs from 0.4.x."""
  48. expected_urls = [s['url'] for s in self.original_data['snapshots']]
  49. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  50. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  51. ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
  52. self.assertTrue(ok, msg)
  53. def test_migration_converts_string_tags_to_model(self):
  54. """Migration should convert comma-separated tags to Tag model instances."""
  55. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  56. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  57. # Collect unique tags from original data
  58. original_tags = set()
  59. for tags_str in self.original_data['tags_str']:
  60. if tags_str:
  61. for tag in tags_str.split(','):
  62. original_tags.add(tag.strip())
  63. # Tags should have been created
  64. ok, msg = verify_tag_count(self.db_path, len(original_tags))
  65. self.assertTrue(ok, msg)
  66. def test_migration_preserves_snapshot_titles(self):
  67. """Migration should preserve all snapshot titles."""
  68. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  69. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  70. conn = sqlite3.connect(str(self.db_path))
  71. cursor = conn.cursor()
  72. cursor.execute("SELECT url, title FROM core_snapshot")
  73. actual = {row[0]: row[1] for row in cursor.fetchall()}
  74. conn.close()
  75. for snapshot in self.original_data['snapshots']:
  76. self.assertEqual(
  77. actual.get(snapshot['url']),
  78. snapshot['title'],
  79. f"Title mismatch for {snapshot['url']}"
  80. )
  81. def test_status_works_after_migration(self):
  82. """Status command should work after migration."""
  83. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  84. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  85. result = run_archivebox(self.work_dir, ['status'])
  86. self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")
  87. def test_list_works_after_migration(self):
  88. """List command should work and show ALL migrated snapshots."""
  89. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  90. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  91. result = run_archivebox(self.work_dir, ['list'])
  92. self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
  93. # Verify ALL snapshots appear in output
  94. output = result.stdout + result.stderr
  95. for snapshot in self.original_data['snapshots']:
  96. url_fragment = snapshot['url'][:30]
  97. self.assertIn(url_fragment, output,
  98. f"Snapshot {snapshot['url']} not found in list output")
  99. def test_add_works_after_migration(self):
  100. """Adding new URLs should work after migration from 0.4.x."""
  101. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  102. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  103. # Try to add a new URL after migration
  104. result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
  105. self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")
  106. # Verify snapshot was added
  107. conn = sqlite3.connect(str(self.db_path))
  108. cursor = conn.cursor()
  109. cursor.execute("SELECT COUNT(*) FROM core_snapshot WHERE url = 'https://example.com/new-page'")
  110. count = cursor.fetchone()[0]
  111. conn.close()
  112. self.assertEqual(count, 1, "New snapshot was not created after migration")
  113. def test_new_schema_elements_created(self):
  114. """Migration should create new 0.9.x schema elements."""
  115. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  116. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  117. conn = sqlite3.connect(str(self.db_path))
  118. cursor = conn.cursor()
  119. cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
  120. tables = {row[0] for row in cursor.fetchall()}
  121. conn.close()
  122. # New tables should exist
  123. self.assertIn('crawls_crawl', tables, "crawls_crawl table not created")
  124. self.assertIn('core_tag', tables, "core_tag table not created")
  125. self.assertIn('core_archiveresult', tables, "core_archiveresult table not created")
  126. def test_snapshots_have_new_fields(self):
  127. """Migrated snapshots should have new 0.9.x fields."""
  128. result = run_archivebox(self.work_dir, ['init'], timeout=45)
  129. self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
  130. conn = sqlite3.connect(str(self.db_path))
  131. cursor = conn.cursor()
  132. cursor.execute('PRAGMA table_info(core_snapshot)')
  133. columns = {row[1] for row in cursor.fetchall()}
  134. conn.close()
  135. required_columns = {'status', 'depth', 'created_at', 'modified_at'}
  136. for col in required_columns:
  137. self.assertIn(col, columns, f"Snapshot missing new column: {col}")
  138. if __name__ == '__main__':
  139. unittest.main()