2
0

sql.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from pathlib import Path
  4. from typing import List, Tuple, Iterator
  5. from django.db.models import QuerySet
  6. from .schema import Link
  7. from ..util import enforce_types
  8. from ..config import OUTPUT_DIR
  9. ### Main Links Index
  @enforce_types
  def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
      """Lazily produce a Link for every Snapshot row in the SQL index.

      Each row is serialized with ``as_json(*Snapshot.keys)`` and rebuilt into
      a Link via ``Link.from_json``. ``out_dir`` is accepted but not read here.
      """
      # Imported inside the function, like every other model import in this
      # module (presumably so Django is configured first -- confirm).
      from core.models import Snapshot

      def snapshot_to_link(snapshot) -> Link:
          return Link.from_json(snapshot.as_json(*Snapshot.keys))

      return (snapshot_to_link(snapshot) for snapshot in Snapshot.objects.all())
  @enforce_types
  def remove_from_sql_main_index(snapshots: QuerySet, out_dir: Path=OUTPUT_DIR) -> None:
      """Delete every row selected by ``snapshots`` inside one DB transaction.

      ``out_dir`` is accepted but not read here.
      """
      from django.db.transaction import atomic

      with atomic():
          snapshots.delete()
  @enforce_types
  def write_link_to_sql_index(link: Link):
      """Create or update the Snapshot row for ``link`` and return it.

      Only the fields of ``link`` whose names appear in ``Snapshot.keys`` are
      written. Tags are removed from those fields and stored separately through
      ``snapshot.save_tags()``.

      If a Snapshot with the same URL already exists, its timestamp is kept.
      Otherwise the link's timestamp is bumped by 1.0 until no existing
      Snapshot uses it.
      """
      from core.models import Snapshot

      info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}

      # Tags go through save_tags(), not through the update defaults. A missing
      # "tags" key is treated as "no tags" instead of raising KeyError.
      raw_tags = info.pop("tags", None)
      if isinstance(raw_tags, str):
          # write_sql_link_details() treats link.tags as a comma-separated
          # string and hands save_tags() a list of stripped names; do the same
          # here so both code paths pass the same shape. Blank names are
          # dropped and duplicates removed, keeping first-seen order.
          stripped = (name.strip() for name in raw_tags.split(','))
          tags = [name for name in dict.fromkeys(stripped) if name]
      else:
          # None (no tags) or an already-split iterable of names.
          tags = list(raw_tags or [])

      try:
          # Re-use the timestamp of an existing row for this URL.
          info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
      except Snapshot.DoesNotExist:
          # New URL: make sure the timestamp does not collide with another row.
          while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
              info["timestamp"] = str(float(info["timestamp"]) + 1.0)

      snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
      snapshot.save_tags(tags)
      return snapshot
  @enforce_types
  def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
      """Write every link in ``links`` to the SQL index in a single transaction.

      ``out_dir`` is accepted but not read here.
      """
      from django.db.transaction import atomic

      with atomic():
          for entry in links:
              write_link_to_sql_index(entry)
  @enforce_types
  def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
      """Update the title and tags of the Snapshot row matching ``link.url``.

      If no Snapshot exists for the URL yet, one is created first via
      ``write_link_to_sql_index``. The lookup, title update and tag update all
      run inside one DB transaction. ``out_dir`` is accepted but not read here.
      """
      from core.models import Snapshot
      from django.db import transaction

      # link.tags is a comma-separated string or None. Strip each name, drop
      # blanks (''.split(',') yields [''], which would otherwise become an
      # empty tag name) and de-duplicate while keeping first-seen order.
      stripped = (tag.strip() for tag in (link.tags or '').split(','))
      tag_list = [name for name in dict.fromkeys(stripped) if name]

      with transaction.atomic():
          try:
              snap = Snapshot.objects.get(url=link.url)
          except Snapshot.DoesNotExist:
              snap = write_link_to_sql_index(link)

          snap.title = link.title
          snap.save()
          snap.save_tags(tag_list)
  @enforce_types
  def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
      """Return ``(is_applied, name)`` for each line of ``showmigrations`` output.

      Only output lines containing ``]`` are parsed. The text before the first
      ``]`` is the status marker (applied when it contains ``X``) and the text
      after it is the migration name. ``out_dir`` is accepted but not read here.
      """
      from django.core.management import call_command

      buffer = StringIO()
      call_command("showmigrations", list=True, stdout=buffer)
      buffer.seek(0)

      parsed = []
      for raw_line in buffer:
          status_part, bracket, name_part = raw_line.strip().partition(']')
          if not bracket:
              # no status marker on this line (blank line or other output)
              continue
          parsed.append(('X' in status_part, name_part.strip()))
      return parsed
  @enforce_types
  def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
      """Run ``makemigrations`` then ``migrate`` and return the migrate output.

      The ``makemigrations`` output is captured and discarded. The ``migrate``
      output is returned as a list of stripped, non-empty lines. ``out_dir`` is
      accepted but not read here.
      """
      from django.core.management import call_command

      discarded = StringIO()
      migrate_log = StringIO()
      call_command("makemigrations", interactive=False, stdout=discarded)
      call_command("migrate", interactive=False, stdout=migrate_log)

      migrate_log.seek(0)
      stripped_lines = (raw_line.strip() for raw_line in migrate_log)
      return [text for text in stripped_lines if text]
  @enforce_types
  def get_admins(out_dir: Path=OUTPUT_DIR) -> QuerySet:
      """Return a queryset of all ``User`` rows with ``is_superuser=True``.

      The result is a Django QuerySet of User objects, not a list of strings,
      so the return annotation says so. ``out_dir`` is accepted but not read
      here.
      """
      from django.contrib.auth.models import User

      return User.objects.filter(is_superuser=True)