# archivebox/workers/tasks.py

__package__ = 'archivebox.workers'

from functools import wraps

# from django.utils import timezone

from django_huey import db_task, task
from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo

from .supervisord_util import get_or_create_supervisord_process
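
# Background tasks for ArchiveBox, run on the "commands" huey queue. Each task
# is registered with context=True, so huey passes the running Task instance in
# via the `task` kwarg; huey_monitor's TaskModel and ProcessInfo are used to
# link sub-tasks to their parent task and to report progress in the admin UI.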

# @db_task(queue="commands", context=True, schedule=1)
# def scheduler_tick():
#     print('SCHEDULER TICK', timezone.now().isoformat())
#     # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
#     # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
#     scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
#     scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
#     for scheduled_crawl in scheduled_crawls_due:
#         try:
#             abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
#         except Exception as e:
#             abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
#     # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)

def db_task_with_parent(func):
    """Decorator for a db_task that links the running sub-task to its parent TaskModel."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        task = kwargs.get('task')
        parent_task_id = kwargs.get('parent_task_id')
        if task and parent_task_id:
            TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
        return func(*args, **kwargs)
    return wrapper
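
# A minimal usage sketch (illustrative, not part of the original module):
# db_task_with_parent must sit *inside* the huey decorator so that the wrapper
# sees the `task` kwarg that huey injects when context=True:
#
#   @db_task(queue="commands", context=True)
#   @db_task_with_parent
#   def bg_example(args, task=None, parent_task_id=None):
#       ...
#
# The tasks below inline the same set_parent_task() call instead of using
# this decorator.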

@db_task(queue="commands", context=True)
def bg_add(add_kwargs, task=None, parent_task_id=None):
    get_or_create_supervisord_process(daemonize=False)

    from ..main import add

    if task and parent_task_id:
        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)

    assert add_kwargs and add_kwargs.get("urls")
    # "urls" is a newline-separated string, so counting "://" occurrences
    # gives a rough total for the progress bar
    rough_url_count = add_kwargs["urls"].count("://")

    process_info = ProcessInfo(task, desc="add", parent_task_id=parent_task_id, total=rough_url_count)
    result = add(**add_kwargs)
    process_info.update(n=rough_url_count)
    return result
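
# Illustrative call (hypothetical values, not from the original module):
# calling a huey task enqueues it and returns a Result handle immediately.
#
#   result = bg_add({'urls': 'https://example.com\nhttps://example.org', 'depth': 0})
#   result.get(blocking=True)  # optionally block until the add() completes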

@task(queue="commands", context=True)
def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
    get_or_create_supervisord_process(daemonize=False)

    from ..extractors import archive_links

    if task and parent_task_id:
        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)

    assert args and args[0]
    kwargs = kwargs or {}

    rough_count = len(args[0])

    process_info = ProcessInfo(task, desc="archive_links", parent_task_id=parent_task_id, total=rough_count)
    result = archive_links(*args, **kwargs)
    process_info.update(n=rough_count)
    return result
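
# Illustrative call (hypothetical, assuming `links` is a list of Link objects):
# positional arguments for archive_links() travel packed in the `args` tuple,
# which is why the task asserts that args[0] (the list of links) is non-empty.
#
#   bg_archive_links((links,), kwargs={'overwrite': False}, parent_task_id=parent.id)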

@task(queue="commands", context=True)
def bg_archive_link(args, kwargs=None, task=None, parent_task_id=None):
    get_or_create_supervisord_process(daemonize=False)

    from ..extractors import archive_link

    if task and parent_task_id:
        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)

    assert args and args[0]
    kwargs = kwargs or {}

    # archive_link() processes a single Link, so the progress total is always 1
    rough_count = 1

    process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=rough_count)
    result = archive_link(*args, **kwargs)
    process_info.update(n=rough_count)
    return result

@task(queue="commands", context=True)
def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
    # get_or_create_supervisord_process(daemonize=False)

    from ..extractors import archive_link

    if task and parent_task_id:
        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)

    process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=1)

    link = snapshot.as_link_with_details()
    result = archive_link(link, overwrite=overwrite, methods=methods)
    process_info.update(n=1)
    return result
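
# Illustrative usage from e.g. a Django admin action (hypothetical):
#
#   for snapshot in queryset:
#       bg_archive_snapshot(snapshot, overwrite=True)
#
# The Snapshot is converted to a Link via snapshot.as_link_with_details()
# before being handed to archive_link(), so each queued snapshot re-runs its
# extractors in the background.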