sql.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. __package__ = 'archivebox.index'
  2. import re
  3. from io import StringIO
  4. from pathlib import Path
  5. from typing import List, Tuple, Iterator
  6. from django.db.models import QuerySet
  7. from django.db import transaction
  8. from .schema import Link
  9. from ..util import enforce_types, parse_date
  10. from ..config import (
  11. OUTPUT_DIR,
  12. TAG_SEPARATOR_PATTERN,
  13. )
  14. ### Main Links Index
  15. @enforce_types
  16. def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
  17. from core.models import Snapshot
  18. return (
  19. Link.from_json(page.as_json(*Snapshot.keys))
  20. for page in Snapshot.objects.all()
  21. )
  22. @enforce_types
  23. def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
  24. if atomic:
  25. with transaction.atomic():
  26. return snapshots.delete()
  27. return snapshots.delete()
  28. @enforce_types
  29. def write_link_to_sql_index(link: Link, created_by_id: int | None=None):
  30. from core.models import Snapshot, ArchiveResult
  31. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  32. info['created_by_id'] = created_by_id
  33. tag_list = list(dict.fromkeys(
  34. tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
  35. ))
  36. info.pop('tags')
  37. try:
  38. snapshot = Snapshot.objects.get(url=link.url)
  39. info["timestamp"] = snapshot.timestamp
  40. except Snapshot.DoesNotExist:
  41. while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
  42. info["timestamp"] = str(float(info["timestamp"]) + 1.0)
  43. snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
  44. snapshot.save_tags(tag_list)
  45. for extractor, entries in link.history.items():
  46. for entry in entries:
  47. if isinstance(entry, dict):
  48. result, _ = ArchiveResult.objects.get_or_create(
  49. snapshot_id=snapshot.pk,
  50. extractor=extractor,
  51. start_ts=parse_date(entry['start_ts']),
  52. defaults={
  53. 'end_ts': parse_date(entry['end_ts']),
  54. 'cmd': entry['cmd'],
  55. 'output': entry['output'],
  56. 'cmd_version': entry.get('cmd_version') or 'unknown',
  57. 'pwd': entry['pwd'],
  58. 'status': entry['status'],
  59. 'created_by_id': snapshot.created_by_id,
  60. }
  61. )
  62. else:
  63. result, _ = ArchiveResult.objects.update_or_create(
  64. snapshot_id=snapshot.pk,
  65. extractor=extractor,
  66. start_ts=parse_date(entry.start_ts),
  67. defaults={
  68. 'end_ts': parse_date(entry.end_ts),
  69. 'cmd': entry.cmd,
  70. 'output': entry.output,
  71. 'cmd_version': entry.cmd_version or 'unknown',
  72. 'pwd': entry.pwd,
  73. 'status': entry.status,
  74. 'created_by_id': snapshot.created_by_id,
  75. }
  76. )
  77. return snapshot
  78. @enforce_types
  79. def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR, created_by_id: int | None=None) -> None:
  80. for link in links:
  81. # with transaction.atomic():
  82. # write_link_to_sql_index(link)
  83. write_link_to_sql_index(link, created_by_id=created_by_id)
  84. @enforce_types
  85. def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR, created_by_id: int | None=None) -> None:
  86. from core.models import Snapshot
  87. # with transaction.atomic():
  88. # try:
  89. # snap = Snapshot.objects.get(url=link.url)
  90. # except Snapshot.DoesNotExist:
  91. # snap = write_link_to_sql_index(link)
  92. # snap.title = link.title
  93. try:
  94. snap = Snapshot.objects.get(url=link.url)
  95. except Snapshot.DoesNotExist:
  96. snap = write_link_to_sql_index(link, created_by_id=created_by_id)
  97. snap.title = link.title
  98. tag_list = list(
  99. {tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')}
  100. | set(snap.tags.values_list('name', flat=True))
  101. )
  102. snap.save()
  103. snap.save_tags(tag_list)
  104. @enforce_types
  105. def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  106. from django.core.management import call_command
  107. out = StringIO()
  108. call_command("showmigrations", list=True, stdout=out)
  109. out.seek(0)
  110. migrations = []
  111. for line in out.readlines():
  112. if line.strip() and ']' in line:
  113. status_str, name_str = line.strip().split(']', 1)
  114. is_applied = 'X' in status_str
  115. migration_name = name_str.strip()
  116. migrations.append((is_applied, migration_name))
  117. return migrations
  118. @enforce_types
  119. def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
  120. from django.core.management import call_command
  121. null, out = StringIO(), StringIO()
  122. try:
  123. call_command("makemigrations", interactive=False, stdout=null)
  124. except Exception as e:
  125. print('[!] Failed to create some migrations. Please open an issue and copy paste this output for help: {}'.format(e))
  126. print()
  127. call_command("migrate", interactive=False, stdout=out)
  128. out.seek(0)
  129. return [line.strip() for line in out.readlines() if line.strip()]
  130. @enforce_types
  131. def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
  132. from django.contrib.auth.models import User
  133. return User.objects.filter(is_superuser=True)