test_urls.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. import os
  2. import sys
  3. import subprocess
  4. import textwrap
  5. from pathlib import Path
  6. import pytest
# Directory three levels above this file's own directory (parents[3] of the
# resolved file path). _merge_pythonpath() places it first on PYTHONPATH.
REPO_ROOT = Path(__file__).resolve().parents[3]
  8. def _merge_pythonpath(env: dict[str, str]) -> dict[str, str]:
  9. env.pop("DATA_DIR", None)
  10. pythonpath = env.get("PYTHONPATH", "")
  11. if pythonpath:
  12. env["PYTHONPATH"] = f"{REPO_ROOT}{os.pathsep}{pythonpath}"
  13. else:
  14. env["PYTHONPATH"] = str(REPO_ROOT)
  15. return env
  16. def _run_python(script: str, cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess:
  17. env = _merge_pythonpath(os.environ.copy())
  18. return subprocess.run(
  19. [sys.executable, "-"],
  20. cwd=cwd,
  21. env=env,
  22. input=script,
  23. capture_output=True,
  24. text=True,
  25. timeout=timeout,
  26. )
  27. def _build_script(body: str) -> str:
  28. prelude = textwrap.dedent(
  29. """
  30. import os
  31. from pathlib import Path
  32. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "archivebox.core.settings")
  33. import django
  34. django.setup()
  35. from django.test import Client
  36. from django.contrib.auth import get_user_model
  37. from archivebox.core.models import Snapshot, ArchiveResult
  38. from archivebox.config.common import SERVER_CONFIG
  39. from archivebox.core.host_utils import (
  40. get_admin_host,
  41. get_api_host,
  42. get_web_host,
  43. get_snapshot_host,
  44. get_original_host,
  45. get_listen_subdomain,
  46. split_host_port,
  47. host_matches,
  48. is_snapshot_subdomain,
  49. )
  50. def response_body(resp):
  51. if getattr(resp, "streaming", False):
  52. return b"".join(resp.streaming_content)
  53. return resp.content
  54. def ensure_admin_user():
  55. User = get_user_model()
  56. admin, _ = User.objects.get_or_create(
  57. username="testadmin",
  58. defaults={"email": "[email protected]", "is_staff": True, "is_superuser": True},
  59. )
  60. admin.set_password("testpassword")
  61. admin.save()
  62. return admin
  63. def get_snapshot():
  64. snapshot = Snapshot.objects.order_by("-created_at").first()
  65. assert snapshot is not None
  66. return snapshot
  67. def get_snapshot_files(snapshot):
  68. output_rel = None
  69. for output in snapshot.discover_outputs():
  70. candidate = output.get("path")
  71. if not candidate:
  72. continue
  73. if candidate.startswith("responses/"):
  74. continue
  75. if Path(snapshot.output_dir, candidate).is_file():
  76. output_rel = candidate
  77. break
  78. if output_rel is None:
  79. fallback = Path(snapshot.output_dir, "index.jsonl")
  80. if fallback.exists():
  81. output_rel = "index.jsonl"
  82. assert output_rel is not None
  83. responses_root = Path(snapshot.output_dir) / "responses" / snapshot.domain
  84. assert responses_root.exists()
  85. response_file = None
  86. response_rel = None
  87. for candidate in responses_root.rglob("*"):
  88. if not candidate.is_file():
  89. continue
  90. rel = candidate.relative_to(responses_root)
  91. if not (Path(snapshot.output_dir) / rel).exists():
  92. response_file = candidate
  93. response_rel = str(rel)
  94. break
  95. if response_file is None:
  96. response_file = next(p for p in responses_root.rglob("*") if p.is_file())
  97. response_rel = str(response_file.relative_to(responses_root))
  98. response_output_path = Path(snapshot.output_dir) / response_rel
  99. return output_rel, response_file, response_rel, response_output_path
  100. """
  101. )
  102. return prelude + "\n" + textwrap.dedent(body)
  103. @pytest.mark.usefixtures("real_archive_with_example")
  104. class TestUrlRouting:
  105. data_dir: Path
  106. def _run(self, body: str, timeout: int = 120) -> None:
  107. script = _build_script(body)
  108. result = _run_python(script, cwd=self.data_dir, timeout=timeout)
  109. assert result.returncode == 0, result.stderr
  110. assert "OK" in result.stdout
  111. def test_host_utils_and_public_redirect(self) -> None:
  112. self._run(
  113. """
  114. snapshot = get_snapshot()
  115. snapshot_id = str(snapshot.id)
  116. domain = snapshot.domain
  117. web_host = get_web_host()
  118. admin_host = get_admin_host()
  119. api_host = get_api_host()
  120. snapshot_host = get_snapshot_host(snapshot_id)
  121. original_host = get_original_host(domain)
  122. base_host = SERVER_CONFIG.LISTEN_HOST
  123. host_only, port = split_host_port(base_host)
  124. assert host_only == "archivebox.localhost"
  125. assert port == "8000"
  126. assert web_host == "web.archivebox.localhost:8000"
  127. assert admin_host == "admin.archivebox.localhost:8000"
  128. assert api_host == "api.archivebox.localhost:8000"
  129. assert snapshot_host == f"{snapshot_id}.archivebox.localhost:8000"
  130. assert original_host == f"{domain}.archivebox.localhost:8000"
  131. assert get_listen_subdomain(web_host) == "web"
  132. assert get_listen_subdomain(admin_host) == "admin"
  133. assert get_listen_subdomain(api_host) == "api"
  134. assert get_listen_subdomain(snapshot_host) == snapshot_id
  135. assert get_listen_subdomain(original_host) == domain
  136. assert get_listen_subdomain(base_host) == ""
  137. assert host_matches(web_host, get_web_host())
  138. assert is_snapshot_subdomain(snapshot_id)
  139. client = Client()
  140. resp = client.get("/public.html", HTTP_HOST=web_host)
  141. assert resp.status_code in (301, 302)
  142. assert resp["Location"].endswith("/public/")
  143. resp = client.get("/public/", HTTP_HOST=base_host)
  144. assert resp.status_code in (301, 302)
  145. assert resp["Location"].startswith(f"http://{web_host}/public/")
  146. resp = client.get("/", HTTP_HOST=api_host)
  147. assert resp.status_code in (301, 302)
  148. assert resp["Location"].startswith("/api/")
  149. print("OK")
  150. """
  151. )
  152. def test_web_admin_routing(self) -> None:
  153. self._run(
  154. """
  155. ensure_admin_user()
  156. client = Client()
  157. web_host = get_web_host()
  158. admin_host = get_admin_host()
  159. resp = client.get("/add/", HTTP_HOST=web_host)
  160. assert resp.status_code == 200
  161. resp = client.get("/admin/login/", HTTP_HOST=web_host)
  162. assert resp.status_code in (301, 302)
  163. assert admin_host in resp["Location"]
  164. resp = client.get("/admin/login/", HTTP_HOST=admin_host)
  165. assert resp.status_code == 200
  166. print("OK")
  167. """
  168. )
  169. def test_snapshot_routing_and_hosts(self) -> None:
  170. self._run(
  171. """
  172. snapshot = get_snapshot()
  173. output_rel, response_file, response_rel, response_output_path = get_snapshot_files(snapshot)
  174. snapshot_id = str(snapshot.id)
  175. snapshot_host = get_snapshot_host(snapshot_id)
  176. original_host = get_original_host(snapshot.domain)
  177. web_host = get_web_host()
  178. client = Client()
  179. snapshot_path = f"/{snapshot.url_path}/"
  180. resp = client.get(snapshot_path, HTTP_HOST=web_host)
  181. assert resp.status_code == 200
  182. resp = client.get(f"/web/{snapshot.domain}", HTTP_HOST=web_host)
  183. assert resp.status_code in (301, 302)
  184. assert resp["Location"].endswith(f"/{snapshot.url_path}")
  185. resp = client.get(f"/{snapshot.url_path}", HTTP_HOST=web_host)
  186. assert resp.status_code == 200
  187. date_segment = snapshot.url_path.split("/")[1]
  188. resp = client.get(f"/web/{date_segment}/{date_segment}/{snapshot_id}/", HTTP_HOST=web_host)
  189. assert resp.status_code == 404
  190. resp = client.get(f"/{snapshot.url_path}/{output_rel}", HTTP_HOST=web_host)
  191. assert resp.status_code in (301, 302)
  192. assert snapshot_host in resp["Location"]
  193. resp = client.get(f"/{output_rel}", HTTP_HOST=snapshot_host)
  194. assert resp.status_code == 200
  195. assert response_body(resp) == Path(snapshot.output_dir, output_rel).read_bytes()
  196. resp = client.get(f"/{response_rel}", HTTP_HOST=snapshot_host)
  197. assert resp.status_code == 200
  198. snapshot_body = response_body(resp)
  199. if response_output_path.exists():
  200. assert snapshot_body == response_output_path.read_bytes()
  201. else:
  202. assert snapshot_body == response_file.read_bytes()
  203. resp = client.get(f"/{response_rel}", HTTP_HOST=original_host)
  204. assert resp.status_code == 200
  205. assert response_body(resp) == response_file.read_bytes()
  206. print("OK")
  207. """
  208. )
  209. def test_template_and_admin_links(self) -> None:
  210. self._run(
  211. """
  212. ensure_admin_user()
  213. snapshot = get_snapshot()
  214. snapshot.write_html_details()
  215. snapshot_id = str(snapshot.id)
  216. snapshot_host = get_snapshot_host(snapshot_id)
  217. admin_host = get_admin_host()
  218. web_host = get_web_host()
  219. client = Client()
  220. resp = client.get("/public/", HTTP_HOST=web_host)
  221. assert resp.status_code == 200
  222. public_html = response_body(resp).decode("utf-8", "ignore")
  223. assert "http://web.archivebox.localhost:8000" in public_html
  224. resp = client.get(f"/{snapshot.url_path}/index.html", HTTP_HOST=web_host)
  225. assert resp.status_code == 200
  226. live_html = response_body(resp).decode("utf-8", "ignore")
  227. assert f"http://{snapshot_host}/" in live_html
  228. assert "http://web.archivebox.localhost:8000" in live_html
  229. static_html = Path(snapshot.output_dir, "index.html").read_text(encoding="utf-8", errors="ignore")
  230. assert f"http://{snapshot_host}/" in static_html
  231. client.login(username="testadmin", password="testpassword")
  232. resp = client.get(f"/admin/core/snapshot/{snapshot_id}/change/", HTTP_HOST=admin_host)
  233. assert resp.status_code == 200
  234. admin_html = response_body(resp).decode("utf-8", "ignore")
  235. assert f"http://web.archivebox.localhost:8000/{snapshot.archive_path}" in admin_html
  236. assert f"http://{snapshot_host}/" in admin_html
  237. result = ArchiveResult.objects.filter(snapshot=snapshot).first()
  238. assert result is not None
  239. resp = client.get(f"/admin/core/archiveresult/{result.id}/change/", HTTP_HOST=admin_host)
  240. assert resp.status_code == 200
  241. ar_html = response_body(resp).decode("utf-8", "ignore")
  242. assert f"http://{snapshot_host}/" in ar_html
  243. print("OK")
  244. """
  245. )
  246. def test_api_available_on_admin_and_api_hosts(self) -> None:
  247. self._run(
  248. """
  249. client = Client()
  250. admin_host = get_admin_host()
  251. api_host = get_api_host()
  252. resp = client.get("/api/v1/docs", HTTP_HOST=admin_host)
  253. assert resp.status_code == 200
  254. resp = client.get("/api/v1/docs", HTTP_HOST=api_host)
  255. assert resp.status_code == 200
  256. print("OK")
  257. """
  258. )
  259. def test_api_post_with_token_on_admin_and_api_hosts(self) -> None:
  260. self._run(
  261. """
  262. ensure_admin_user()
  263. from archivebox.api.auth import get_or_create_api_token
  264. token = get_or_create_api_token(get_user_model().objects.get(username="testadmin"))
  265. assert token is not None
  266. client = Client()
  267. admin_host = get_admin_host()
  268. api_host = get_api_host()
  269. payload = '{"name": "apitest-tag"}'
  270. headers = {"HTTP_X_ARCHIVEBOX_API_KEY": token.token}
  271. resp = client.post(
  272. "/api/v1/core/tags/create/",
  273. data=payload,
  274. content_type="application/json",
  275. HTTP_HOST=admin_host,
  276. **headers,
  277. )
  278. assert resp.status_code == 200
  279. data = resp.json()
  280. assert data.get("success") is True
  281. assert data.get("tag_name") == "apitest-tag"
  282. resp = client.post(
  283. "/api/v1/core/tags/create/",
  284. data=payload,
  285. content_type="application/json",
  286. HTTP_HOST=api_host,
  287. **headers,
  288. )
  289. assert resp.status_code == 200
  290. data = resp.json()
  291. assert data.get("success") is True
  292. assert data.get("tag_name") == "apitest-tag"
  293. print("OK")
  294. """
  295. )