sql.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. __package__ = 'archivebox.legacy.storage'
  2. from io import StringIO
  3. from typing import List, Tuple, Iterator
  4. from ..schema import Link
  5. from ..util import enforce_types
  6. from ..config import setup_django, OUTPUT_DIR
  7. ### Main Links Index
  8. @enforce_types
  9. def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
  10. setup_django(out_dir, check_db=True)
  11. from core.models import Page
  12. return (
  13. Link.from_json(page.as_json(*Page.keys))
  14. for page in Page.objects.all()
  15. )
  16. @enforce_types
  17. def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
  18. setup_django(out_dir, check_db=True)
  19. from core.models import Page
  20. for link in links:
  21. info = {k: v for k, v in link._asdict().items() if k in Page.keys}
  22. Page.objects.update_or_create(url=link.url, defaults=info)
  23. @enforce_types
  24. def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  25. setup_django(out_dir, check_db=False)
  26. from django.core.management import call_command
  27. out = StringIO()
  28. call_command("showmigrations", list=True, stdout=out)
  29. out.seek(0)
  30. migrations = []
  31. for line in out.readlines():
  32. if line.strip() and ']' in line:
  33. status_str, name_str = line.strip().split(']', 1)
  34. is_applied = 'X' in status_str
  35. migration_name = name_str.strip()
  36. migrations.append((is_applied, migration_name))
  37. return migrations
  38. @enforce_types
  39. def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
  40. setup_django(out_dir, check_db=False)
  41. from django.core.management import call_command
  42. null, out = StringIO(), StringIO()
  43. call_command("makemigrations", interactive=False, stdout=null)
  44. call_command("migrate", interactive=False, stdout=out)
  45. out.seek(0)
  46. return [line.strip() for line in out.readlines() if line.strip()]