pid_utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
"""
PID file utilities for tracking worker and orchestrator processes.

PID files are stored in <settings.DATA_DIR>/tmp/workers/ and contain:
- Line 1: PID
- Line 2: Worker type (orchestrator, crawl, snapshot, archiveresult)
- Line 3: Extractor filter (optional, for archiveresult workers; empty if none)
- Line 4: Started-at timestamp (ISO 8601, UTC)

The orchestrator's file is always named ``orchestrator.pid``; every other
worker uses ``{worker_type}_worker_{worker_id}.pid``.
"""
__package__ = 'archivebox.workers'
  10. import os
  11. import signal
  12. from pathlib import Path
  13. from datetime import datetime, timezone
  14. from django.conf import settings
  15. def get_pid_dir() -> Path:
  16. """Get the directory for PID files, creating it if needed."""
  17. pid_dir = Path(settings.DATA_DIR) / 'tmp' / 'workers'
  18. pid_dir.mkdir(parents=True, exist_ok=True)
  19. return pid_dir
  20. def write_pid_file(worker_type: str, worker_id: int = 0, extractor: str | None = None) -> Path:
  21. """
  22. Write a PID file for the current process.
  23. Returns the path to the PID file.
  24. """
  25. pid_dir = get_pid_dir()
  26. if worker_type == 'orchestrator':
  27. pid_file = pid_dir / 'orchestrator.pid'
  28. else:
  29. pid_file = pid_dir / f'{worker_type}_worker_{worker_id}.pid'
  30. content = f"{os.getpid()}\n{worker_type}\n{extractor or ''}\n{datetime.now(timezone.utc).isoformat()}\n"
  31. pid_file.write_text(content)
  32. return pid_file
  33. def read_pid_file(path: Path) -> dict | None:
  34. """
  35. Read and parse a PID file.
  36. Returns dict with pid, worker_type, extractor, started_at or None if invalid.
  37. """
  38. try:
  39. if not path.exists():
  40. return None
  41. lines = path.read_text().strip().split('\n')
  42. if len(lines) < 4:
  43. return None
  44. return {
  45. 'pid': int(lines[0]),
  46. 'worker_type': lines[1],
  47. 'extractor': lines[2] or None,
  48. 'started_at': datetime.fromisoformat(lines[3]),
  49. 'pid_file': path,
  50. }
  51. except (ValueError, IndexError, OSError):
  52. return None
  53. def remove_pid_file(path: Path) -> None:
  54. """Remove a PID file if it exists."""
  55. try:
  56. path.unlink(missing_ok=True)
  57. except OSError:
  58. pass
  59. def is_process_alive(pid: int) -> bool:
  60. """Check if a process with the given PID is still running."""
  61. try:
  62. os.kill(pid, 0) # Signal 0 doesn't kill, just checks
  63. return True
  64. except (OSError, ProcessLookupError):
  65. return False
  66. def get_all_pid_files() -> list[Path]:
  67. """Get all PID files in the workers directory."""
  68. pid_dir = get_pid_dir()
  69. return list(pid_dir.glob('*.pid'))
  70. def get_all_worker_pids(worker_type: str | None = None) -> list[dict]:
  71. """
  72. Get info about all running workers.
  73. Optionally filter by worker_type.
  74. """
  75. workers = []
  76. for pid_file in get_all_pid_files():
  77. info = read_pid_file(pid_file)
  78. if info is None:
  79. continue
  80. # Skip if process is dead
  81. if not is_process_alive(info['pid']):
  82. continue
  83. # Filter by type if specified
  84. if worker_type and info['worker_type'] != worker_type:
  85. continue
  86. workers.append(info)
  87. return workers
  88. def cleanup_stale_pid_files() -> int:
  89. """
  90. Remove PID files for processes that are no longer running.
  91. Returns the number of stale files removed.
  92. """
  93. removed = 0
  94. for pid_file in get_all_pid_files():
  95. info = read_pid_file(pid_file)
  96. if info is None:
  97. # Invalid PID file, remove it
  98. remove_pid_file(pid_file)
  99. removed += 1
  100. continue
  101. if not is_process_alive(info['pid']):
  102. remove_pid_file(pid_file)
  103. removed += 1
  104. return removed
  105. def get_running_worker_count(worker_type: str) -> int:
  106. """Get the count of running workers of a specific type."""
  107. return len(get_all_worker_pids(worker_type))
  108. def get_next_worker_id(worker_type: str) -> int:
  109. """Get the next available worker ID for a given type."""
  110. existing_ids = set()
  111. for pid_file in get_all_pid_files():
  112. # Parse worker ID from filename like "snapshot_worker_3.pid"
  113. name = pid_file.stem
  114. if name.startswith(f'{worker_type}_worker_'):
  115. try:
  116. worker_id = int(name.split('_')[-1])
  117. existing_ids.add(worker_id)
  118. except ValueError:
  119. continue
  120. # Find the lowest unused ID
  121. next_id = 0
  122. while next_id in existing_ids:
  123. next_id += 1
  124. return next_id
  125. def stop_worker(pid: int, graceful: bool = True) -> bool:
  126. """
  127. Stop a worker process.
  128. If graceful=True, sends SIGTERM first, then SIGKILL after timeout.
  129. Returns True if process was stopped.
  130. """
  131. if not is_process_alive(pid):
  132. return True
  133. try:
  134. if graceful:
  135. os.kill(pid, signal.SIGTERM)
  136. # Give it a moment to shut down
  137. import time
  138. for _ in range(10): # Wait up to 1 second
  139. time.sleep(0.1)
  140. if not is_process_alive(pid):
  141. return True
  142. # Force kill if still running
  143. os.kill(pid, signal.SIGKILL)
  144. else:
  145. os.kill(pid, signal.SIGKILL)
  146. return True
  147. except (OSError, ProcessLookupError):
  148. return True # Process already dead