sql.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. __package__ = 'archivebox.index'
  2. import re
  3. from io import StringIO
  4. from pathlib import Path
  5. from typing import List, Tuple, Iterator
  6. from django.db.models import QuerySet
  7. from django.db import transaction
  8. from .schema import Link
  9. from ..util import enforce_types, parse_date
  10. from ..config import (
  11. OUTPUT_DIR,
  12. TAG_SEPARATOR_PATTERN,
  13. )
  14. ### Main Links Index
  15. @enforce_types
  16. def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
  17. from core.models import Snapshot
  18. return (
  19. Link.from_json(page.as_json(*Snapshot.keys))
  20. for page in Snapshot.objects.all()
  21. )
  22. @enforce_types
  23. def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
  24. if atomic:
  25. with transaction.atomic():
  26. return snapshots.delete()
  27. return snapshots.delete()
  28. @enforce_types
  29. def write_link_to_sql_index(link: Link):
  30. from core.models import Snapshot, ArchiveResult
  31. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  32. tag_list = list(dict.fromkeys(
  33. tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
  34. ))
  35. info.pop('tags')
  36. try:
  37. info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
  38. except Snapshot.DoesNotExist:
  39. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  40. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  41. snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
  42. snapshot.save_tags(tag_list)
  43. for extractor, entries in link.history.items():
  44. for entry in entries:
  45. if isinstance(entry, dict):
  46. result, _ = ArchiveResult.objects.get_or_create(
  47. snapshot_id=snapshot.id,
  48. extractor=extractor,
  49. start_ts=parse_date(entry['start_ts']),
  50. defaults={
  51. 'end_ts': parse_date(entry['end_ts']),
  52. 'cmd': entry['cmd'],
  53. 'output': entry['output'],
  54. 'cmd_version': entry.get('cmd_version') or 'unknown',
  55. 'pwd': entry['pwd'],
  56. 'status': entry['status'],
  57. }
  58. )
  59. else:
  60. result, _ = ArchiveResult.objects.update_or_create(
  61. snapshot_id=snapshot.id,
  62. extractor=extractor,
  63. start_ts=parse_date(entry.start_ts),
  64. defaults={
  65. 'end_ts': parse_date(entry.end_ts),
  66. 'cmd': entry.cmd,
  67. 'output': entry.output,
  68. 'cmd_version': entry.cmd_version or 'unknown',
  69. 'pwd': entry.pwd,
  70. 'status': entry.status,
  71. }
  72. )
  73. return snapshot
  74. @enforce_types
  75. def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
  76. for link in links:
  77. # with transaction.atomic():
  78. # write_link_to_sql_index(link)
  79. write_link_to_sql_index(link)
  80. @enforce_types
  81. def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
  82. from core.models import Snapshot
  83. # with transaction.atomic():
  84. # try:
  85. # snap = Snapshot.objects.get(url=link.url)
  86. # except Snapshot.DoesNotExist:
  87. # snap = write_link_to_sql_index(link)
  88. # snap.title = link.title
  89. try:
  90. snap = Snapshot.objects.get(url=link.url)
  91. except Snapshot.DoesNotExist:
  92. snap = write_link_to_sql_index(link)
  93. snap.title = link.title
  94. tag_list = list(dict.fromkeys(
  95. tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
  96. ))
  97. snap.save()
  98. snap.save_tags(tag_list)
  99. @enforce_types
  100. def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  101. from django.core.management import call_command
  102. out = StringIO()
  103. call_command("showmigrations", list=True, stdout=out)
  104. out.seek(0)
  105. migrations = []
  106. for line in out.readlines():
  107. if line.strip() and ']' in line:
  108. status_str, name_str = line.strip().split(']', 1)
  109. is_applied = 'X' in status_str
  110. migration_name = name_str.strip()
  111. migrations.append((is_applied, migration_name))
  112. return migrations
  113. @enforce_types
  114. def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
  115. from django.core.management import call_command
  116. null, out = StringIO(), StringIO()
  117. call_command("makemigrations", interactive=False, stdout=null)
  118. call_command("migrate", interactive=False, stdout=out)
  119. out.seek(0)
  120. return [line.strip() for line in out.readlines() if line.strip()]
  121. @enforce_types
  122. def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
  123. from django.contrib.auth.models import User
  124. return User.objects.filter(is_superuser=True)