sql.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from pathlib import Path
  4. from typing import List, Tuple, Iterator
  5. from django.db.models import QuerySet
  6. from django.db import transaction
  7. from .schema import Link
  8. from ..util import enforce_types
  9. from ..config import OUTPUT_DIR
  10. ### Main Links Index
  11. @enforce_types
  12. def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
  13. from core.models import Snapshot
  14. return (
  15. Link.from_json(page.as_json(*Snapshot.keys))
  16. for page in Snapshot.objects.all()
  17. )
  18. @enforce_types
  19. def remove_from_sql_main_index(snapshots: QuerySet, out_dir: Path=OUTPUT_DIR) -> None:
  20. with transaction.atomic():
  21. snapshots.delete()
  22. @enforce_types
  23. def write_link_to_sql_index(link: Link):
  24. from core.models import Snapshot
  25. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  26. tags = info.pop("tags")
  27. if tags is None:
  28. tags = []
  29. try:
  30. info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
  31. except Snapshot.DoesNotExist:
  32. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  33. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  34. snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
  35. snapshot.save_tags(tags)
  36. return snapshot
  37. @enforce_types
  38. def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
  39. with transaction.atomic():
  40. for link in links:
  41. write_link_to_sql_index(link)
  42. @enforce_types
  43. def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
  44. from core.models import Snapshot
  45. with transaction.atomic():
  46. try:
  47. snap = Snapshot.objects.get(url=link.url)
  48. except Snapshot.DoesNotExist:
  49. snap = write_link_to_sql_index(link)
  50. snap.title = link.title
  51. tag_set = (
  52. set(tag.strip() for tag in (link.tags or '').split(','))
  53. )
  54. tag_list = list(tag_set) or []
  55. snap.save()
  56. snap.save_tags(tag_list)
  57. @enforce_types
  58. def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  59. from django.core.management import call_command
  60. out = StringIO()
  61. call_command("showmigrations", list=True, stdout=out)
  62. out.seek(0)
  63. migrations = []
  64. for line in out.readlines():
  65. if line.strip() and ']' in line:
  66. status_str, name_str = line.strip().split(']', 1)
  67. is_applied = 'X' in status_str
  68. migration_name = name_str.strip()
  69. migrations.append((is_applied, migration_name))
  70. return migrations
  71. @enforce_types
  72. def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
  73. from django.core.management import call_command
  74. null, out = StringIO(), StringIO()
  75. call_command("makemigrations", interactive=False, stdout=null)
  76. call_command("migrate", interactive=False, stdout=out)
  77. out.seek(0)
  78. return [line.strip() for line in out.readlines() if line.strip()]
  79. @enforce_types
  80. def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
  81. from django.contrib.auth.models import User
  82. return User.objects.filter(is_superuser=True)