sql.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. __package__ = 'archivebox.index'
  2. import re
  3. from io import StringIO
  4. from pathlib import Path
  5. from typing import List, Tuple, Iterator
  6. from django.db.models import QuerySet
  7. from django.db import transaction
  8. from .schema import Link
  9. from ..util import enforce_types, parse_date
  10. from ..config import (
  11. OUTPUT_DIR,
  12. TAG_SEPARATOR_PATTERN,
  13. )
  14. ### Main Links Index
  15. @enforce_types
  16. def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
  17. from core.models import Snapshot
  18. return (
  19. Link.from_json(page.as_json(*Snapshot.keys))
  20. for page in Snapshot.objects.all()
  21. )
  22. @enforce_types
  23. def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
  24. if atomic:
  25. with transaction.atomic():
  26. return snapshots.delete()
  27. return snapshots.delete()
  28. @enforce_types
  29. def write_link_to_sql_index(link: Link):
  30. from core.models import Snapshot, ArchiveResult
  31. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  32. tag_list = list(dict.fromkeys(
  33. tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
  34. ))
  35. info.pop('tags')
  36. try:
  37. snapshot = Snapshot.objects.get(url=link.url)
  38. info["timestamp"] = snapshot.timestamp
  39. except Snapshot.DoesNotExist:
  40. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  41. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  42. snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
  43. snapshot.save_tags(tag_list)
  44. for extractor, entries in link.history.items():
  45. for entry in entries:
  46. if isinstance(entry, dict):
  47. result, _ = ArchiveResult.objects.get_or_create(
  48. snapshot_id=snapshot.pk,
  49. extractor=extractor,
  50. start_ts=parse_date(entry['start_ts']),
  51. defaults={
  52. 'end_ts': parse_date(entry['end_ts']),
  53. 'cmd': entry['cmd'],
  54. 'output': entry['output'],
  55. 'cmd_version': entry.get('cmd_version') or 'unknown',
  56. 'pwd': entry['pwd'],
  57. 'status': entry['status'],
  58. }
  59. )
  60. else:
  61. result, _ = ArchiveResult.objects.update_or_create(
  62. snapshot_id=snapshot.pk,
  63. extractor=extractor,
  64. start_ts=parse_date(entry.start_ts),
  65. defaults={
  66. 'end_ts': parse_date(entry.end_ts),
  67. 'cmd': entry.cmd,
  68. 'output': entry.output,
  69. 'cmd_version': entry.cmd_version or 'unknown',
  70. 'pwd': entry.pwd,
  71. 'status': entry.status,
  72. }
  73. )
  74. return snapshot
  75. @enforce_types
  76. def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
  77. for link in links:
  78. # with transaction.atomic():
  79. # write_link_to_sql_index(link)
  80. write_link_to_sql_index(link)
  81. @enforce_types
  82. def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
  83. from core.models import Snapshot
  84. # with transaction.atomic():
  85. # try:
  86. # snap = Snapshot.objects.get(url=link.url)
  87. # except Snapshot.DoesNotExist:
  88. # snap = write_link_to_sql_index(link)
  89. # snap.title = link.title
  90. try:
  91. snap = Snapshot.objects.get(url=link.url)
  92. except Snapshot.DoesNotExist:
  93. snap = write_link_to_sql_index(link)
  94. snap.title = link.title
  95. tag_list = list(
  96. {tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')}
  97. | set(snap.tags.values_list('name', flat=True))
  98. )
  99. snap.save()
  100. snap.save_tags(tag_list)
  101. @enforce_types
  102. def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  103. from django.core.management import call_command
  104. out = StringIO()
  105. call_command("showmigrations", list=True, stdout=out)
  106. out.seek(0)
  107. migrations = []
  108. for line in out.readlines():
  109. if line.strip() and ']' in line:
  110. status_str, name_str = line.strip().split(']', 1)
  111. is_applied = 'X' in status_str
  112. migration_name = name_str.strip()
  113. migrations.append((is_applied, migration_name))
  114. return migrations
  115. @enforce_types
  116. def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
  117. from django.core.management import call_command
  118. null, out = StringIO(), StringIO()
  119. # call_command("makemigrations", interactive=False, stdout=null)
  120. call_command("migrate", interactive=False, stdout=out)
  121. out.seek(0)
  122. return [line.strip() for line in out.readlines() if line.strip()]
  123. @enforce_types
  124. def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
  125. from django.contrib.auth.models import User
  126. return User.objects.filter(is_superuser=True)