sql.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from typing import List, Tuple, Iterator
  4. from .schema import Link
  5. from ..util import enforce_types
  6. from ..config import setup_django, OUTPUT_DIR
  7. ### Main Links Index
  8. @enforce_types
  9. def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
  10. setup_django(out_dir, check_db=True)
  11. from core.models import Snapshot
  12. return (
  13. Link.from_json(page.as_json(*Snapshot.keys))
  14. for page in Snapshot.objects.all()
  15. )
  16. @enforce_types
  17. def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
  18. setup_django(out_dir, check_db=True)
  19. from core.models import Snapshot
  20. from django.db import transaction
  21. all_urls = {link.url: link for link in links}
  22. all_ts = {link.timestamp: link for link in links}
  23. with transaction.atomic():
  24. for snapshot in Snapshot.objects.all():
  25. if snapshot.timestamp in all_ts:
  26. info = {k: v for k, v in all_urls.pop(snapshot.url)._asdict().items() if k in Snapshot.keys}
  27. snapshot.delete()
  28. Snapshot.objects.create(**info)
  29. elif snapshot.url in all_urls:
  30. info = {k: v for k, v in all_urls.pop(snapshot.url)._asdict().items() if k in Snapshot.keys}
  31. snapshot.delete()
  32. Snapshot.objects.create(**info)
  33. else:
  34. snapshot.delete()
  35. for url, link in all_urls.items():
  36. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  37. Snapshot.objects.update_or_create(url=url, defaults=info)
  38. @enforce_types
  39. def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  40. setup_django(out_dir, check_db=False)
  41. from django.core.management import call_command
  42. out = StringIO()
  43. call_command("showmigrations", list=True, stdout=out)
  44. out.seek(0)
  45. migrations = []
  46. for line in out.readlines():
  47. if line.strip() and ']' in line:
  48. status_str, name_str = line.strip().split(']', 1)
  49. is_applied = 'X' in status_str
  50. migration_name = name_str.strip()
  51. migrations.append((is_applied, migration_name))
  52. return migrations
  53. @enforce_types
  54. def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
  55. setup_django(out_dir, check_db=False)
  56. from django.core.management import call_command
  57. null, out = StringIO(), StringIO()
  58. call_command("makemigrations", interactive=False, stdout=null)
  59. call_command("migrate", interactive=False, stdout=out)
  60. out.seek(0)
  61. return [line.strip() for line in out.readlines() if line.strip()]
  62. @enforce_types
  63. def get_admins(out_dir: str=OUTPUT_DIR) -> List[str]:
  64. setup_django(out_dir, check_db=False)
  65. from django.contrib.auth.models import User
  66. return User.objects.filter(is_superuser=True)