semaphores.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # import uuid
  2. # from functools import wraps
  3. # from django.db import connection, transaction
  4. # from django.utils import timezone
  5. # from huey.exceptions import TaskLockedException
  6. # from archivebox.config import CONSTANTS
  7. # class SqliteSemaphore:
  8. # def __init__(self, db_path, table_name, name, value=1, timeout=None):
  9. # self.db_path = db_path
  10. # self.table_name = table_name
  11. # self.name = name
  12. # self.value = value
  13. # self.timeout = timeout or 86400 # Set a max age for lock holders
  14. # # Ensure the table exists
  15. # with connection.cursor() as cursor:
  16. # cursor.execute(f"""
  17. # CREATE TABLE IF NOT EXISTS {self.table_name} (
  18. # id TEXT PRIMARY KEY,
  19. # name TEXT,
  20. # timestamp DATETIME
  21. # )
  22. # """)
  23. # def acquire(self, name=None):
  24. # name = name or str(uuid.uuid4())
  25. # now = timezone.now()
  26. # expiration = now - timezone.timedelta(seconds=self.timeout)
  27. # with transaction.atomic():
  28. # # Remove expired locks
  29. # with connection.cursor() as cursor:
  30. # cursor.execute(f"""
  31. # DELETE FROM {self.table_name}
  32. # WHERE name = %s AND timestamp < %s
  33. # """, [self.name, expiration])
  34. # # Try to acquire the lock
  35. # with connection.cursor() as cursor:
  36. # cursor.execute(f"""
  37. # INSERT INTO {self.table_name} (id, name, timestamp)
  38. # SELECT %s, %s, %s
  39. # WHERE (
  40. # SELECT COUNT(*) FROM {self.table_name}
  41. # WHERE name = %s
  42. # ) < %s
  43. # """, [name, self.name, now, self.name, self.value])
  44. # if cursor.rowcount > 0:
  45. # return name
  46. # # If we couldn't acquire the lock, remove our attempted entry
  47. # with connection.cursor() as cursor:
  48. # cursor.execute(f"""
  49. # DELETE FROM {self.table_name}
  50. # WHERE id = %s AND name = %s
  51. # """, [name, self.name])
  52. # return None
  53. # def release(self, name):
  54. # with connection.cursor() as cursor:
  55. # cursor.execute(f"""
  56. # DELETE FROM {self.table_name}
  57. # WHERE id = %s AND name = %s
  58. # """, [name, self.name])
  59. # return cursor.rowcount > 0
  60. # LOCKS_DB_PATH = CONSTANTS.DATABASE_FILE.parent / 'locks.sqlite3'
  61. # def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None):
  62. # """
  63. # Lock which can be acquired multiple times (default = 1).
  64. # NOTE: no provisions are made for blocking, waiting, or notifying. This is
  65. # just a lock which can be acquired a configurable number of times.
  66. # Example:
  67. # # Allow up to 3 workers to run this task concurrently. If the task is
  68. # # locked, retry up to 2 times with a delay of 60s.
  69. # @huey.task(retries=2, retry_delay=60)
  70. # @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3)
  71. # def my_task():
  72. # ...
  73. # """
  74. # sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout)
  75. # def decorator(fn):
  76. # @wraps(fn)
  77. # def inner(*args, **kwargs):
  78. # tid = sem.acquire()
  79. # if tid is None:
  80. # raise TaskLockedException(f'unable to acquire lock {lock_name}')
  81. # try:
  82. # return fn(*args, **kwargs)
  83. # finally:
  84. # sem.release(tid)
  85. # return inner
  86. # return decorator