host_utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. from __future__ import annotations
  2. from __future__ import annotations
  3. import re
  4. from urllib.parse import urlparse
  5. from archivebox.config.common import SERVER_CONFIG
# Loose shape check for snapshot-ID subdomain labels: 8 to 36 characters drawn
# from hex digits and hyphens. It does not enforce any particular hyphen layout.
# NOTE(review): presumably meant to accept full or truncated UUIDs — confirm
# the intended ID formats against callers.
_SNAPSHOT_ID_RE = re.compile(r"^[0-9a-fA-F-]{8,36}$")
  7. def split_host_port(host: str) -> tuple[str, str | None]:
  8. parsed = urlparse(f"//{host}")
  9. hostname = (parsed.hostname or host or "").lower()
  10. port = str(parsed.port) if parsed.port else None
  11. return hostname, port
  12. def _normalize_base_url(value: str | None) -> str:
  13. if not value:
  14. return ""
  15. base = value.strip()
  16. if not base:
  17. return ""
  18. if "://" not in base:
  19. base = f"http://{base}"
  20. parsed = urlparse(base)
  21. if not parsed.netloc:
  22. return ""
  23. return f"{parsed.scheme}://{parsed.netloc}"
  24. def normalize_base_url(value: str | None) -> str:
  25. return _normalize_base_url(value)
  26. def get_listen_host() -> str:
  27. return (SERVER_CONFIG.LISTEN_HOST or "").strip()
  28. def get_listen_parts() -> tuple[str, str | None]:
  29. return split_host_port(get_listen_host())
  30. def _build_listen_host(subdomain: str | None) -> str:
  31. host, port = get_listen_parts()
  32. if not host:
  33. return ""
  34. full_host = f"{subdomain}.{host}" if subdomain else host
  35. if port:
  36. return f"{full_host}:{port}"
  37. return full_host
  38. def get_admin_host() -> str:
  39. override = _normalize_base_url(SERVER_CONFIG.ADMIN_BASE_URL)
  40. if override:
  41. return urlparse(override).netloc.lower()
  42. return _build_listen_host("admin")
  43. def get_web_host() -> str:
  44. override = _normalize_base_url(SERVER_CONFIG.ARCHIVE_BASE_URL)
  45. if override:
  46. return urlparse(override).netloc.lower()
  47. return _build_listen_host("web")
  48. def get_api_host() -> str:
  49. return _build_listen_host("api")
  50. def get_public_host() -> str:
  51. return _build_listen_host("public")
  52. def get_snapshot_host(snapshot_id: str) -> str:
  53. return _build_listen_host(snapshot_id)
  54. def get_original_host(domain: str) -> str:
  55. return _build_listen_host(domain)
  56. def is_snapshot_subdomain(subdomain: str) -> bool:
  57. return bool(_SNAPSHOT_ID_RE.match(subdomain or ""))
  58. def get_listen_subdomain(request_host: str) -> str:
  59. req_host, req_port = split_host_port(request_host)
  60. listen_host, listen_port = get_listen_parts()
  61. if not listen_host:
  62. return ""
  63. if listen_port and req_port and listen_port != req_port:
  64. return ""
  65. if req_host == listen_host:
  66. return ""
  67. suffix = f".{listen_host}"
  68. if req_host.endswith(suffix):
  69. return req_host[: -len(suffix)]
  70. return ""
  71. def host_matches(request_host: str, target_host: str) -> bool:
  72. if not request_host or not target_host:
  73. return False
  74. req_host, req_port = split_host_port(request_host)
  75. target_host_only, target_port = split_host_port(target_host)
  76. if req_host != target_host_only:
  77. return False
  78. if target_port and req_port and target_port != req_port:
  79. return False
  80. return True
  81. def _scheme_from_request(request=None) -> str:
  82. if request:
  83. return request.scheme
  84. return "http"
  85. def _build_base_url_for_host(host: str, request=None) -> str:
  86. if not host:
  87. return ""
  88. scheme = _scheme_from_request(request)
  89. return f"{scheme}://{host}"
  90. def get_admin_base_url(request=None) -> str:
  91. override = _normalize_base_url(SERVER_CONFIG.ADMIN_BASE_URL)
  92. if override:
  93. return override
  94. return _build_base_url_for_host(get_admin_host(), request=request)
  95. def get_web_base_url(request=None) -> str:
  96. override = _normalize_base_url(SERVER_CONFIG.ARCHIVE_BASE_URL)
  97. if override:
  98. return override
  99. return _build_base_url_for_host(get_web_host(), request=request)
  100. def get_api_base_url(request=None) -> str:
  101. return _build_base_url_for_host(get_api_host(), request=request)
  102. # Backwards-compat aliases (archive == web)
  103. def get_archive_base_url(request=None) -> str:
  104. return get_web_base_url(request=request)
  105. def get_snapshot_base_url(snapshot_id: str, request=None) -> str:
  106. return _build_base_url_for_host(get_snapshot_host(snapshot_id), request=request)
  107. def get_original_base_url(domain: str, request=None) -> str:
  108. return _build_base_url_for_host(get_original_host(domain), request=request)
  109. def build_admin_url(path: str = "", request=None) -> str:
  110. return _build_url(get_admin_base_url(request), path)
  111. def build_web_url(path: str = "", request=None) -> str:
  112. return _build_url(get_web_base_url(request), path)
  113. def build_api_url(path: str = "", request=None) -> str:
  114. return _build_url(get_api_base_url(request), path)
  115. def build_archive_url(path: str = "", request=None) -> str:
  116. return _build_url(get_archive_base_url(request), path)
  117. def build_snapshot_url(snapshot_id: str, path: str = "", request=None) -> str:
  118. return _build_url(get_snapshot_base_url(snapshot_id, request=request), path)
  119. def build_original_url(domain: str, path: str = "", request=None) -> str:
  120. return _build_url(get_original_base_url(domain, request=request), path)
  121. def _build_url(base_url: str, path: str) -> str:
  122. if not base_url:
  123. if not path:
  124. return ""
  125. return path if path.startswith("/") else f"/{path}"
  126. if not path:
  127. return base_url
  128. return f"{base_url}{path if path.startswith('/') else f'/{path}'}"