test_orchestrator.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. """
  2. Unit tests for the Orchestrator and Worker classes.
  3. Tests cover:
  4. 1. Orchestrator lifecycle (startup, shutdown)
  5. 2. Queue polling and worker spawning
  6. 3. Idle detection and exit logic
  7. 4. Worker registration and management
  8. 5. Process model methods (replacing old pid_utils)
  9. """
  10. import os
  11. import tempfile
  12. import time
  13. from pathlib import Path
  14. from datetime import timedelta
  15. from unittest.mock import patch, MagicMock
  16. import pytest
  17. from django.test import TestCase
  18. from django.utils import timezone
  19. from archivebox.workers.orchestrator import Orchestrator
  class TestOrchestratorUnit(TestCase):
      """Unit tests for the Orchestrator class, with its collaborators mocked out."""

      def test_orchestrator_creation(self):
          """A new Orchestrator keeps exit_on_idle, starts idle_count at 0 and has no pid_file."""
          orch = Orchestrator(exit_on_idle=True)
          self.assertTrue(orch.exit_on_idle)
          self.assertEqual(orch.idle_count, 0)
          self.assertIsNone(orch.pid_file)

      def test_orchestrator_repr(self):
          """repr() mentions the class name and the PID of the current process."""
          text = repr(Orchestrator())
          for expected_fragment in ('Orchestrator', str(os.getpid())):
              self.assertIn(expected_fragment, text)

      def test_has_pending_work(self):
          """has_pending_work reports work only when some queue count is non-zero."""
          orch = Orchestrator()
          table = (
              ({'crawl': 0, 'snapshot': 0}, False),
              ({'crawl': 0, 'snapshot': 5}, True),
              ({'crawl': 10, 'snapshot': 0}, True),
          )
          for queue_sizes, expect_pending in table:
              verdict = self.assertTrue if expect_pending else self.assertFalse
              verdict(orch.has_pending_work(queue_sizes))

      def test_should_exit_not_exit_on_idle(self):
          """With exit_on_idle disabled, should_exit stays False however long we idle."""
          orch = Orchestrator(exit_on_idle=False)
          orch.idle_count = 100
          self.assertFalse(orch.should_exit({'crawl': 0}))

      def test_should_exit_pending_work(self):
          """Queued work keeps the orchestrator alive even after a long idle streak."""
          orch = Orchestrator(exit_on_idle=True)
          orch.idle_count = 100
          self.assertFalse(orch.should_exit({'crawl': 5}))

      def test_should_exit_running_workers(self):
          """Live workers keep the orchestrator alive even when every queue is empty."""
          with patch.object(Orchestrator, 'has_running_workers', return_value=True):
              orch = Orchestrator(exit_on_idle=True)
              orch.idle_count = 100
              self.assertFalse(orch.should_exit({'crawl': 0}))

      def test_should_exit_idle_timeout(self):
          """Reaching IDLE_TIMEOUT with no queued, running or future work triggers exit."""
          no_workers = patch.object(Orchestrator, 'has_running_workers', return_value=False)
          no_future = patch.object(Orchestrator, 'has_future_work', return_value=False)
          with no_workers, no_future:
              orch = Orchestrator(exit_on_idle=True)
              orch.idle_count = orch.IDLE_TIMEOUT
              self.assertTrue(orch.should_exit({'crawl': 0, 'snapshot': 0}))

      def test_should_exit_below_idle_timeout(self):
          """One tick short of IDLE_TIMEOUT the orchestrator keeps running."""
          no_workers = patch.object(Orchestrator, 'has_running_workers', return_value=False)
          no_future = patch.object(Orchestrator, 'has_future_work', return_value=False)
          with no_workers, no_future:
              orch = Orchestrator(exit_on_idle=True)
              orch.idle_count = orch.IDLE_TIMEOUT - 1
              self.assertFalse(orch.should_exit({'crawl': 0}))

      def test_should_spawn_worker_no_queue(self):
          """An empty queue never justifies spawning a worker."""
          orch = Orchestrator()
          worker_cls = MagicMock()  # stands in for a Worker class
          worker_cls.get_running_workers.return_value = []
          self.assertFalse(orch.should_spawn_worker(worker_cls, 0))

      def test_should_spawn_worker_at_limit(self):
          """Once a worker type has MAX_WORKERS_PER_TYPE running, no more are spawned."""
          orch = Orchestrator()
          worker_cls = MagicMock()
          worker_cls.get_running_workers.return_value = [{}] * orch.MAX_WORKERS_PER_TYPE
          self.assertFalse(orch.should_spawn_worker(worker_cls, 10))

      def test_should_spawn_worker_at_total_limit(self):
          """Hitting MAX_TOTAL_WORKERS blocks spawning regardless of queue size."""
          with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
              orch = Orchestrator()
              total_count.return_value = orch.MAX_TOTAL_WORKERS
              worker_cls = MagicMock()
              worker_cls.get_running_workers.return_value = []
              self.assertFalse(orch.should_spawn_worker(worker_cls, 10))

      def test_should_spawn_worker_success(self):
          """Queued work, spare capacity and no running workers means spawn."""
          with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
              orch = Orchestrator()
              total_count.return_value = 0
              worker_cls = MagicMock()
              worker_cls.get_running_workers.return_value = []
              worker_cls.MAX_CONCURRENT_TASKS = 5
              self.assertTrue(orch.should_spawn_worker(worker_cls, 10))

      def test_should_spawn_worker_enough_workers(self):
          """No spawn when running workers can already absorb the whole queue."""
          with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
              orch = Orchestrator()
              total_count.return_value = 2
              worker_cls = MagicMock()
              worker_cls.get_running_workers.return_value = [{}]  # a single running worker
              worker_cls.MAX_CONCURRENT_TASKS = 5  # ...able to take 5 items at once
              # queue of 3 <= 1 worker * 5 concurrent tasks, so capacity suffices
              self.assertFalse(orch.should_spawn_worker(worker_cls, 3))
  class TestOrchestratorWithProcess(TestCase):
      """Test Orchestrator using the Process model (rather than pid files) for tracking."""

      def setUp(self):
          """Reset the module-level machine/process caches before each test."""
          super().setUp()
          import archivebox.machine.models as models
          # NOTE(review): these globals appear to memoize the current Machine/Process;
          # clearing them keeps a record from an earlier test from leaking into this one.
          models._CURRENT_MACHINE = None
          models._CURRENT_PROCESS = None

      def test_is_running_no_orchestrator(self):
          """is_running should return False when no orchestrator process exists."""
          from archivebox.machine.models import Process
          # Clean up any stale processes first
          Process.cleanup_stale_running()
          # Mark any running orchestrators as exited for a clean test state
          Process.objects.filter(
              process_type=Process.TypeChoices.ORCHESTRATOR,
              status=Process.StatusChoices.RUNNING,
          ).update(status=Process.StatusChoices.EXITED)
          self.assertFalse(Orchestrator.is_running())

      def test_is_running_with_orchestrator_process(self):
          """is_running should return True when an orchestrator Process exists."""
          from archivebox.machine.models import Process, Machine
          machine = Machine.current()
          # Create an orchestrator Process record
          proc = Process.objects.create(
              machine=machine,
              process_type=Process.TypeChoices.ORCHESTRATOR,
              status=Process.StatusChoices.RUNNING,
              pid=os.getpid(),  # use our own PID so the OS reports it as alive
              started_at=timezone.now(),
              cmd=['archivebox', 'manage', 'orchestrator'],
          )
          try:
              # Should detect the running orchestrator
              self.assertTrue(Orchestrator.is_running())
          finally:
              # Clean up
              proc.status = Process.StatusChoices.EXITED
              proc.save()

      def test_orchestrator_uses_process_for_is_running(self):
          """Orchestrator.is_running should query Process.get_running_count for orchestrators."""
          from archivebox.machine.models import Process
          # Verify is_running uses the Process model, not pid files
          with patch.object(Process, 'get_running_count') as mock_count:
              mock_count.return_value = 1
              result = Orchestrator.is_running()
          mock_count.assert_called()
          # The count must be scoped to the orchestrator process type. Accept the type
          # whether it was passed positionally or by keyword, on any recorded call.
          orchestrator_type = Process.TypeChoices.ORCHESTRATOR
          self.assertTrue(
              any(
                  orchestrator_type in (*recorded.args, *recorded.kwargs.values())
                  for recorded in mock_count.call_args_list
              ),
              'Process.get_running_count was never called with the ORCHESTRATOR type',
          )
          self.assertTrue(result)

      def test_orchestrator_scoped_worker_count(self):
          """Orchestrator with crawl_id should count only descendant workers."""
          from archivebox.machine.models import Process, Machine
          machine = Machine.current()
          orchestrator = Orchestrator(exit_on_idle=True, crawl_id='test-crawl')
          orchestrator.db_process = Process.objects.create(
              machine=machine,
              process_type=Process.TypeChoices.ORCHESTRATOR,
              status=Process.StatusChoices.RUNNING,
              pid=12345,
              started_at=timezone.now(),
          )
          # Prevent cleanup from marking the fake PIDs below as exited
          # (uses the module-level `time` import).
          orchestrator._last_cleanup_time = time.time()
          # A worker parented to this orchestrator: should be counted.
          Process.objects.create(
              machine=machine,
              process_type=Process.TypeChoices.WORKER,
              worker_type='crawl',
              status=Process.StatusChoices.RUNNING,
              pid=12346,
              parent=orchestrator.db_process,
              started_at=timezone.now(),
          )
          # An unrelated worker with no parent: should be ignored by the scoped count.
          Process.objects.create(
              machine=machine,
              process_type=Process.TypeChoices.WORKER,
              worker_type='crawl',
              status=Process.StatusChoices.RUNNING,
              pid=12347,
              started_at=timezone.now(),
          )
          self.assertEqual(orchestrator.get_total_worker_count(), 1)
  class TestProcessBasedWorkerTracking(TestCase):
      """Exercise the Process-model helpers that took over the old pid_utils duties."""

      def setUp(self):
          """Forget any cached current Machine/Process before every test."""
          import archivebox.machine.models as machine_models
          machine_models._CURRENT_MACHINE = None
          machine_models._CURRENT_PROCESS = None

      def test_process_current_creates_record(self):
          """Process.current() yields a RUNNING record for this interpreter's PID."""
          from archivebox.machine.models import Process
          current = Process.current()
          self.assertIsNotNone(current)
          self.assertEqual(current.pid, os.getpid())
          self.assertEqual(current.status, Process.StatusChoices.RUNNING)
          for attr_name in ('machine', 'started_at'):
              self.assertIsNotNone(getattr(current, attr_name))

      def test_process_current_caches_result(self):
          """Back-to-back Process.current() calls hand back the same record."""
          from archivebox.machine.models import Process
          first, second = Process.current(), Process.current()
          self.assertEqual(first.id, second.id)

      def test_process_get_running_count(self):
          """get_running_count tallies RUNNING processes of the requested type."""
          from archivebox.machine.models import Process, Machine
          host = Machine.current()
          # Three workers with made-up PIDs
          for fake_pid in range(99990, 99993):
              Process.objects.create(
                  machine=host,
                  process_type=Process.TypeChoices.WORKER,
                  status=Process.StatusChoices.RUNNING,
                  pid=fake_pid,
                  started_at=timezone.now(),
              )
          total = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
          self.assertGreaterEqual(total, 3)

      def test_process_get_next_worker_id(self):
          """get_next_worker_id is at least the number of already-running workers."""
          from archivebox.machine.models import Process, Machine
          host = Machine.current()
          # Two workers with made-up PIDs
          for fake_pid in (99980, 99981):
              Process.objects.create(
                  machine=host,
                  process_type=Process.TypeChoices.WORKER,
                  status=Process.StatusChoices.RUNNING,
                  pid=fake_pid,
                  started_at=timezone.now(),
              )
          upcoming_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
          self.assertGreaterEqual(upcoming_id, 2)

      def test_process_cleanup_stale_running(self):
          """cleanup_stale_running flips long-dead RUNNING rows to EXITED."""
          from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
          host = Machine.current()
          # Older than the PID reuse window, so the PID cannot be trusted any more
          long_ago = timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1)
          stale = Process.objects.create(
              machine=host,
              process_type=Process.TypeChoices.WORKER,
              status=Process.StatusChoices.RUNNING,
              pid=999999,  # a PID no live process should hold
              started_at=long_ago,
          )
          n_cleaned = Process.cleanup_stale_running()
          self.assertGreaterEqual(n_cleaned, 1)
          stale.refresh_from_db()
          self.assertEqual(stale.status, Process.StatusChoices.EXITED)

      def test_process_get_running(self):
          """get_running returns a queryset that includes matching RUNNING rows."""
          from archivebox.machine.models import Process, Machine
          host = Machine.current()
          hook_proc = Process.objects.create(
              machine=host,
              process_type=Process.TypeChoices.HOOK,
              status=Process.StatusChoices.RUNNING,
              pid=99970,
              started_at=timezone.now(),
          )
          self.assertIn(hook_proc, Process.get_running(process_type=Process.TypeChoices.HOOK))

      def test_process_type_detection(self):
          """_detect_process_type classifies the process from sys.argv."""
          from archivebox.machine.models import Process
          expectations = [
              (['archivebox', 'manage', 'orchestrator'], Process.TypeChoices.ORCHESTRATOR),
              (['archivebox', 'add', 'http://example.com'], Process.TypeChoices.CLI),
              (['supervisord', '-c', 'config.ini'], Process.TypeChoices.SUPERVISORD),
          ]
          for fake_argv, expected_type in expectations:
              with patch('sys.argv', fake_argv):
                  detected = Process._detect_process_type()
              self.assertEqual(detected, expected_type)
  class TestProcessLifecycle(TestCase):
      """Lifecycle behaviour of individual Process rows: liveness, poll, terminate, tree."""

      def setUp(self):
          """Drop cached Machine/Process objects, then resolve the current Machine."""
          import archivebox.machine.models as machine_models
          machine_models._CURRENT_MACHINE = None
          machine_models._CURRENT_PROCESS = None
          self.machine = machine_models.Machine.current()

      def _create_process(self, **fields):
          """Create a Process row attached to this test's machine."""
          from archivebox.machine.models import Process
          return Process.objects.create(machine=self.machine, **fields)

      def test_process_is_running_property(self):
          """is_running consults the OS: our own PID is alive, a bogus PID is not."""
          from archivebox.machine.models import Process
          running = Process.StatusChoices.RUNNING
          alive = self._create_process(status=running, pid=os.getpid(), started_at=timezone.now())
          self.assertTrue(alive.is_running)
          ghost = self._create_process(status=running, pid=999999, started_at=timezone.now())
          self.assertFalse(ghost.is_running)

      def test_process_poll(self):
          """poll() on a vanished PID returns an exit code and marks the row EXITED."""
          from archivebox.machine.models import Process
          ghost = self._create_process(
              status=Process.StatusChoices.RUNNING,
              pid=999999,  # no such process, so it counts as already exited
              started_at=timezone.now(),
          )
          code = ghost.poll()
          self.assertIsNotNone(code)
          ghost.refresh_from_db()
          self.assertEqual(ghost.status, Process.StatusChoices.EXITED)

      def test_process_terminate_already_dead(self):
          """terminate() on a dead PID reports False yet still records EXITED."""
          from archivebox.machine.models import Process
          ghost = self._create_process(
              status=Process.StatusChoices.RUNNING,
              pid=999999,
              started_at=timezone.now(),
          )
          outcome = ghost.terminate()
          self.assertFalse(outcome)  # nothing was actually killed
          ghost.refresh_from_db()
          self.assertEqual(ghost.status, Process.StatusChoices.EXITED)

      def test_process_tree_traversal(self):
          """parent/children/root/depth navigate a two-level process tree."""
          from archivebox.machine.models import Process
          running = Process.StatusChoices.RUNNING
          root_proc = self._create_process(
              process_type=Process.TypeChoices.CLI,
              status=running,
              pid=1,
              started_at=timezone.now(),
          )
          leaf_proc = self._create_process(
              parent=root_proc,
              process_type=Process.TypeChoices.WORKER,
              status=running,
              pid=2,
              started_at=timezone.now(),
          )
          self.assertEqual(leaf_proc.parent, root_proc)
          self.assertIn(leaf_proc, root_proc.children.all())
          self.assertEqual(leaf_proc.root, root_proc)
          self.assertEqual(leaf_proc.depth, 1)
          self.assertEqual(root_proc.depth, 0)
  if __name__ == '__main__':
      # Propagate pytest's exit status; otherwise running this file directly would
      # always exit 0, even when tests fail.
      raise SystemExit(pytest.main([__file__, '-v']))