sql.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from pathlib import Path
  4. from typing import List, Tuple, Iterator
  5. from django.db.models import QuerySet
  6. from .schema import Link
  7. from ..util import enforce_types
  8. from ..config import setup_django, OUTPUT_DIR
  9. ### Main Links Index
  10. @enforce_types
  11. def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
  12. setup_django(out_dir, check_db=True)
  13. from core.models import Snapshot
  14. return (
  15. Link.from_json(page.as_json(*Snapshot.keys))
  16. for page in Snapshot.objects.all()
  17. )
  18. @enforce_types
  19. def remove_from_sql_main_index(snapshots: QuerySet, out_dir: Path=OUTPUT_DIR) -> None:
  20. setup_django(out_dir, check_db=True)
  21. from django.db import transaction
  22. with transaction.atomic():
  23. snapshots.delete()
  24. @enforce_types
  25. def write_link_to_sql_index(link: Link):
  26. from core.models import Snapshot
  27. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  28. tags = info.pop("tags")
  29. if tags is None:
  30. tags = []
  31. try:
  32. info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
  33. except Snapshot.DoesNotExist:
  34. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  35. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  36. snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
  37. snapshot.save_tags(tags)
  38. return snapshot
  39. @enforce_types
  40. def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
  41. setup_django(out_dir, check_db=True)
  42. from django.db import transaction
  43. with transaction.atomic():
  44. for link in links:
  45. write_link_to_sql_index(link)
  46. @enforce_types
  47. def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
  48. setup_django(out_dir, check_db=True)
  49. from core.models import Snapshot
  50. from django.db import transaction
  51. with transaction.atomic():
  52. try:
  53. snap = Snapshot.objects.get(url=link.url)
  54. except Snapshot.DoesNotExist:
  55. snap = write_link_to_sql_index(link)
  56. snap.title = link.title
  57. tag_set = (
  58. set(tag.strip() for tag in (link.tags or '').split(','))
  59. )
  60. tag_list = list(tag_set) or []
  61. snap.save()
  62. snap.save_tags(tag_list)
  63. @enforce_types
  64. def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  65. setup_django(out_dir, check_db=False)
  66. from django.core.management import call_command
  67. out = StringIO()
  68. call_command("showmigrations", list=True, stdout=out)
  69. out.seek(0)
  70. migrations = []
  71. for line in out.readlines():
  72. if line.strip() and ']' in line:
  73. status_str, name_str = line.strip().split(']', 1)
  74. is_applied = 'X' in status_str
  75. migration_name = name_str.strip()
  76. migrations.append((is_applied, migration_name))
  77. return migrations
  78. @enforce_types
  79. def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
  80. setup_django(out_dir, check_db=False)
  81. from django.core.management import call_command
  82. null, out = StringIO(), StringIO()
  83. call_command("makemigrations", interactive=False, stdout=null)
  84. call_command("migrate", interactive=False, stdout=out)
  85. out.seek(0)
  86. return [line.strip() for line in out.readlines() if line.strip()]
  87. @enforce_types
  88. def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
  89. setup_django(out_dir, check_db=False)
  90. from django.contrib.auth.models import User
  91. return User.objects.filter(is_superuser=True)