| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- import os
- import sys
- import subprocess
- import textwrap
- from pathlib import Path
- import pytest
# Directory three levels above the one containing this file (after resolving
# symlinks); it is prepended to PYTHONPATH for the child interpreters below.
REPO_ROOT = Path(__file__).resolve().parents[3]
def _merge_pythonpath(env: dict[str, str]) -> dict[str, str]:
    """Prepare *env* for a child interpreter and return it.

    The mapping is modified in place: any ``DATA_DIR`` entry is dropped and
    ``REPO_ROOT`` is put at the front of ``PYTHONPATH`` (ahead of whatever
    non-empty value was already there).
    """
    env.pop("DATA_DIR", None)

    search_path = [str(REPO_ROOT)]
    inherited = env.get("PYTHONPATH", "")
    if inherited:
        # Keep the caller's entries, but after the repository root.
        search_path.append(inherited)

    env["PYTHONPATH"] = os.pathsep.join(search_path)
    return env
def _run_python(script: str, cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess:
    """Execute *script* in a fresh interpreter and return the finished process.

    The script text is piped to ``sys.executable -`` on stdin. The child runs
    in *cwd* with a copy of the current environment adjusted by
    ``_merge_pythonpath``; stdout and stderr are captured as text.
    ``subprocess.run`` raises ``subprocess.TimeoutExpired`` if the child does
    not finish within *timeout* seconds.
    """
    child_env = _merge_pythonpath(dict(os.environ))
    # "-" tells the interpreter to read the program from standard input.
    command = [sys.executable, "-"]
    return subprocess.run(
        command,
        input=script,
        text=True,
        capture_output=True,
        cwd=cwd,
        env=child_env,
        timeout=timeout,
    )
def _build_script(body: str) -> str:
    """Return a complete script: the shared prelude followed by *body*.

    The prelude configures Django, imports the models and host helpers the
    test bodies use, and defines the helpers ``response_body``,
    ``ensure_admin_user``, ``get_snapshot`` and ``get_snapshot_files``.
    Both the prelude and *body* are dedented, so callers may pass an
    indented triple-quoted string.
    """
    # NOTE(review): the "email" default in ensure_admin_user reads
    # "[email protected]", which looks like e-mail-obfuscation residue from
    # an HTML extraction -- confirm the intended address against upstream.
    shared_setup = textwrap.dedent(
        """
        import os
        from pathlib import Path
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "archivebox.core.settings")
        import django
        django.setup()
        from django.test import Client
        from django.contrib.auth import get_user_model
        from archivebox.core.models import Snapshot, ArchiveResult
        from archivebox.config.common import SERVER_CONFIG
        from archivebox.core.host_utils import (
            get_admin_host,
            get_api_host,
            get_web_host,
            get_snapshot_host,
            get_original_host,
            get_listen_subdomain,
            split_host_port,
            host_matches,
            is_snapshot_subdomain,
        )
        def response_body(resp):
            if getattr(resp, "streaming", False):
                return b"".join(resp.streaming_content)
            return resp.content
        def ensure_admin_user():
            User = get_user_model()
            admin, _ = User.objects.get_or_create(
                username="testadmin",
                defaults={"email": "[email protected]", "is_staff": True, "is_superuser": True},
            )
            admin.set_password("testpassword")
            admin.save()
            return admin
        def get_snapshot():
            snapshot = Snapshot.objects.order_by("-created_at").first()
            assert snapshot is not None
            return snapshot
        def get_snapshot_files(snapshot):
            output_rel = None
            for output in snapshot.discover_outputs():
                candidate = output.get("path")
                if not candidate:
                    continue
                if candidate.startswith("responses/"):
                    continue
                if Path(snapshot.output_dir, candidate).is_file():
                    output_rel = candidate
                    break
            if output_rel is None:
                fallback = Path(snapshot.output_dir, "index.jsonl")
                if fallback.exists():
                    output_rel = "index.jsonl"
            assert output_rel is not None
            responses_root = Path(snapshot.output_dir) / "responses" / snapshot.domain
            assert responses_root.exists()
            response_file = None
            response_rel = None
            for candidate in responses_root.rglob("*"):
                if not candidate.is_file():
                    continue
                rel = candidate.relative_to(responses_root)
                if not (Path(snapshot.output_dir) / rel).exists():
                    response_file = candidate
                    response_rel = str(rel)
                    break
            if response_file is None:
                response_file = next(p for p in responses_root.rglob("*") if p.is_file())
                response_rel = str(response_file.relative_to(responses_root))
            response_output_path = Path(snapshot.output_dir) / response_rel
            return output_rel, response_file, response_rel, response_output_path
        """
    )
    test_body = textwrap.dedent(body)
    return "\n".join((shared_setup, test_body))
@pytest.mark.usefixtures("real_archive_with_example")
class TestUrlRouting:
    """Subprocess-driven checks of host/subdomain-based URL routing.

    Every test hands a script body to ``_run``, which prepends the shared
    prelude from ``_build_script``, executes the result in a child
    interpreter, and passes only when the child exits with status 0 and
    prints ``OK``.
    """

    # Working directory handed to the child interpreter. It is not assigned
    # anywhere in this class; presumably the ``real_archive_with_example``
    # fixture sets it -- confirm against the fixture definition.
    data_dir: Path

    def _run(self, body: str, timeout: int = 120) -> None:
        """Run *body* after the shared prelude in a subprocess and assert success.

        Args:
            body: Script source appended to the prelude; it must print ``OK``
                when all of its own assertions have passed.
            timeout: Seconds the child interpreter may run.

        Raises:
            AssertionError: If the child exits non-zero or never prints ``OK``.
                The message carries the exit code plus the captured stdout
                and stderr.
        """
        script = _build_script(body)
        result = _run_python(script, cwd=self.data_dir, timeout=timeout)
        # Include both streams: the failing assertion's traceback is on
        # stderr, while anything the script printed beforehand is on stdout.
        streams = f"--- stdout ---\n{result.stdout}\n--- stderr ---\n{result.stderr}"
        assert result.returncode == 0, (
            f"child script exited with status {result.returncode}\n{streams}"
        )
        assert "OK" in result.stdout, f"child script did not print OK\n{streams}"

    def test_host_utils_and_public_redirect(self) -> None:
        """Check the host helper outputs and the basic cross-host redirects.

        Verifies the web/admin/api/snapshot/original hosts derived from
        ``SERVER_CONFIG.LISTEN_HOST``, the subdomain parsing helpers, and the
        redirects for ``/public.html``, ``/public/`` on the base host, and
        ``/`` on the API host.
        """
        self._run(
            """
            snapshot = get_snapshot()
            snapshot_id = str(snapshot.id)
            domain = snapshot.domain
            web_host = get_web_host()
            admin_host = get_admin_host()
            api_host = get_api_host()
            snapshot_host = get_snapshot_host(snapshot_id)
            original_host = get_original_host(domain)
            base_host = SERVER_CONFIG.LISTEN_HOST
            host_only, port = split_host_port(base_host)
            assert host_only == "archivebox.localhost"
            assert port == "8000"
            assert web_host == "web.archivebox.localhost:8000"
            assert admin_host == "admin.archivebox.localhost:8000"
            assert api_host == "api.archivebox.localhost:8000"
            assert snapshot_host == f"{snapshot_id}.archivebox.localhost:8000"
            assert original_host == f"{domain}.archivebox.localhost:8000"
            assert get_listen_subdomain(web_host) == "web"
            assert get_listen_subdomain(admin_host) == "admin"
            assert get_listen_subdomain(api_host) == "api"
            assert get_listen_subdomain(snapshot_host) == snapshot_id
            assert get_listen_subdomain(original_host) == domain
            assert get_listen_subdomain(base_host) == ""
            assert host_matches(web_host, get_web_host())
            assert is_snapshot_subdomain(snapshot_id)
            client = Client()
            resp = client.get("/public.html", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].endswith("/public/")
            resp = client.get("/public/", HTTP_HOST=base_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].startswith(f"http://{web_host}/public/")
            resp = client.get("/", HTTP_HOST=api_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].startswith("/api/")
            print("OK")
            """
        )

    def test_web_admin_routing(self) -> None:
        """Check ``/add/`` on the web host and ``/admin/login/`` on both hosts.

        ``/admin/login/`` must redirect to the admin host when requested on
        the web host and return 200 on the admin host itself.
        """
        self._run(
            """
            ensure_admin_user()
            client = Client()
            web_host = get_web_host()
            admin_host = get_admin_host()
            resp = client.get("/add/", HTTP_HOST=web_host)
            assert resp.status_code == 200
            resp = client.get("/admin/login/", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert admin_host in resp["Location"]
            resp = client.get("/admin/login/", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            print("OK")
            """
        )

    def test_snapshot_routing_and_hosts(self) -> None:
        """Check snapshot pages and file serving across web/snapshot/original hosts.

        Covers the snapshot page on the web host, the ``/web/<domain>``
        redirect, a 404 for a malformed path, the redirect of output files to
        the snapshot host, and byte-for-byte file responses on the snapshot
        and original-domain hosts.
        """
        self._run(
            """
            snapshot = get_snapshot()
            output_rel, response_file, response_rel, response_output_path = get_snapshot_files(snapshot)
            snapshot_id = str(snapshot.id)
            snapshot_host = get_snapshot_host(snapshot_id)
            original_host = get_original_host(snapshot.domain)
            web_host = get_web_host()
            client = Client()
            snapshot_path = f"/{snapshot.url_path}/"
            resp = client.get(snapshot_path, HTTP_HOST=web_host)
            assert resp.status_code == 200
            resp = client.get(f"/web/{snapshot.domain}", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].endswith(f"/{snapshot.url_path}")
            resp = client.get(f"/{snapshot.url_path}", HTTP_HOST=web_host)
            assert resp.status_code == 200
            date_segment = snapshot.url_path.split("/")[1]
            resp = client.get(f"/web/{date_segment}/{date_segment}/{snapshot_id}/", HTTP_HOST=web_host)
            assert resp.status_code == 404
            resp = client.get(f"/{snapshot.url_path}/{output_rel}", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert snapshot_host in resp["Location"]
            resp = client.get(f"/{output_rel}", HTTP_HOST=snapshot_host)
            assert resp.status_code == 200
            assert response_body(resp) == Path(snapshot.output_dir, output_rel).read_bytes()
            resp = client.get(f"/{response_rel}", HTTP_HOST=snapshot_host)
            assert resp.status_code == 200
            snapshot_body = response_body(resp)
            if response_output_path.exists():
                assert snapshot_body == response_output_path.read_bytes()
            else:
                assert snapshot_body == response_file.read_bytes()
            resp = client.get(f"/{response_rel}", HTTP_HOST=original_host)
            assert resp.status_code == 200
            assert response_body(resp) == response_file.read_bytes()
            print("OK")
            """
        )

    def test_template_and_admin_links(self) -> None:
        """Check that rendered pages contain absolute web-host and snapshot-host links.

        Inspects the public index, the live and static snapshot index pages,
        and the admin change pages for a snapshot and one of its archive
        results.
        """
        self._run(
            """
            ensure_admin_user()
            snapshot = get_snapshot()
            snapshot.write_html_details()
            snapshot_id = str(snapshot.id)
            snapshot_host = get_snapshot_host(snapshot_id)
            admin_host = get_admin_host()
            web_host = get_web_host()
            client = Client()
            resp = client.get("/public/", HTTP_HOST=web_host)
            assert resp.status_code == 200
            public_html = response_body(resp).decode("utf-8", "ignore")
            assert "http://web.archivebox.localhost:8000" in public_html
            resp = client.get(f"/{snapshot.url_path}/index.html", HTTP_HOST=web_host)
            assert resp.status_code == 200
            live_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://{snapshot_host}/" in live_html
            assert "http://web.archivebox.localhost:8000" in live_html
            static_html = Path(snapshot.output_dir, "index.html").read_text(encoding="utf-8", errors="ignore")
            assert f"http://{snapshot_host}/" in static_html
            client.login(username="testadmin", password="testpassword")
            resp = client.get(f"/admin/core/snapshot/{snapshot_id}/change/", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            admin_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://web.archivebox.localhost:8000/{snapshot.archive_path}" in admin_html
            assert f"http://{snapshot_host}/" in admin_html
            result = ArchiveResult.objects.filter(snapshot=snapshot).first()
            assert result is not None
            resp = client.get(f"/admin/core/archiveresult/{result.id}/change/", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            ar_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://{snapshot_host}/" in ar_html
            print("OK")
            """
        )

    def test_api_available_on_admin_and_api_hosts(self) -> None:
        """Check that ``/api/v1/docs`` returns 200 on both the admin and API hosts."""
        self._run(
            """
            client = Client()
            admin_host = get_admin_host()
            api_host = get_api_host()
            resp = client.get("/api/v1/docs", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            resp = client.get("/api/v1/docs", HTTP_HOST=api_host)
            assert resp.status_code == 200
            print("OK")
            """
        )

    def test_api_post_with_token_on_admin_and_api_hosts(self) -> None:
        """Check that a token-authenticated tag-create POST succeeds on both hosts.

        The same JSON payload is sent with the ``X-ArchiveBox-API-Key`` header
        to the admin host and then to the API host; each response must be 200
        and report ``success`` with the requested tag name.
        """
        self._run(
            """
            ensure_admin_user()
            from archivebox.api.auth import get_or_create_api_token
            token = get_or_create_api_token(get_user_model().objects.get(username="testadmin"))
            assert token is not None
            client = Client()
            admin_host = get_admin_host()
            api_host = get_api_host()
            payload = '{"name": "apitest-tag"}'
            headers = {"HTTP_X_ARCHIVEBOX_API_KEY": token.token}
            resp = client.post(
                "/api/v1/core/tags/create/",
                data=payload,
                content_type="application/json",
                HTTP_HOST=admin_host,
                **headers,
            )
            assert resp.status_code == 200
            data = resp.json()
            assert data.get("success") is True
            assert data.get("tag_name") == "apitest-tag"
            resp = client.post(
                "/api/v1/core/tags/create/",
                data=payload,
                content_type="application/json",
                HTTP_HOST=api_host,
                **headers,
            )
            assert resp.status_code == 200
            data = resp.json()
            assert data.get("success") is True
            assert data.get("tag_name") == "apitest-tag"
            print("OK")
            """
        )
|