sql.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. __package__ = 'archivebox.index'
  2. from io import StringIO
  3. from typing import List, Tuple, Iterator
  4. from .schema import Link
  5. from ..util import enforce_types
  6. from ..config import setup_django, OUTPUT_DIR
  7. ### Main Links Index
  8. @enforce_types
  9. def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
  10. setup_django(out_dir, check_db=True)
  11. from core.models import Snapshot
  12. return (
  13. Link.from_json(page.as_json(*Snapshot.keys))
  14. for page in Snapshot.objects.all()
  15. )
  16. @enforce_types
  17. def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
  18. setup_django(out_dir, check_db=True)
  19. from core.models import Snapshot
  20. from django.db import transaction
  21. all_urls = {link.url: link for link in links}
  22. all_ts = {link.timestamp: link for link in links}
  23. with transaction.atomic():
  24. for snapshot in Snapshot.objects.all():
  25. if snapshot.timestamp in all_ts:
  26. info = {k: v for k, v in all_urls.pop(snapshot.url)._asdict().items() if k in Snapshot.keys}
  27. snapshot.delete()
  28. Snapshot.objects.create(**info)
  29. elif snapshot.url in all_urls:
  30. info = {k: v for k, v in all_urls.pop(snapshot.url)._asdict().items() if k in Snapshot.keys}
  31. snapshot.delete()
  32. Snapshot.objects.create(**info)
  33. else:
  34. snapshot.delete()
  35. for url, link in all_urls.items():
  36. info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
  37. Snapshot.objects.update_or_create(url=url, defaults=info)
  38. @enforce_types
  39. def write_sql_link_details(link: Link, out_dir: str=OUTPUT_DIR) -> None:
  40. setup_django(out_dir, check_db=True)
  41. from core.models import Snapshot
  42. from django.db import transaction
  43. with transaction.atomic():
  44. snap = Snapshot.objects.get(url=link['url'], timestamp=link['timestamp'])
  45. snap.title = link.title
  46. snap.tags = link.tags
  47. snap.save()
  48. @enforce_types
  49. def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
  50. setup_django(out_dir, check_db=False)
  51. from django.core.management import call_command
  52. out = StringIO()
  53. call_command("showmigrations", list=True, stdout=out)
  54. out.seek(0)
  55. migrations = []
  56. for line in out.readlines():
  57. if line.strip() and ']' in line:
  58. status_str, name_str = line.strip().split(']', 1)
  59. is_applied = 'X' in status_str
  60. migration_name = name_str.strip()
  61. migrations.append((is_applied, migration_name))
  62. return migrations
  63. @enforce_types
  64. def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
  65. setup_django(out_dir, check_db=False)
  66. from django.core.management import call_command
  67. null, out = StringIO(), StringIO()
  68. call_command("makemigrations", interactive=False, stdout=null)
  69. call_command("migrate", interactive=False, stdout=out)
  70. out.seek(0)
  71. return [line.strip() for line in out.readlines() if line.strip()]
  72. @enforce_types
  73. def get_admins(out_dir: str=OUTPUT_DIR) -> List[str]:
  74. setup_django(out_dir, check_db=False)
  75. from django.contrib.auth.models import User
  76. return User.objects.filter(is_superuser=True)