sql.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from typing import List, Tuple, Iterator
  4. from django.db.models import QuerySet
  5. from .schema import Link
  6. from ..util import enforce_types
  7. from ..config import setup_django, OUTPUT_DIR
  8. ### Main Links Index
  9. @enforce_types
  10. def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
  11. setup_django(out_dir, check_db=True)
  12. from core.models import Snapshot
  13. return (
  14. Link.from_json(page.as_json(*Snapshot.keys))
  15. for page in Snapshot.objects.all()
  16. )
  17. @enforce_types
  18. def remove_from_sql_main_index(snapshots: QuerySet, out_dir: str=OUTPUT_DIR) -> None:
  19. setup_django(out_dir, check_db=True)
  20. from core.models import Snapshot
  21. from django.db import transaction
  22. with transaction.atomic():
  23. snapshots.delete()
  24. @enforce_types
  25. def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
  26. setup_django(out_dir, check_db=True)
  27. from core.models import Snapshot
  28. from django.db import transaction
  29. with transaction.atomic():
  30. for link in links:
  31. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  32. try:
  33. info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
  34. except Snapshot.DoesNotExist:
  35. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  36. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  37. Snapshot.objects.update_or_create(url=link.url, defaults=info)
  38. @enforce_types
  39. def write_sql_link_details(link: Link, out_dir: str=OUTPUT_DIR) -> None:
  40. setup_django(out_dir, check_db=True)
  41. from core.models import Snapshot
  42. from django.db import transaction
  43. with transaction.atomic():
  44. snap = Snapshot.objects.get(url=link.url)
  45. snap.title = link.title
  46. snap.tags = link.tags
  47. snap.save()
  48. @enforce_types
  49. def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  50. setup_django(out_dir, check_db=False)
  51. from django.core.management import call_command
  52. out = StringIO()
  53. call_command("showmigrations", list=True, stdout=out)
  54. out.seek(0)
  55. migrations = []
  56. for line in out.readlines():
  57. if line.strip() and ']' in line:
  58. status_str, name_str = line.strip().split(']', 1)
  59. is_applied = 'X' in status_str
  60. migration_name = name_str.strip()
  61. migrations.append((is_applied, migration_name))
  62. return migrations
  63. @enforce_types
  64. def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
  65. setup_django(out_dir, check_db=False)
  66. from django.core.management import call_command
  67. null, out = StringIO(), StringIO()
  68. call_command("makemigrations", interactive=False, stdout=null)
  69. call_command("migrate", interactive=False, stdout=out)
  70. out.seek(0)
  71. return [line.strip() for line in out.readlines() if line.strip()]
  72. @enforce_types
  73. def get_admins(out_dir: str=OUTPUT_DIR) -> List[str]:
  74. setup_django(out_dir, check_db=False)
  75. from django.contrib.auth.models import User
  76. return User.objects.filter(is_superuser=True)