| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- # import uuid
- # from functools import wraps
- # from django.db import connection, transaction
- # from django.utils import timezone
- # from huey.exceptions import TaskLockedException
- # from archivebox.config import CONSTANTS
- # class SqliteSemaphore:
- # def __init__(self, db_path, table_name, name, value=1, timeout=None):
- # self.db_path = db_path
- # self.table_name = table_name
- # self.name = name
- # self.value = value
- # self.timeout = timeout or 86400 # Set a max age for lock holders
- # # Ensure the table exists
- # with connection.cursor() as cursor:
- # cursor.execute(f"""
- # CREATE TABLE IF NOT EXISTS {self.table_name} (
- # id TEXT PRIMARY KEY,
- # name TEXT,
- # timestamp DATETIME
- # )
- # """)
- # def acquire(self, name=None):
- # name = name or str(uuid.uuid4())
- # now = timezone.now()
- # expiration = now - timezone.timedelta(seconds=self.timeout)
- # with transaction.atomic():
- # # Remove expired locks
- # with connection.cursor() as cursor:
- # cursor.execute(f"""
- # DELETE FROM {self.table_name}
- # WHERE name = %s AND timestamp < %s
- # """, [self.name, expiration])
- # # Try to acquire the lock
- # with connection.cursor() as cursor:
- # cursor.execute(f"""
- # INSERT INTO {self.table_name} (id, name, timestamp)
- # SELECT %s, %s, %s
- # WHERE (
- # SELECT COUNT(*) FROM {self.table_name}
- # WHERE name = %s
- # ) < %s
- # """, [name, self.name, now, self.name, self.value])
- # if cursor.rowcount > 0:
- # return name
- # # If we couldn't acquire the lock, remove our attempted entry
- # with connection.cursor() as cursor:
- # cursor.execute(f"""
- # DELETE FROM {self.table_name}
- # WHERE id = %s AND name = %s
- # """, [name, self.name])
- # return None
- # def release(self, name):
- # with connection.cursor() as cursor:
- # cursor.execute(f"""
- # DELETE FROM {self.table_name}
- # WHERE id = %s AND name = %s
- # """, [name, self.name])
- # return cursor.rowcount > 0
- # LOCKS_DB_PATH = CONSTANTS.DATABASE_FILE.parent / 'locks.sqlite3'
- # def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None):
- # """
- # Lock which can be acquired multiple times (default = 1).
- # NOTE: no provisions are made for blocking, waiting, or notifying. This is
- # just a lock which can be acquired a configurable number of times.
- # Example:
- # # Allow up to 3 workers to run this task concurrently. If the task is
- # # locked, retry up to 2 times with a delay of 60s.
- # @huey.task(retries=2, retry_delay=60)
- # @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3)
- # def my_task():
- # ...
- # """
- # sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout)
- # def decorator(fn):
- # @wraps(fn)
- # def inner(*args, **kwargs):
- # tid = sem.acquire()
- # if tid is None:
- # raise TaskLockedException(f'unable to acquire lock {lock_name}')
- # try:
- # return fn(*args, **kwargs)
- # finally:
- # sem.release(tid)
- # return inner
- # return decorator
|