sql.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from pathlib import Path
  4. from typing import List, Tuple, Iterator
  5. from django.db.models import QuerySet, Model
  6. from django.db import transaction
  7. from datetime import datetime
  8. from .schema import Link
  9. from ..util import enforce_types
  10. from ..config import OUTPUT_DIR
  11. ### Main Links Index
  @enforce_types
  def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
      """Lazily produce a Link for every Snapshot row in the SQL index.

      Each Snapshot is serialized with ``as_json(*Snapshot.keys)`` and rebuilt
      through ``Link.from_json``. ``out_dir`` is accepted but not used here.
      """
      from core.models import Snapshot

      all_snapshots = Snapshot.objects.all()
      return (
          Link.from_json(snap.as_json(*Snapshot.keys))
          for snap in all_snapshots
      )
  @enforce_types
  def remove_from_sql_main_index(snapshots: QuerySet, out_dir: Path=OUTPUT_DIR) -> None:
      """Delete every row matched by ``snapshots`` inside one atomic transaction.

      ``out_dir`` is accepted but not used here.
      """
      atomic_block = transaction.atomic()
      with atomic_block:
          snapshots.delete()
  @enforce_types
  def write_snapshot_to_index(snapshot: Model) -> Model:
      """Save ``snapshot`` to the SQL index and return it.

      If a Snapshot with the same URL is already stored, its timestamp is
      reused. Otherwise the snapshot's own timestamp is used (falling back to
      the current time when it is empty) and incremented by 1.0 until no stored
      Snapshot has that timestamp.

      Returns:
          The same ``snapshot`` instance, after its timestamp was assigned and
          ``save()`` was called.
      """
      from core.models import Snapshot

      try:
          timestamp = Snapshot.objects.get(url=snapshot.url).timestamp
      except Snapshot.DoesNotExist:
          # New URL: start from the snapshot's own timestamp, or "now" if unset.
          timestamp = snapshot.timestamp or str(datetime.now().timestamp())
          # Bump the value until it no longer collides with a stored Snapshot.
          while Snapshot.objects.filter(timestamp=timestamp).exists():
              timestamp = str(float(timestamp) + 1.0)

      snapshot.timestamp = timestamp
      snapshot.save()
      return snapshot
  @enforce_types
  def write_sql_main_index(snapshots: List[Model], out_dir: Path=OUTPUT_DIR) -> None:
      """Write each of ``snapshots`` to the SQL index within a single transaction.

      ``out_dir`` is accepted but not used here.
      """
      atomic_block = transaction.atomic()
      with atomic_block:
          for entry in snapshots:
              write_snapshot_to_index(entry)
  @enforce_types
  def write_sql_snapshot_details(snapshot: Model, out_dir: Path=OUTPUT_DIR) -> Model:
      """Copy ``snapshot.title`` onto the stored Snapshot for its URL and return it.

      The stored row is looked up by URL; when none exists, the snapshot is
      first written via ``write_snapshot_to_index``. The lookup and save run in
      one atomic transaction. ``out_dir`` is accepted but not used here.

      Returns:
          The Snapshot instance whose title was updated and saved.
      """
      from core.models import Snapshot

      with transaction.atomic():
          try:
              snap = Snapshot.objects.get(url=snapshot.url)
          except Snapshot.DoesNotExist:
              snap = write_snapshot_to_index(snapshot)

          snap.title = snapshot.title

          # TODO: If there are actual tags, this will break
          #tag_set = (
          #    set(tag.strip() for tag in (snapshot.tags.all() or '').split(','))
          #)
          #tag_list = list(tag_set) or []

          snap.save()
          #snap.save_tags(tag_list)
          return snap
  @enforce_types
  def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
      """Return ``(is_applied, name)`` pairs parsed from ``showmigrations`` output.

      The command output is captured in memory. Only lines containing ``]`` are
      kept; the text before the first ``]`` marks the migration as applied when
      it contains ``X``, and the text after it is the migration name.
      ``out_dir`` is accepted but not used here.
      """
      from django.core.management import call_command

      captured = StringIO()
      call_command("showmigrations", list=True, stdout=captured)
      captured.seek(0)

      results = []
      for raw_line in captured:
          entry = raw_line.strip()
          if ']' not in entry:
              # Skips blank lines and any line without a status marker.
              continue
          checkbox, _, label = entry.partition(']')
          results.append(('X' in checkbox, label.strip()))
      return results
  @enforce_types
  def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
      """Run ``makemigrations`` then ``migrate`` and return the migrate output.

      The ``makemigrations`` output is captured and discarded. The ``migrate``
      output is returned as a list of stripped, non-empty lines.
      ``out_dir`` is accepted but not used here.
      """
      from django.core.management import call_command

      discarded = StringIO()
      call_command("makemigrations", interactive=False, stdout=discarded)

      migrate_log = StringIO()
      call_command("migrate", interactive=False, stdout=migrate_log)
      migrate_log.seek(0)

      stripped_lines = (text.strip() for text in migrate_log)
      return [text for text in stripped_lines if text]
  @enforce_types
  def get_admins(out_dir: Path=OUTPUT_DIR) -> QuerySet:
      """Return a QuerySet of all users whose ``is_superuser`` flag is set.

      ``out_dir`` is accepted but not used here.
      """
      # Resolve the active user model instead of hard-coding auth.User, so the
      # lookup keeps working if the project swaps in a custom AUTH_USER_MODEL.
      from django.contrib.auth import get_user_model

      return get_user_model().objects.filter(is_superuser=True)