| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- """
- Unit tests for the Orchestrator and Worker classes.
- Tests cover:
- 1. Orchestrator lifecycle (startup, shutdown)
- 2. Queue polling and worker spawning
- 3. Idle detection and exit logic
- 4. Worker registration and management
- 5. Process model methods (replacing old pid_utils)
- """
- import os
- import tempfile
- import time
- from pathlib import Path
- from datetime import timedelta
- from unittest.mock import patch, MagicMock
- import pytest
- from django.test import TestCase
- from django.utils import timezone
- from archivebox.workers.orchestrator import Orchestrator
- class TestOrchestratorUnit(TestCase):
- """Unit tests for Orchestrator class (mocked dependencies)."""
- def test_orchestrator_creation(self):
- """Orchestrator should initialize with correct defaults."""
- orchestrator = Orchestrator(exit_on_idle=True)
- self.assertTrue(orchestrator.exit_on_idle)
- self.assertEqual(orchestrator.idle_count, 0)
- self.assertIsNone(orchestrator.pid_file)
- def test_orchestrator_repr(self):
- """Orchestrator __repr__ should include PID."""
- orchestrator = Orchestrator()
- repr_str = repr(orchestrator)
- self.assertIn('Orchestrator', repr_str)
- self.assertIn(str(os.getpid()), repr_str)
- def test_has_pending_work(self):
- """has_pending_work should check if any queue has items."""
- orchestrator = Orchestrator()
- self.assertFalse(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 0}))
- self.assertTrue(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 5}))
- self.assertTrue(orchestrator.has_pending_work({'crawl': 10, 'snapshot': 0}))
- def test_should_exit_not_exit_on_idle(self):
- """should_exit should return False when exit_on_idle is False."""
- orchestrator = Orchestrator(exit_on_idle=False)
- orchestrator.idle_count = 100
- self.assertFalse(orchestrator.should_exit({'crawl': 0}))
- def test_should_exit_pending_work(self):
- """should_exit should return False when there's pending work."""
- orchestrator = Orchestrator(exit_on_idle=True)
- orchestrator.idle_count = 100
- self.assertFalse(orchestrator.should_exit({'crawl': 5}))
- @patch.object(Orchestrator, 'has_running_workers')
- def test_should_exit_running_workers(self, mock_has_workers):
- """should_exit should return False when workers are running."""
- mock_has_workers.return_value = True
- orchestrator = Orchestrator(exit_on_idle=True)
- orchestrator.idle_count = 100
- self.assertFalse(orchestrator.should_exit({'crawl': 0}))
- @patch.object(Orchestrator, 'has_running_workers')
- @patch.object(Orchestrator, 'has_future_work')
- def test_should_exit_idle_timeout(self, mock_future, mock_workers):
- """should_exit should return True after idle timeout with no work."""
- mock_workers.return_value = False
- mock_future.return_value = False
- orchestrator = Orchestrator(exit_on_idle=True)
- orchestrator.idle_count = orchestrator.IDLE_TIMEOUT
- self.assertTrue(orchestrator.should_exit({'crawl': 0, 'snapshot': 0}))
- @patch.object(Orchestrator, 'has_running_workers')
- @patch.object(Orchestrator, 'has_future_work')
- def test_should_exit_below_idle_timeout(self, mock_future, mock_workers):
- """should_exit should return False below idle timeout."""
- mock_workers.return_value = False
- mock_future.return_value = False
- orchestrator = Orchestrator(exit_on_idle=True)
- orchestrator.idle_count = orchestrator.IDLE_TIMEOUT - 1
- self.assertFalse(orchestrator.should_exit({'crawl': 0}))
- def test_should_spawn_worker_no_queue(self):
- """should_spawn_worker should return False when queue is empty."""
- orchestrator = Orchestrator()
- # Create a mock worker class
- mock_worker = MagicMock()
- mock_worker.get_running_workers.return_value = []
- self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 0))
- def test_should_spawn_worker_at_limit(self):
- """should_spawn_worker should return False when at per-type limit."""
- orchestrator = Orchestrator()
- mock_worker = MagicMock()
- mock_worker.get_running_workers.return_value = [{}] * orchestrator.MAX_WORKERS_PER_TYPE
- self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
- @patch.object(Orchestrator, 'get_total_worker_count')
- def test_should_spawn_worker_at_total_limit(self, mock_total):
- """should_spawn_worker should return False when at total limit."""
- orchestrator = Orchestrator()
- mock_total.return_value = orchestrator.MAX_TOTAL_WORKERS
- mock_worker = MagicMock()
- mock_worker.get_running_workers.return_value = []
- self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
- @patch.object(Orchestrator, 'get_total_worker_count')
- def test_should_spawn_worker_success(self, mock_total):
- """should_spawn_worker should return True when conditions are met."""
- orchestrator = Orchestrator()
- mock_total.return_value = 0
- mock_worker = MagicMock()
- mock_worker.get_running_workers.return_value = []
- mock_worker.MAX_CONCURRENT_TASKS = 5
- self.assertTrue(orchestrator.should_spawn_worker(mock_worker, 10))
- @patch.object(Orchestrator, 'get_total_worker_count')
- def test_should_spawn_worker_enough_workers(self, mock_total):
- """should_spawn_worker should return False when enough workers for queue."""
- orchestrator = Orchestrator()
- mock_total.return_value = 2
- mock_worker = MagicMock()
- mock_worker.get_running_workers.return_value = [{}] # 1 worker running
- mock_worker.MAX_CONCURRENT_TASKS = 5 # Can handle 5 items
- # Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5)
- self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3))
- class TestOrchestratorWithProcess(TestCase):
- """Test Orchestrator using Process model for tracking."""
- def setUp(self):
- """Reset process cache."""
- import archivebox.machine.models as models
- models._CURRENT_MACHINE = None
- models._CURRENT_PROCESS = None
- def test_is_running_no_orchestrator(self):
- """is_running should return False when no orchestrator process exists."""
- from archivebox.machine.models import Process
- # Clean up any stale processes first
- Process.cleanup_stale_running()
- # Mark any running orchestrators as exited for clean test state
- Process.objects.filter(
- process_type=Process.TypeChoices.ORCHESTRATOR,
- status=Process.StatusChoices.RUNNING
- ).update(status=Process.StatusChoices.EXITED)
- self.assertFalse(Orchestrator.is_running())
- def test_is_running_with_orchestrator_process(self):
- """is_running should return True when orchestrator Process exists."""
- from archivebox.machine.models import Process, Machine
- machine = Machine.current()
- # Create an orchestrator Process record
- proc = Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.ORCHESTRATOR,
- status=Process.StatusChoices.RUNNING,
- pid=os.getpid(), # Use current PID so it appears alive
- started_at=timezone.now(),
- cmd=['archivebox', 'manage', 'orchestrator'],
- )
- try:
- # Should detect running orchestrator
- self.assertTrue(Orchestrator.is_running())
- finally:
- # Clean up
- proc.status = Process.StatusChoices.EXITED
- proc.save()
- def test_orchestrator_uses_process_for_is_running(self):
- """Orchestrator.is_running should use Process.get_running_count."""
- from archivebox.machine.models import Process
- # Verify is_running uses Process model, not pid files
- with patch.object(Process, 'get_running_count') as mock_count:
- mock_count.return_value = 1
- result = Orchestrator.is_running()
- # Should have called Process.get_running_count with orchestrator type
- mock_count.assert_called()
- self.assertTrue(result)
- def test_orchestrator_scoped_worker_count(self):
- """Orchestrator with crawl_id should count only descendant workers."""
- import time
- from archivebox.machine.models import Process, Machine
- machine = Machine.current()
- orchestrator = Orchestrator(exit_on_idle=True, crawl_id='test-crawl')
- orchestrator.db_process = Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.ORCHESTRATOR,
- status=Process.StatusChoices.RUNNING,
- pid=12345,
- started_at=timezone.now(),
- )
- # Prevent cleanup from marking fake PIDs as exited
- orchestrator._last_cleanup_time = time.time()
- Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.WORKER,
- worker_type='crawl',
- status=Process.StatusChoices.RUNNING,
- pid=12346,
- parent=orchestrator.db_process,
- started_at=timezone.now(),
- )
- Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.WORKER,
- worker_type='crawl',
- status=Process.StatusChoices.RUNNING,
- pid=12347,
- started_at=timezone.now(),
- )
- self.assertEqual(orchestrator.get_total_worker_count(), 1)
- class TestProcessBasedWorkerTracking(TestCase):
- """Test Process model methods that replace pid_utils functionality."""
- def setUp(self):
- """Reset caches."""
- import archivebox.machine.models as models
- models._CURRENT_MACHINE = None
- models._CURRENT_PROCESS = None
- def test_process_current_creates_record(self):
- """Process.current() should create a Process record for current PID."""
- from archivebox.machine.models import Process
- proc = Process.current()
- self.assertIsNotNone(proc)
- self.assertEqual(proc.pid, os.getpid())
- self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
- self.assertIsNotNone(proc.machine)
- self.assertIsNotNone(proc.started_at)
- def test_process_current_caches_result(self):
- """Process.current() should return cached Process within interval."""
- from archivebox.machine.models import Process
- proc1 = Process.current()
- proc2 = Process.current()
- self.assertEqual(proc1.id, proc2.id)
- def test_process_get_running_count(self):
- """Process.get_running_count should count running processes by type."""
- from archivebox.machine.models import Process, Machine
- machine = Machine.current()
- # Create some worker processes
- for i in range(3):
- Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.WORKER,
- status=Process.StatusChoices.RUNNING,
- pid=99990 + i, # Fake PIDs
- started_at=timezone.now(),
- )
- count = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
- self.assertGreaterEqual(count, 3)
- def test_process_get_next_worker_id(self):
- """Process.get_next_worker_id should return count of running workers."""
- from archivebox.machine.models import Process, Machine
- machine = Machine.current()
- # Create 2 worker processes
- for i in range(2):
- Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.WORKER,
- status=Process.StatusChoices.RUNNING,
- pid=99980 + i,
- started_at=timezone.now(),
- )
- next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
- self.assertGreaterEqual(next_id, 2)
- def test_process_cleanup_stale_running(self):
- """Process.cleanup_stale_running should mark stale processes as exited."""
- from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
- machine = Machine.current()
- # Create a stale process (old started_at, fake PID)
- stale_proc = Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.WORKER,
- status=Process.StatusChoices.RUNNING,
- pid=999999, # Fake PID that doesn't exist
- started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
- )
- cleaned = Process.cleanup_stale_running()
- self.assertGreaterEqual(cleaned, 1)
- stale_proc.refresh_from_db()
- self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED)
- def test_process_get_running(self):
- """Process.get_running should return queryset of running processes."""
- from archivebox.machine.models import Process, Machine
- machine = Machine.current()
- # Create a running process
- proc = Process.objects.create(
- machine=machine,
- process_type=Process.TypeChoices.HOOK,
- status=Process.StatusChoices.RUNNING,
- pid=99970,
- started_at=timezone.now(),
- )
- running = Process.get_running(process_type=Process.TypeChoices.HOOK)
- self.assertIn(proc, running)
- def test_process_type_detection(self):
- """Process._detect_process_type should detect process type from argv."""
- from archivebox.machine.models import Process
- # Test detection logic
- with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
- result = Process._detect_process_type()
- self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
- with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
- result = Process._detect_process_type()
- self.assertEqual(result, Process.TypeChoices.CLI)
- with patch('sys.argv', ['supervisord', '-c', 'config.ini']):
- result = Process._detect_process_type()
- self.assertEqual(result, Process.TypeChoices.SUPERVISORD)
- class TestProcessLifecycle(TestCase):
- """Test Process model lifecycle methods."""
- def setUp(self):
- """Reset caches and create a machine."""
- import archivebox.machine.models as models
- models._CURRENT_MACHINE = None
- models._CURRENT_PROCESS = None
- self.machine = models.Machine.current()
- def test_process_is_running_property(self):
- """Process.is_running should check actual OS process."""
- from archivebox.machine.models import Process
- # Create a process with current PID (should be running)
- proc = Process.objects.create(
- machine=self.machine,
- status=Process.StatusChoices.RUNNING,
- pid=os.getpid(),
- started_at=timezone.now(),
- )
- # Should be running (current process exists)
- self.assertTrue(proc.is_running)
- # Create a process with fake PID
- fake_proc = Process.objects.create(
- machine=self.machine,
- status=Process.StatusChoices.RUNNING,
- pid=999999,
- started_at=timezone.now(),
- )
- # Should not be running (PID doesn't exist)
- self.assertFalse(fake_proc.is_running)
- def test_process_poll(self):
- """Process.poll should check and update exit status."""
- from archivebox.machine.models import Process
- # Create a process with fake PID (already exited)
- proc = Process.objects.create(
- machine=self.machine,
- status=Process.StatusChoices.RUNNING,
- pid=999999,
- started_at=timezone.now(),
- )
- exit_code = proc.poll()
- # Should have detected exit and updated status
- self.assertIsNotNone(exit_code)
- proc.refresh_from_db()
- self.assertEqual(proc.status, Process.StatusChoices.EXITED)
- def test_process_terminate_already_dead(self):
- """Process.terminate should handle already-dead processes."""
- from archivebox.machine.models import Process
- # Create a process with fake PID
- proc = Process.objects.create(
- machine=self.machine,
- status=Process.StatusChoices.RUNNING,
- pid=999999,
- started_at=timezone.now(),
- )
- result = proc.terminate()
- # Should return False (was already dead)
- self.assertFalse(result)
- proc.refresh_from_db()
- self.assertEqual(proc.status, Process.StatusChoices.EXITED)
- def test_process_tree_traversal(self):
- """Process parent/children relationships should work."""
- from archivebox.machine.models import Process
- # Create parent process
- parent = Process.objects.create(
- machine=self.machine,
- process_type=Process.TypeChoices.CLI,
- status=Process.StatusChoices.RUNNING,
- pid=1,
- started_at=timezone.now(),
- )
- # Create child process
- child = Process.objects.create(
- machine=self.machine,
- parent=parent,
- process_type=Process.TypeChoices.WORKER,
- status=Process.StatusChoices.RUNNING,
- pid=2,
- started_at=timezone.now(),
- )
- # Test relationships
- self.assertEqual(child.parent, parent)
- self.assertIn(child, parent.children.all())
- self.assertEqual(child.root, parent)
- self.assertEqual(child.depth, 1)
- self.assertEqual(parent.depth, 0)
- if __name__ == '__main__':
- pytest.main([__file__, '-v'])
|