test_init.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # archivebox init
  2. # archivebox add
  3. import os
  4. import subprocess
  5. from pathlib import Path
  6. import json, shutil
  7. import sqlite3
  8. from archivebox.config import OUTPUT_PERMISSIONS
  9. from .fixtures import *
  10. def test_init(tmp_path, process):
  11. assert "Initializing a new ArchiveBox collection in this folder..." in process.stdout.decode("utf-8")
  12. def test_update(tmp_path, process):
  13. os.chdir(tmp_path)
  14. update_process = subprocess.run(['archivebox', 'init'], capture_output=True)
  15. assert "Updating existing ArchiveBox collection in this folder" in update_process.stdout.decode("utf-8")
  16. def test_add_link(tmp_path, process, disable_extractors_dict):
  17. disable_extractors_dict.update({"USE_WGET": "true"})
  18. os.chdir(tmp_path)
  19. add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'],
  20. capture_output=True, env=disable_extractors_dict)
  21. archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
  22. assert "index.json" in [x.name for x in archived_item_path.iterdir()]
  23. with open(archived_item_path / "index.json", "r") as f:
  24. output_json = json.load(f)
  25. assert "Example Domain" == output_json['history']['title'][0]['output']
  26. with open(archived_item_path / "index.html", "r") as f:
  27. output_html = f.read()
  28. assert "Example Domain" in output_html
  29. def test_add_link_support_stdin(tmp_path, process, disable_extractors_dict):
  30. disable_extractors_dict.update({"USE_WGET": "true"})
  31. os.chdir(tmp_path)
  32. stdin_process = subprocess.Popen(["archivebox", "add"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
  33. env=disable_extractors_dict)
  34. stdin_process.communicate(input="http://127.0.0.1:8080/static/example.com.html".encode())
  35. archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
  36. assert "index.json" in [x.name for x in archived_item_path.iterdir()]
  37. with open(archived_item_path / "index.json", "r") as f:
  38. output_json = json.load(f)
  39. assert "Example Domain" == output_json['history']['title'][0]['output']
  40. def test_correct_permissions_output_folder(tmp_path, process):
  41. index_files = ['index.sqlite3', 'archive']
  42. for file in index_files:
  43. file_path = tmp_path / file
  44. assert oct(file_path.stat().st_mode)[-3:] == OUTPUT_PERMISSIONS
  45. def test_correct_permissions_add_command_results(tmp_path, process, disable_extractors_dict):
  46. os.chdir(tmp_path)
  47. add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True,
  48. env=disable_extractors_dict)
  49. archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
  50. for path in archived_item_path.iterdir():
  51. assert oct(path.stat().st_mode)[-3:] == OUTPUT_PERMISSIONS
  52. def test_collision_urls_different_timestamps(tmp_path, process, disable_extractors_dict):
  53. os.chdir(tmp_path)
  54. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True,
  55. env=disable_extractors_dict)
  56. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/iana.org.html'], capture_output=True,
  57. env=disable_extractors_dict)
  58. archive_folders = [x.name for x in (tmp_path / "archive").iterdir()]
  59. first_archive = tmp_path / "archive" / str(min([float(folder) for folder in archive_folders]))
  60. json_index = str(first_archive / "index.json")
  61. with open(json_index, "r") as f:
  62. link_details = json.loads(f.read())
  63. link_details["url"] = "http://127.0.0.1:8080/static/iana.org.html"
  64. with open(json_index, "w") as f:
  65. json.dump(link_details, f)
  66. init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
  67. # 1 from duplicated url, 1 from corrupted index
  68. assert "Skipped adding 2 invalid snapshot data directories" in init_process.stdout.decode("utf-8")
  69. assert init_process.returncode == 0
  70. def test_collision_timestamps_different_urls(tmp_path, process, disable_extractors_dict):
  71. os.chdir(tmp_path)
  72. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True,
  73. env=disable_extractors_dict)
  74. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/iana.org.html'], capture_output=True,
  75. env=disable_extractors_dict)
  76. archive_folders = [x.name for x in (tmp_path / "archive").iterdir()]
  77. first_archive = tmp_path / "archive" / str(min([float(folder) for folder in archive_folders]))
  78. archive_folders.remove(first_archive.name)
  79. json_index = str(first_archive / "index.json")
  80. with open(json_index, "r") as f:
  81. link_details = json.loads(f.read())
  82. link_details["timestamp"] = archive_folders[0]
  83. with open(json_index, "w") as f:
  84. json.dump(link_details, f)
  85. init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
  86. assert "Skipped adding 1 invalid snapshot data directories" in init_process.stdout.decode("utf-8")
  87. assert init_process.returncode == 0
  88. def test_orphaned_folders(tmp_path, process, disable_extractors_dict):
  89. os.chdir(tmp_path)
  90. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True,
  91. env=disable_extractors_dict)
  92. list_process = subprocess.run(["archivebox", "list", "--json", "--with-headers"], capture_output=True)
  93. with open(tmp_path / "index.json", "wb") as f:
  94. f.write(list_process.stdout)
  95. conn = sqlite3.connect("index.sqlite3")
  96. c = conn.cursor()
  97. c.execute("DELETE from core_snapshot")
  98. conn.commit()
  99. conn.close()
  100. init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
  101. assert "Added 1 orphaned snapshots from deprecated JSON index" in init_process.stdout.decode("utf-8")
  102. assert init_process.returncode == 0
  103. def test_unrecognized_folders(tmp_path, process, disable_extractors_dict):
  104. os.chdir(tmp_path)
  105. subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True,
  106. env=disable_extractors_dict)
  107. (tmp_path / "archive" / "some_random_folder").mkdir()
  108. init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
  109. assert "Skipped adding 1 invalid snapshot data directories" in init_process.stdout.decode("utf-8")
  110. assert init_process.returncode == 0
  111. def test_tags_migration(tmp_path, disable_extractors_dict):
  112. base_sqlite_path = Path(__file__).parent / 'tags_migration'
  113. if os.path.exists(tmp_path):
  114. shutil.rmtree(tmp_path)
  115. shutil.copytree(str(base_sqlite_path), tmp_path)
  116. os.chdir(tmp_path)
  117. conn = sqlite3.connect("index.sqlite3")
  118. conn.row_factory = sqlite3.Row
  119. c = conn.cursor()
  120. c.execute("SELECT id, tags from core_snapshot")
  121. snapshots = c.fetchall()
  122. snapshots_dict = { sn['id']: sn['tags'] for sn in snapshots}
  123. conn.commit()
  124. conn.close()
  125. init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
  126. conn = sqlite3.connect("index.sqlite3")
  127. conn.row_factory = sqlite3.Row
  128. c = conn.cursor()
  129. c.execute("""
  130. SELECT core_snapshot.id, core_tag.name from core_snapshot
  131. JOIN core_snapshot_tags on core_snapshot_tags.snapshot_id=core_snapshot.id
  132. JOIN core_tag on core_tag.id=core_snapshot_tags.tag_id
  133. """)
  134. tags = c.fetchall()
  135. conn.commit()
  136. conn.close()
  137. for tag in tags:
  138. snapshot_id = tag["id"]
  139. tag_name = tag["name"]
  140. # Check each tag migrated is in the previous field
  141. assert tag_name in snapshots_dict[snapshot_id]