| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770 |
- __package__ = 'archivebox.machine'
- import os
- import sys
- import socket
- from pathlib import Path
- from archivebox.uuid_compat import uuid7
- from datetime import timedelta, datetime
- from statemachine import State, registry
- from django.db import models
- from django.utils import timezone
- from django.utils.functional import cached_property
- from archivebox.base_models.models import ModelWithHealthStats
- from archivebox.workers.models import BaseStateMachine
- from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
- try:
- import psutil
- PSUTIL_AVAILABLE = True
- except ImportError:
- PSUTIL_AVAILABLE = False
- _CURRENT_MACHINE = None
- _CURRENT_INTERFACE = None
- _CURRENT_BINARIES = {}
- _CURRENT_PROCESS = None
- MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60
- NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60
- BINARY_RECHECK_INTERVAL = 1 * 30 * 60
- PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds
- PID_REUSE_WINDOW = timedelta(hours=24) # Max age for considering a PID match valid
- START_TIME_TOLERANCE = 5.0 # Seconds tolerance for start time matching
class MachineManager(models.Manager):
    """Manager exposing the singleton-style accessor for Machine rows."""

    def current(self) -> 'Machine':
        """Return the Machine record describing the host we are running on."""
        return Machine.current()
class Machine(ModelWithHealthStats):
    """
    A physical or virtual host that ArchiveBox runs on.

    One row per unique hardware GUID. ``Machine.current()`` lazily creates or
    refreshes the row for the executing host and caches it in a module-level
    global for MACHINE_RECHECK_INTERVAL seconds.
    """

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # Hardware identity — guid is the stable unique key used by current()
    guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
    hostname = models.CharField(max_length=63, default=None, null=False)
    hw_in_docker = models.BooleanField(default=False, null=False)
    hw_in_vm = models.BooleanField(default=False, null=False)
    hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
    hw_product = models.CharField(max_length=63, default=None, null=False)
    hw_uuid = models.CharField(max_length=255, default=None, null=False)

    # OS details captured via .detect.get_os_info()
    os_arch = models.CharField(max_length=15, default=None, null=False)
    os_family = models.CharField(max_length=15, default=None, null=False)
    os_platform = models.CharField(max_length=63, default=None, null=False)
    os_release = models.CharField(max_length=63, default=None, null=False)
    os_kernel = models.CharField(max_length=255, default=None, null=False)

    stats = models.JSONField(default=dict, null=True, blank=True)
    config = models.JSONField(default=dict, null=True, blank=True,
                              help_text="Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)")

    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: MachineManager = MachineManager()

    networkinterface_set: models.Manager['NetworkInterface']

    class Meta:
        app_label = 'machine'

    @classmethod
    def current(cls) -> 'Machine':
        """Get or create (and cache) the Machine record for the current host."""
        global _CURRENT_MACHINE
        if _CURRENT_MACHINE:
            # Cached row is considered fresh for MACHINE_RECHECK_INTERVAL seconds
            if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
                return _CURRENT_MACHINE
            _CURRENT_MACHINE = None

        _CURRENT_MACHINE, _ = cls.objects.update_or_create(
            guid=get_host_guid(),
            defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
        )
        return _CURRENT_MACHINE

    @staticmethod
    def from_jsonl(record: dict, overrides: 'dict | None' = None):
        """
        Update Machine config from a JSONL record.

        Args:
            record: JSONL record with '_method': 'update', 'key': '...', 'value': '...'
            overrides: Not used

        Returns:
            Machine instance or None
        """
        method = record.get('_method')
        if method == 'update':
            key = record.get('key')
            value = record.get('value')
            # BUGFIX: accept falsy-but-present values (False, 0, '') — only skip
            # the update when the value is actually missing from the record.
            if key and value is not None:
                machine = Machine.current()
                if not machine.config:
                    machine.config = {}
                machine.config[key] = value
                machine.save(update_fields=['config'])
                return machine
        return None
class NetworkInterfaceManager(models.Manager):
    """Manager exposing the singleton-style accessor for NetworkInterface rows."""

    def current(self) -> 'NetworkInterface':
        """Return the NetworkInterface record for the host's active network."""
        return NetworkInterface.current()
class NetworkInterface(ModelWithHealthStats):
    """
    A snapshot of one network identity observed on a Machine.

    Uniqueness is per (machine, ip_public, ip_local, mac_address, dns_server),
    so a host that changes networks accumulates one row per distinct network.
    ``current()`` caches its result for NETWORK_INTERFACE_RECHECK_INTERVAL seconds.
    """

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)

    # Identity fields (part of the unique_together key)
    mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
    ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
    ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
    dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)

    # Descriptive metadata refreshed on every update_or_create
    hostname = models.CharField(max_length=63, default=None, null=False)
    iface = models.CharField(max_length=15, default=None, null=False)
    isp = models.CharField(max_length=63, default=None, null=False)
    city = models.CharField(max_length=63, default=None, null=False)
    region = models.CharField(max_length=63, default=None, null=False)
    country = models.CharField(max_length=63, default=None, null=False)

    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: NetworkInterfaceManager = NetworkInterfaceManager()

    class Meta:
        app_label = 'machine'
        unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)

    @classmethod
    def current(cls) -> 'NetworkInterface':
        """Get or create (and cache) the NetworkInterface for the current network."""
        global _CURRENT_INTERFACE

        cached = _CURRENT_INTERFACE
        if cached:
            expires_at = cached.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
            if timezone.now() < expires_at:
                return cached
            _CURRENT_INTERFACE = None

        host_machine = Machine.objects.current()
        net_info = get_host_network()
        # Identity fields are popped out of net_info; the remainder becomes defaults
        _CURRENT_INTERFACE, _ = cls.objects.update_or_create(
            machine=host_machine,
            ip_public=net_info.pop('ip_public'),
            ip_local=net_info.pop('ip_local'),
            mac_address=net_info.pop('mac_address'),
            dns_server=net_info.pop('dns_server'),
            defaults=net_info,
        )
        return _CURRENT_INTERFACE
class BinaryManager(models.Manager):
    """Manager with cached lookup helpers for Binary rows on the current machine."""

    def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'Binary':
        """Get or create a Binary record, serving cached rows for BINARY_RECHECK_INTERVAL seconds."""
        global _CURRENT_BINARIES

        cached = _CURRENT_BINARIES.get(name)
        if cached:
            expires_at = cached.modified_at + timedelta(seconds=BINARY_RECHECK_INTERVAL)
            if timezone.now() < expires_at:
                return cached

        _CURRENT_BINARIES[name], _ = self.update_or_create(
            machine=Machine.objects.current(),
            name=name,
            binprovider=binprovider,
            version=version,
            abspath=abspath,
            sha256=sha256,
        )
        return _CURRENT_BINARIES[name]

    def get_valid_binary(self, name: str, machine: 'Machine | None' = None) -> 'Binary | None':
        """Get a valid Binary for the given name on the current machine, or None if not found."""
        machine = machine or Machine.current()
        candidates = self.filter(machine=machine, name__iexact=name)
        candidates = candidates.exclude(abspath='').exclude(abspath__isnull=True)
        return candidates.order_by('-modified_at').first()
class Binary(ModelWithHealthStats):
    """
    Tracks a binary on a specific machine.

    Follows the unified state machine pattern:
    - queued: Binary needs to be installed
    - started: Installation in progress
    - succeeded: Binary installed successfully (abspath, version, sha256 populated)
    - failed: Installation failed

    State machine calls run() which executes on_Binary__install_* hooks
    to install the binary using the specified providers.
    """

    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        STARTED = 'started', 'Started'
        SUCCEEDED = 'succeeded', 'Succeeded'
        FAILED = 'failed', 'Failed'

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, null=False)

    # Binary metadata
    name = models.CharField(max_length=63, default='', null=False, blank=True, db_index=True)
    binproviders = models.CharField(max_length=127, default='env', null=False, blank=True,
                                    help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,env")
    overrides = models.JSONField(default=dict, blank=True,
                                 help_text="Provider-specific overrides: {'apt': {'packages': ['pkg']}, ...}")

    # Installation results (populated after installation)
    binprovider = models.CharField(max_length=31, default='', null=False, blank=True,
                                   help_text="Provider that successfully installed this binary")
    abspath = models.CharField(max_length=255, default='', null=False, blank=True)
    version = models.CharField(max_length=32, default='', null=False, blank=True)
    sha256 = models.CharField(max_length=64, default='', null=False, blank=True)

    # State machine fields
    status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True)
    retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True,
                                    help_text="When to retry this binary installation")
    output_dir = models.CharField(max_length=255, default='', null=False, blank=True,
                                  help_text="Directory where installation hook logs are stored")

    # Health stats
    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    state_machine_name: str = 'archivebox.machine.models.BinaryMachine'

    objects: BinaryManager = BinaryManager()

    class Meta:
        app_label = 'machine'
        verbose_name = 'Binary'
        verbose_name_plural = 'Binaries'
        unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)

    def __str__(self) -> str:
        return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'

    @property
    def is_valid(self) -> bool:
        """A binary is valid if it has both abspath and version set."""
        return bool(self.abspath) and bool(self.version)

    @cached_property
    def binary_info(self) -> dict:
        """Return info about the binary."""
        return {
            'name': self.name,
            'abspath': self.abspath,
            'version': self.version,
            'binprovider': self.binprovider,
            'is_valid': self.is_valid,
        }

    def to_jsonl(self) -> dict:
        """
        Convert Binary model instance to a JSONL record.
        """
        from archivebox.config import VERSION

        return {
            'type': 'Binary',
            'schema_version': VERSION,
            'id': str(self.id),
            'machine_id': str(self.machine_id),
            'name': self.name,
            'binprovider': self.binprovider,
            'abspath': self.abspath,
            'version': self.version,
            'sha256': self.sha256,
            'status': self.status,
        }

    @staticmethod
    def from_jsonl(record: dict, overrides: 'dict | None' = None):
        """
        Create/update Binary from JSONL record.

        Handles two cases:
        1. From binaries.jsonl: creates queued binary with name, binproviders, overrides
        2. From hook output: updates binary with abspath, version, sha256, binprovider

        Args:
            record: JSONL record with 'name' and either:
                - 'binproviders', 'overrides' (from binaries.jsonl)
                - 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
            overrides: Not used

        Returns:
            Binary instance or None
        """
        name = record.get('name')
        if not name:
            return None

        machine = Machine.current()
        # NOTE: the `overrides` parameter is intentionally unused (kept for
        # interface parity with other from_jsonl() implementations).

        # Case 1: From binaries.jsonl - create queued binary
        if 'binproviders' in record or ('overrides' in record and not record.get('abspath')):
            binary, created = Binary.objects.get_or_create(
                machine=machine,
                name=name,
                defaults={
                    'binproviders': record.get('binproviders', 'env'),
                    'overrides': record.get('overrides', {}),
                    'status': Binary.StatusChoices.QUEUED,
                    'retry_at': timezone.now(),
                }
            )
            return binary

        # Case 2: From hook output - update with installation results
        abspath = record.get('abspath')
        version = record.get('version')
        if not abspath or not version:
            return None

        binary, _ = Binary.objects.update_or_create(
            machine=machine,
            name=name,
            defaults={
                'abspath': abspath,
                'version': version,
                'sha256': record.get('sha256', ''),
                'binprovider': record.get('binprovider', 'env'),
                'status': Binary.StatusChoices.SUCCEEDED,
                'retry_at': None,
            }
        )
        return binary

    @property
    def OUTPUT_DIR(self):
        """Return the output directory for this binary installation."""
        from django.conf import settings

        # Path is imported at module scope; DATA_DIR falls back to cwd for tests
        DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
        return Path(DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id)

    def update_and_requeue(self, **kwargs):
        """
        Update binary fields and requeue for worker state machine.
        Sets modified_at to ensure workers pick up changes.
        Always saves the model after updating.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.modified_at = timezone.now()
        self.save()

    def run(self):
        """
        Execute binary installation by running on_Binary__install_* hooks.

        Called by BinaryMachine when entering 'started' state.
        Runs ALL on_Binary__install_* hooks - each hook checks binproviders
        and decides if it can handle this binary. First hook to succeed wins.
        Updates status to SUCCEEDED or FAILED based on hook output.
        """
        import json
        from archivebox.hooks import discover_hooks, run_hook
        from archivebox.config.configset import get_config

        # Get merged config (Binary doesn't have crawl/snapshot context)
        config = get_config(scope='global')

        # Create output directory
        output_dir = self.OUTPUT_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir = str(output_dir)
        self.save()

        # Discover ALL on_Binary__install_* hooks
        hooks = discover_hooks('Binary', config=config)
        if not hooks:
            self.status = self.StatusChoices.FAILED
            self.save()
            return

        # Run each hook - they decide if they can handle this binary
        for hook in hooks:
            plugin_name = hook.parent.name
            plugin_output_dir = output_dir / plugin_name
            plugin_output_dir.mkdir(parents=True, exist_ok=True)

            # Build kwargs for hook
            hook_kwargs = {
                'binary_id': str(self.id),
                'machine_id': str(self.machine_id),
                'name': self.name,
                'binproviders': self.binproviders,
            }
            # Add overrides as JSON string if present
            if self.overrides:
                hook_kwargs['overrides'] = json.dumps(self.overrides)

            # Run the hook
            result = run_hook(
                hook,
                output_dir=plugin_output_dir,
                config=config,
                timeout=600,  # 10 min timeout for binary installation
                **hook_kwargs
            )

            # Background hook (unlikely for binary installation, but handle it)
            if result is None:
                continue

            # Failed or skipped hook - try next one
            if result['returncode'] != 0:
                continue

            # Parse JSONL output to check for successful installation
            stdout_file = plugin_output_dir / 'stdout.log'
            if stdout_file.exists():
                stdout = stdout_file.read_text()
                for line in stdout.splitlines():
                    if line.strip() and line.strip().startswith('{'):
                        try:
                            record = json.loads(line)
                            if record.get('type') == 'Binary' and record.get('abspath'):
                                # Update self from successful installation
                                self.abspath = record['abspath']
                                self.version = record.get('version', '')
                                self.sha256 = record.get('sha256', '')
                                self.binprovider = record.get('binprovider', 'env')
                                self.status = self.StatusChoices.SUCCEEDED
                                self.save()
                                return
                        except json.JSONDecodeError:
                            continue

        # No hook succeeded
        self.status = self.StatusChoices.FAILED
        self.save()

    def cleanup(self):
        """
        Clean up background binary installation hooks.

        Called by state machine if needed (not typically used for binaries
        since installations are foreground, but included for consistency).
        """
        from archivebox.misc.process_utils import safe_kill_process

        output_dir = self.OUTPUT_DIR
        if not output_dir.exists():
            return

        # Kill any background hooks
        for plugin_dir in output_dir.iterdir():
            if not plugin_dir.is_dir():
                continue
            pid_file = plugin_dir / 'hook.pid'
            cmd_file = plugin_dir / 'cmd.sh'
            safe_kill_process(pid_file, cmd_file)
- # =============================================================================
- # Process Model
- # =============================================================================
class ProcessManager(models.Manager):
    """Manager for Process model."""

    def current(self) -> 'Process':
        """Get the Process record for the current OS process."""
        return Process.current()

    def get_by_pid(self, pid: int, machine: 'Machine | None' = None) -> 'Process | None':
        """
        Find a Process by PID with proper validation against PID reuse.

        IMPORTANT: PIDs are reused by the OS! This method:
        1. Filters by machine (required - PIDs are only unique per machine)
        2. Filters by time window (processes older than 24h are stale)
        3. Validates via psutil that start times match

        Args:
            pid: OS process ID
            machine: Machine instance (defaults to current machine)

        Returns:
            Process if found and validated, None otherwise
        """
        if not PSUTIL_AVAILABLE:
            # Without psutil we cannot verify start times, so refuse to guess.
            return None

        machine = machine or Machine.current()

        # Get the actual process start time from OS
        try:
            os_proc = psutil.Process(pid)
            os_start_time = os_proc.create_time()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process doesn't exist - any DB record with this PID is stale
            return None

        # Query candidates: same machine, same PID, recent, still RUNNING
        candidates = self.filter(
            machine=machine,
            pid=pid,
            status=Process.StatusChoices.RUNNING,
            started_at__gte=timezone.now() - PID_REUSE_WINDOW,
        ).order_by('-started_at')

        for candidate in candidates:
            # Validate start time matches (within tolerance)
            if candidate.started_at:
                db_start_time = candidate.started_at.timestamp()
                if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
                    return candidate

        return None

    def create_for_archiveresult(self, archiveresult, **kwargs):
        """
        Create a Process record for an ArchiveResult.
        Called during migration and when creating new ArchiveResults.
        """
        # Defaults from ArchiveResult if not provided
        defaults = {
            'machine': Machine.current(),
            'pwd': kwargs.get('pwd') or str(archiveresult.snapshot.output_dir / archiveresult.plugin),
            'cmd': kwargs.get('cmd') or [],
            'status': 'queued',
            'timeout': kwargs.get('timeout', 120),
            'env': kwargs.get('env', {}),
        }
        # NOTE: caller-supplied kwargs deliberately override every default above
        defaults.update(kwargs)
        process = self.create(**defaults)
        return process
class Process(models.Model):
    """
    Tracks a single OS process execution.

    Process represents the actual subprocess spawned to execute a hook.
    One Process can optionally be associated with an ArchiveResult (via OneToOne),
    but Process can also exist standalone for internal operations.

    Follows the unified state machine pattern:
    - queued: Process ready to launch
    - running: Process actively executing
    - exited: Process completed (check exit_code for success/failure)

    State machine calls launch() to spawn the process and monitors its lifecycle.
    """

    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        RUNNING = 'running', 'Running'
        EXITED = 'exited', 'Exited'

    class TypeChoices(models.TextChoices):
        CLI = 'cli', 'CLI Command'
        SUPERVISORD = 'supervisord', 'Supervisord Daemon'
        ORCHESTRATOR = 'orchestrator', 'Orchestrator'
        WORKER = 'worker', 'Worker Process'
        HOOK = 'hook', 'Hook Script'
        BINARY = 'binary', 'Binary Execution'

    # Primary fields
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # Parent process FK for hierarchy tracking
    parent = models.ForeignKey(
        'self',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='children',
        help_text='Parent process that spawned this one'
    )

    # Process type for distinguishing in hierarchy
    process_type = models.CharField(
        max_length=16,
        choices=TypeChoices.choices,
        default=TypeChoices.BINARY,
        db_index=True,
        help_text='Type of process in the execution hierarchy'
    )

    # Machine FK - required (every process runs on a machine)
    machine = models.ForeignKey(
        Machine,
        on_delete=models.CASCADE,
        null=False,
        related_name='process_set',
        help_text='Machine where this process executed'
    )

    # Execution metadata
    pwd = models.CharField(max_length=512, default='', null=False, blank=True,
                           help_text='Working directory for process execution')
    cmd = models.JSONField(default=list, null=False, blank=True,
                           help_text='Command as array of arguments')
    env = models.JSONField(default=dict, null=False, blank=True,
                           help_text='Environment variables for process')
    timeout = models.IntegerField(default=120, null=False,
                                  help_text='Timeout in seconds')

    # Process results
    pid = models.IntegerField(default=None, null=True, blank=True,
                              help_text='OS process ID')
    exit_code = models.IntegerField(default=None, null=True, blank=True,
                                    help_text='Process exit code (0 = success)')
    stdout = models.TextField(default='', null=False, blank=True,
                              help_text='Standard output from process')
    stderr = models.TextField(default='', null=False, blank=True,
                              help_text='Standard error from process')

    # Timing
    started_at = models.DateTimeField(default=None, null=True, blank=True,
                                      help_text='When process was launched')
    ended_at = models.DateTimeField(default=None, null=True, blank=True,
                                    help_text='When process completed/terminated')

    # Optional FKs
    binary = models.ForeignKey(
        Binary,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='process_set',
        help_text='Binary used by this process'
    )
    iface = models.ForeignKey(
        NetworkInterface,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='process_set',
        help_text='Network interface used by this process'
    )

    # Optional connection URL (for CDP, sonic, etc.)
    url = models.URLField(max_length=2048, default=None, null=True, blank=True,
                          help_text='Connection URL (CDP endpoint, sonic server, etc.)')

    # Reverse relation to ArchiveResult (OneToOne from AR side)
    # archiveresult: OneToOneField defined on ArchiveResult model

    # State machine fields
    status = models.CharField(
        max_length=16,
        choices=StatusChoices.choices,
        default=StatusChoices.QUEUED,
        db_index=True
    )
    retry_at = models.DateTimeField(
        default=timezone.now,
        null=True, blank=True,
        db_index=True,
        help_text='When to retry this process'
    )

    state_machine_name: str = 'archivebox.machine.models.ProcessMachine'

    objects: ProcessManager = ProcessManager()

    class Meta:
        app_label = 'machine'
        verbose_name = 'Process'
        verbose_name_plural = 'Processes'
        indexes = [
            models.Index(fields=['machine', 'status', 'retry_at']),
            models.Index(fields=['binary', 'exit_code']),
            models.Index(fields=['parent', 'status']),
            # Supports the PID-reuse validation queries in current()/get_by_pid()
            models.Index(fields=['machine', 'pid', 'started_at']),
        ]
- def __str__(self) -> str:
- cmd_str = ' '.join(self.cmd[:3]) if self.cmd else '(no cmd)'
- return f'Process[{self.id}] {cmd_str} ({self.status})'
- # Properties that delegate to related objects
- @property
- def cmd_version(self) -> str:
- """Get version from associated binary."""
- return self.binary.version if self.binary else ''
- @property
- def bin_abspath(self) -> str:
- """Get absolute path from associated binary."""
- return self.binary.abspath if self.binary else ''
- @property
- def plugin(self) -> str:
- """Get plugin name from associated ArchiveResult (if any)."""
- if hasattr(self, 'archiveresult'):
- # Inline import to avoid circular dependency
- return self.archiveresult.plugin
- return ''
- @property
- def hook_name(self) -> str:
- """Get hook name from associated ArchiveResult (if any)."""
- if hasattr(self, 'archiveresult'):
- return self.archiveresult.hook_name
- return ''
- def to_jsonl(self) -> dict:
- """
- Convert Process model instance to a JSONL record.
- """
- from archivebox.config import VERSION
- record = {
- 'type': 'Process',
- 'schema_version': VERSION,
- 'id': str(self.id),
- 'machine_id': str(self.machine_id),
- 'cmd': self.cmd,
- 'pwd': self.pwd,
- 'status': self.status,
- 'exit_code': self.exit_code,
- 'started_at': self.started_at.isoformat() if self.started_at else None,
- 'ended_at': self.ended_at.isoformat() if self.ended_at else None,
- }
- # Include optional fields if set
- if self.binary_id:
- record['binary_id'] = str(self.binary_id)
- if self.pid:
- record['pid'] = self.pid
- if self.timeout:
- record['timeout'] = self.timeout
- return record
- def update_and_requeue(self, **kwargs):
- """
- Update process fields and requeue for worker state machine.
- Sets modified_at to ensure workers pick up changes.
- """
- for key, value in kwargs.items():
- setattr(self, key, value)
- self.modified_at = timezone.now()
- self.save()
- # =========================================================================
- # Process.current() and hierarchy methods
- # =========================================================================
    @classmethod
    def current(cls) -> 'Process':
        """
        Get or create the Process record for the current OS process.

        Similar to Machine.current(), this:
        1. Checks cache for existing Process with matching PID
        2. Validates the cached Process is still valid (PID not reused)
        3. Creates new Process if needed

        IMPORTANT: Uses psutil to validate PID hasn't been reused.
        PIDs are recycled by OS, so we compare start times.
        """
        global _CURRENT_PROCESS
        current_pid = os.getpid()
        machine = Machine.current()

        # Check cache validity
        if _CURRENT_PROCESS:
            # Verify: same PID, same machine, cache not expired
            if (_CURRENT_PROCESS.pid == current_pid and
                _CURRENT_PROCESS.machine_id == machine.id and
                timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
                return _CURRENT_PROCESS
            _CURRENT_PROCESS = None

        # Get actual process start time from OS for validation
        os_start_time = None
        if PSUTIL_AVAILABLE:
            try:
                os_proc = psutil.Process(current_pid)
                os_start_time = os_proc.create_time()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # Try to find existing Process for this PID on this machine
        # Filter by: machine + PID + RUNNING + recent + start time matches
        if os_start_time:
            existing = cls.objects.filter(
                machine=machine,
                pid=current_pid,
                status=cls.StatusChoices.RUNNING,
                started_at__gte=timezone.now() - PID_REUSE_WINDOW,
            ).order_by('-started_at').first()

            if existing and existing.started_at:
                db_start_time = existing.started_at.timestamp()
                # Only trust the row if DB and OS start times agree within tolerance
                if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
                    _CURRENT_PROCESS = existing
                    return existing

        # No valid existing record - create new one
        parent = cls._find_parent_process(machine)
        process_type = cls._detect_process_type()

        # Use psutil cmdline if available (matches what proc() will validate against)
        # Otherwise fall back to sys.argv
        cmd = sys.argv
        if PSUTIL_AVAILABLE:
            try:
                os_proc = psutil.Process(current_pid)
                cmd = os_proc.cmdline()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # Use psutil start time if available (more accurate than timezone.now())
        if os_start_time:
            started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
        else:
            started_at = timezone.now()

        _CURRENT_PROCESS = cls.objects.create(
            machine=machine,
            parent=parent,
            process_type=process_type,
            cmd=cmd,
            pwd=os.getcwd(),
            pid=current_pid,
            started_at=started_at,
            status=cls.StatusChoices.RUNNING,
        )
        return _CURRENT_PROCESS
@classmethod
def _find_parent_process(cls, machine: 'Machine' = None) -> 'Process | None':
    """
    Resolve our PPID to a tracked Process record, guarding against PID reuse.

    A candidate only matches when all of the following hold:
      1. Same machine (PIDs are only unique per machine)
      2. Its started_at agrees with the OS-reported parent start time
      3. It is still RUNNING and recent

    Returns None when the parent is not an ArchiveBox process.
    """
    if not PSUTIL_AVAILABLE:
        return None

    ppid = os.getppid()
    machine = machine or Machine.current()

    # Ask the OS when the parent actually started
    try:
        os_parent_start = psutil.Process(ppid).create_time()
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        return None  # parent process is already gone

    candidates = cls.objects.filter(
        machine=machine,
        pid=ppid,
        status=cls.StatusChoices.RUNNING,
        started_at__gte=timezone.now() - PID_REUSE_WINDOW,
    ).order_by('-started_at')

    for candidate in candidates:
        if candidate.started_at is None:
            continue
        if abs(candidate.started_at.timestamp() - os_parent_start) < START_TIME_TOLERANCE:
            return candidate

    return None  # no matching ArchiveBox parent process
@classmethod
def _detect_process_type(cls) -> str:
    """Classify the current process by inspecting its command line (sys.argv)."""
    argv_str = ' '.join(sys.argv).lower()
    if 'supervisord' in argv_str:
        return cls.TypeChoices.SUPERVISORD
    if 'orchestrator' in argv_str:
        return cls.TypeChoices.ORCHESTRATOR
    worker_markers = ('crawl_worker', 'snapshot_worker', 'archiveresult_worker')
    if any(marker in argv_str for marker in worker_markers):
        return cls.TypeChoices.WORKER
    if 'archivebox' in argv_str:
        return cls.TypeChoices.CLI
    return cls.TypeChoices.BINARY
@classmethod
def cleanup_stale_running(cls, machine: 'Machine' = None) -> int:
    """
    Mark stale RUNNING processes as EXITED.

    A record is stale when its status is RUNNING but either:
      - the OS process no longer exists (or the PID now belongs to a
        different process, detected by start-time mismatch), or
      - started_at is older than PID_REUSE_WINDOW.

    Returns:
        Count of records marked EXITED.
    """
    machine = machine or Machine.current()
    cutoff = timezone.now() - PID_REUSE_WINDOW
    cleaned = 0

    running = cls.objects.filter(machine=machine, status=cls.StatusChoices.RUNNING)
    for record in running:
        stale = False
        if record.started_at and record.started_at < cutoff:
            # Older than the reuse window: the PID has definitely been recycled
            stale = True
        elif PSUTIL_AVAILABLE and record.pid is not None:
            # Verify the OS process still exists with a matching start time
            try:
                os_proc = psutil.Process(record.pid)
                if record.started_at:
                    drift = abs(record.started_at.timestamp() - os_proc.create_time())
                    if drift > START_TIME_TOLERANCE:
                        stale = True  # PID reused by a different process
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                stale = True  # process no longer exists

        if stale:
            record.status = cls.StatusChoices.EXITED
            record.ended_at = record.ended_at or timezone.now()
            if record.exit_code is None:
                record.exit_code = -1
            record.save(update_fields=['status', 'ended_at', 'exit_code'])
            cleaned += 1

    return cleaned
- # =========================================================================
- # Tree traversal properties
- # =========================================================================
@property
def root(self) -> 'Process':
    """Walk the parent links up to the top-level (CLI command) process."""
    node = self
    while node.parent_id:
        node = node.parent
    return node
@property
def ancestors(self) -> list['Process']:
    """Return ancestors ordered nearest-first: parent, grandparent, ..., root."""
    chain: list['Process'] = []
    node = self.parent
    while node is not None:
        chain.append(node)
        node = node.parent
    return chain
@property
def depth(self) -> int:
    """Number of ancestors above this process in the tree (0 = root)."""
    return len(self.ancestors)
def get_descendants(self, include_self: bool = False):
    """
    Get all descendant processes recursively (via the parent FK in the DB).

    Args:
        include_self: include this process itself in the result

    Returns:
        QuerySet of descendant Process records
    """
    seen = {self.pk} if include_self else set()
    frontier = list(self.children.values_list('pk', flat=True))
    while frontier:
        # Only follow PKs we haven't visited yet — guards against an
        # infinite loop if corrupted data ever forms a parent-link cycle
        # (the original looped forever in that case).
        new_pks = [pk for pk in frontier if pk not in seen]
        if not new_pks:
            break
        seen.update(new_pks)
        frontier = list(Process.objects.filter(parent_id__in=new_pks).values_list('pk', flat=True))
    return Process.objects.filter(pk__in=seen)
- # =========================================================================
- # Validated psutil access via .proc property
- # =========================================================================
@property
def proc(self) -> 'psutil.Process | None':
    """
    Return a validated psutil.Process handle for this record, or None.

    A handle is returned ONLY when all of the following hold:
      * psutil is importable and this record has a PID
      * the record belongs to the current machine (PIDs are per-machine)
      * an OS process with that PID currently exists
      * the OS process start time matches started_at within tolerance
      * if a cmd was recorded, the running binary's basename matches it

    Returns None for exited processes, recycled PIDs, records from other
    machines, or when psutil is unavailable. This prevents accidentally
    operating on a stale/recycled PID.
    """
    if not PSUTIL_AVAILABLE or not self.pid:
        return None
    # Records from other machines cannot be validated locally
    if self.machine_id != Machine.current().id:
        return None

    try:
        handle = psutil.Process(self.pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        return None  # process no longer exists

    # Reject handles whose start time disagrees with ours (PID reuse)
    if self.started_at:
        if abs(handle.create_time() - self.started_at.timestamp()) > START_TIME_TOLERANCE:
            return None  # PID has been reused by a different process!

    # Extra safety: compare binary basenames when we recorded a cmd
    # (handles /usr/bin/python3 vs python3)
    if self.cmd:
        try:
            cmdline = handle.cmdline()
            running_binary = cmdline[0] if cmdline else ''
            recorded_binary = self.cmd[0] if self.cmd else ''
            if running_binary and recorded_binary:
                if Path(running_binary).name != Path(recorded_binary).name:
                    return None  # different binary, PID reused
        except (psutil.AccessDenied, psutil.ZombieProcess):
            pass  # cmdline unavailable; the start-time match is sufficient

    return handle
@property
def is_running(self) -> bool:
    """
    True when a validated OS process for this record is currently alive.

    More reliable than the status field alone: self.proc confirms the OS
    process exists and actually matches this record.
    """
    handle = self.proc
    if handle is None:
        return False
    return handle.is_running()
def is_alive(self) -> bool:
    """subprocess.Popen-compatible alias for the is_running property."""
    return self.is_running
def get_memory_info(self) -> dict | None:
    """Return {'rss', 'vms'} byte counts, or None when not running/readable."""
    handle = self.proc
    if handle is None:
        return None
    try:
        info = handle.memory_info()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return None
    return {'rss': info.rss, 'vms': info.vms}
def get_cpu_percent(self) -> float | None:
    """Return CPU usage percent (sampled over 0.1s), or None when not running."""
    handle = self.proc
    if handle is None:
        return None
    try:
        return handle.cpu_percent(interval=0.1)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return None
def get_children_pids(self) -> list[int]:
    """PIDs of live OS child processes (recursive, from OS not DB); [] if none."""
    handle = self.proc
    if handle is None:
        return []
    try:
        return [child.pid for child in handle.children(recursive=True)]
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return []
- # =========================================================================
- # Lifecycle methods (launch, kill, poll, wait)
- # =========================================================================
@property
def pid_file(self) -> 'Path | None':
    """Path to the PID file for this process, or None when pwd is unset."""
    # Annotation fixed: the original claimed -> Path but returns None
    # whenever self.pwd is empty.
    return Path(self.pwd) / 'process.pid' if self.pwd else None
@property
def cmd_file(self) -> 'Path | None':
    """Path to the cmd.sh debug script, or None when pwd is unset."""
    # Annotation fixed: the original claimed -> Path but returns None
    # whenever self.pwd is empty.
    return Path(self.pwd) / 'cmd.sh' if self.pwd else None
@property
def stdout_file(self) -> 'Path | None':
    """Path to the stdout log, or None when pwd is unset."""
    # Annotation fixed: the original claimed -> Path but returns None
    # whenever self.pwd is empty.
    return Path(self.pwd) / 'stdout.log' if self.pwd else None
@property
def stderr_file(self) -> 'Path | None':
    """Path to the stderr log, or None when pwd is unset."""
    # Annotation fixed: the original claimed -> Path but returns None
    # whenever self.pwd is empty.
    return Path(self.pwd) / 'stderr.log' if self.pwd else None
def _write_pid_file(self) -> None:
    """Persist the PID file, stamping its mtime with the process start time."""
    from archivebox.misc.process_utils import write_pid_file_with_mtime
    if not (self.pid and self.started_at and self.pid_file):
        return
    write_pid_file_with_mtime(
        self.pid_file,
        self.pid,
        self.started_at.timestamp(),
    )
def _write_cmd_file(self) -> None:
    """Dump the command line to cmd.sh for debugging/validation."""
    from archivebox.misc.process_utils import write_cmd_file
    if not (self.cmd and self.cmd_file):
        return
    write_cmd_file(self.cmd_file, self.cmd)
def _build_env(self) -> dict:
    """Merge this process's stored env overrides on top of the system env."""
    return {**os.environ, **(self.env or {})}
def launch(self, background: bool = False) -> 'Process':
    """
    Spawn the subprocess described by this record and update it in place.

    Args:
        background: If True, return immediately after spawning (for
            daemons / background hooks) instead of waiting for exit.

    Returns:
        self, updated with pid/started_at (and exit_code/stdout/stderr
        when run in the foreground).

    Raises:
        ValueError: if self.pwd is not set (output files live under pwd).
    """
    import subprocess
    # (removed the original's unused `import time`)

    # pwd is required: stdout/stderr/cmd files are written under it
    if not self.pwd:
        raise ValueError("Process.pwd must be set before calling launch()")
    Path(self.pwd).mkdir(parents=True, exist_ok=True)

    # Write cmd.sh for debugging
    self._write_cmd_file()

    stdout_path = self.stdout_file
    stderr_path = self.stderr_file
    with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
        # The child inherits its own copies of these fds, so closing the
        # parent's handles after Popen is safe.
        proc = subprocess.Popen(
            self.cmd,
            cwd=self.pwd,
            stdout=out,
            stderr=err,
            env=self._build_env(),
        )

    # Prefer the OS-reported start time (more accurate than timezone.now())
    self.started_at = timezone.now()
    if PSUTIL_AVAILABLE:
        try:
            ps_proc = psutil.Process(proc.pid)
            self.started_at = datetime.fromtimestamp(
                ps_proc.create_time(),
                tz=timezone.get_current_timezone(),
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass  # keep the timezone.now() fallback

    self.pid = proc.pid
    self.status = self.StatusChoices.RUNNING
    self.save()
    self._write_pid_file()

    if not background:
        try:
            proc.wait(timeout=self.timeout)
            self.exit_code = proc.returncode
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            self.exit_code = -1  # killed by timeout; real code unknown
        self.ended_at = timezone.now()
        if stdout_path.exists():
            self.stdout = stdout_path.read_text()
        if stderr_path.exists():
            self.stderr = stderr_path.read_text()
        self.status = self.StatusChoices.EXITED
        self.save()

    return self
def kill(self, signal_num: int = 15) -> bool:
    """
    Send signal_num (default SIGTERM=15) to this process and mark it EXITED.

    Signals via self.proc, so the signal only ever reaches the process we
    actually recorded — never a recycled PID.

    Args:
        signal_num: signal to send (default SIGTERM=15)

    Returns:
        True if the signal was delivered, False if the process was gone.
    """
    target = self.proc
    if target is None:
        # Nothing to kill: already exited, or the PID was recycled
        if self.status != self.StatusChoices.EXITED:
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
        return False

    try:
        # Safe: self.proc validated this is our process via start time match
        target.send_signal(signal_num)
    except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
        # Raced: the process exited between validation and signal delivery
        self.status = self.StatusChoices.EXITED
        self.ended_at = self.ended_at or timezone.now()
        self.save()
        return False

    # Record the death using the Unix convention: 128 + signal number
    self.exit_code = 128 + signal_num
    self.ended_at = timezone.now()
    self.status = self.StatusChoices.EXITED
    self.save()

    # Clean up the PID file
    if self.pid_file and self.pid_file.exists():
        self.pid_file.unlink(missing_ok=True)
    return True
def poll(self) -> int | None:
    """
    Check whether the process has exited, updating the record if so.

    Returns:
        exit_code if exited, None while still running.
    """
    if self.status == self.StatusChoices.EXITED:
        return self.exit_code
    if self.is_running:
        return None  # still running

    # Process is gone: harvest output logs and finalize the record
    if self.stdout_file and self.stdout_file.exists():
        self.stdout = self.stdout_file.read_text()
    if self.stderr_file and self.stderr_file.exists():
        self.stderr = self.stderr_file.read_text()
    if self.exit_code is None:
        self.exit_code = -1  # real exit code unknown
    self.ended_at = timezone.now()
    self.status = self.StatusChoices.EXITED
    self.save()
    return self.exit_code
def wait(self, timeout: int | None = None) -> int:
    """
    Block until the process exits, polling every 0.1s.

    Args:
        timeout: Max seconds to wait (None = use self.timeout). When both
            are None, waits indefinitely — the original raised TypeError
            in that case by comparing elapsed float time against None.

    Returns:
        exit_code

    Raises:
        TimeoutError: if the process does not exit within the deadline.
    """
    import time

    timeout = timeout or self.timeout
    # monotonic() is immune to wall-clock adjustments during the wait
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        exit_code = self.poll()
        if exit_code is not None:
            return exit_code
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(f"Process {self.id} did not exit within {timeout}s")
        time.sleep(0.1)
def terminate(self, graceful_timeout: float = 5.0) -> bool:
    """
    Gracefully terminate: SIGTERM → wait up to graceful_timeout → SIGKILL.

    Consolidates the scattered SIGTERM/SIGKILL logic from:
      - crawls/models.py Crawl.cleanup()
      - workers/pid_utils.py stop_worker()
      - supervisord_util.py stop_existing_supervisord_process()

    Args:
        graceful_timeout: seconds to wait after SIGTERM before SIGKILL

    Returns:
        True if the process was terminated, False if it was already dead.
    """
    import signal
    # (removed the original's unused `import time`)

    proc = self.proc
    if proc is None:
        # Already dead — just sync the DB record
        if self.status != self.StatusChoices.EXITED:
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
        return False

    try:
        # Step 1: SIGTERM for graceful shutdown
        proc.terminate()
        # Step 2: wait for a graceful exit
        try:
            exit_status = proc.wait(timeout=graceful_timeout)
            # Graceful exit: psutil.Process.wait() returns the exit status
            self.exit_code = exit_status if exit_status is not None else 0
            self.status = self.StatusChoices.EXITED
            self.ended_at = timezone.now()
            self.save()
            return True
        except psutil.TimeoutExpired:
            pass  # still running, escalate to force kill
        # Step 3: SIGKILL
        proc.kill()
        proc.wait(timeout=2)
        # Standard Unix convention: 128 + signal number
        self.exit_code = 128 + signal.SIGKILL
        self.status = self.StatusChoices.EXITED
        self.ended_at = timezone.now()
        self.save()
        return True
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        # Process died out from under us — already dead
        self.status = self.StatusChoices.EXITED
        self.ended_at = self.ended_at or timezone.now()
        self.save()
        return False
def kill_tree(self, graceful_timeout: float = 2.0) -> int:
    """
    Kill this process and all its OS children (not DB children).

    Consolidates the scattered child-killing logic from:
      - crawls/models.py Crawl.cleanup() os.killpg()
      - supervisord_util.py stop_existing_supervisord_process()

    Args:
        graceful_timeout: seconds to wait after SIGTERM before SIGKILL

    Returns:
        Number of processes killed (including self).
    """
    # (removed the original's unused `import signal`)
    killed_count = 0
    proc = self.proc
    if proc is None:
        # Already dead — just sync the DB record
        if self.status != self.StatusChoices.EXITED:
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
        return 0

    try:
        # Snapshot all descendants before touching the parent
        children = proc.children(recursive=True)
        # SIGTERM children first, deepest-first
        for child in reversed(children):
            try:
                child.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass  # child already dead or not ours to signal
        # Give children a chance to exit gracefully
        gone, alive = psutil.wait_procs(children, timeout=graceful_timeout)
        killed_count += len(gone)
        # SIGKILL any stragglers
        for child in alive:
            try:
                child.kill()
                killed_count += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass  # child exited or permission denied
        # Finally terminate self
        if self.terminate(graceful_timeout=graceful_timeout):
            killed_count += 1
        return killed_count
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        # Whole tree already dead
        self.status = self.StatusChoices.EXITED
        self.ended_at = self.ended_at or timezone.now()
        self.save()
        return killed_count
def kill_children_db(self) -> int:
    """
    Terminate all DB-tracked child processes (via the parent FK).

    Unlike kill_tree(), which walks the OS process tree, this targets
    processes created via Process.create(parent=self).

    Returns:
        Number of child Process records terminated.
    """
    running_children = self.children.filter(status=self.StatusChoices.RUNNING)
    return sum(1 for child in running_children if child.terminate())
- # =========================================================================
- # Class methods for querying processes
- # =========================================================================
@classmethod
def get_running(cls, process_type: 'str | None' = None, machine: 'Machine | None' = None) -> 'QuerySet[Process]':
    """
    Get all RUNNING processes, optionally filtered by type.

    Replaces:
      - workers/pid_utils.py get_all_worker_pids()
      - workers/orchestrator.py get_total_worker_count()

    Args:
        process_type: filter by TypeChoices value (e.g. 'worker', 'hook')
        machine: machine to scope the query to (defaults to current)

    Returns:
        QuerySet of running Process records
    """
    # Annotations fixed: both parameters legitimately default to None.
    machine = machine or Machine.current()
    qs = cls.objects.filter(
        machine=machine,
        status=cls.StatusChoices.RUNNING,
    )
    if process_type:
        qs = qs.filter(process_type=process_type)
    return qs
@classmethod
def get_running_count(cls, process_type: 'str | None' = None, machine: 'Machine | None' = None) -> int:
    """
    Count RUNNING processes (same filter semantics as get_running).

    Replaces workers/pid_utils.py get_running_worker_count().
    """
    # Annotations fixed: both parameters legitimately default to None.
    return cls.get_running(process_type=process_type, machine=machine).count()
@classmethod
def stop_all(cls, process_type: 'str | None' = None, machine: 'Machine | None' = None, graceful: bool = True) -> int:
    """
    Stop all running processes of a given type.

    Args:
        process_type: filter by TypeChoices value
        machine: machine to scope the query to (defaults to current)
        graceful: True -> terminate() (SIGTERM then SIGKILL); False -> kill()

    Returns:
        Number of processes actually stopped.
    """
    # Annotations fixed: both filter parameters legitimately default to None.
    stopped = 0
    for proc in cls.get_running(process_type=process_type, machine=machine):
        stopper = proc.terminate if graceful else proc.kill
        if stopper():
            stopped += 1
    return stopped
@classmethod
def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine | None' = None) -> int:
    """
    Get the next available worker ID (0-indexed) for spawning new workers.

    Replaces workers/pid_utils.py get_next_worker_id(). Simply returns the
    count of running workers of this type.

    NOTE(review): this assumes worker IDs stay dense — if a middle worker
    dies, the count can collide with a live worker's ID; confirm callers
    tolerate that.

    Args:
        process_type: worker type to count
        machine: machine to scope the query to (defaults to current)
    """
    # Annotation fixed: machine legitimately defaults to None.
    return cls.get_running_count(process_type=process_type, machine=machine)
- # =============================================================================
- # Binary State Machine
- # =============================================================================
class BinaryMachine(BaseStateMachine, strict_states=True):
    """
    State machine driving the Binary installation lifecycle.

    Flow:
        QUEUED    — binary is waiting to be installed
          | tick() when can_start()
        STARTED   — enter_started() calls binary.run(), which walks the
                    on_Binary__install_* provider hooks in order; the first
                    hook that exits 0 has its stdout.log JSONL parsed for a
                    'Binary' record (abspath/version/sha256/provider) and
                    sets status=SUCCEEDED; if none succeed, status=FAILED
          | tick() inspects the status set by run()
        SUCCEEDED / FAILED (final) — health stats incremented
        (num_uses_succeeded / num_uses_failed)
    """
    model_attr_name = 'binary'

    # States
    queued = State(value=Binary.StatusChoices.QUEUED, initial=True)
    started = State(value=Binary.StatusChoices.STARTED)
    succeeded = State(value=Binary.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=Binary.StatusChoices.FAILED, final=True)

    # Tick event — condition-gated transitions
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed')
    )

    def can_start(self) -> bool:
        """Installation may begin once a name and candidate providers exist."""
        return bool(self.binary.name and self.binary.binproviders)

    def is_succeeded(self) -> bool:
        """True when run() marked the installation SUCCEEDED."""
        return self.binary.status == Binary.StatusChoices.SUCCEEDED

    def is_failed(self) -> bool:
        """True when run() marked the installation FAILED."""
        return self.binary.status == Binary.StatusChoices.FAILED

    def is_finished(self) -> bool:
        """True once the installation reached either terminal status."""
        return self.binary.status in (
            Binary.StatusChoices.SUCCEEDED,
            Binary.StatusChoices.FAILED,
        )

    @queued.enter
    def enter_queued(self):
        """Mark the binary queued and eligible for immediate pickup."""
        self.binary.update_and_requeue(
            retry_at=timezone.now(),
            status=Binary.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        """Lock the binary, run the install hooks, and persist the outcome."""
        # Hold the lock for up to 5 minutes while installation runs
        self.binary.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=300),
            status=Binary.StatusChoices.STARTED,
        )
        # run() updates status to SUCCEEDED or FAILED based on hook results
        self.binary.run()
        self.binary.save()

    @succeeded.enter
    def enter_succeeded(self):
        """Record a successful installation and bump health stats."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.SUCCEEDED,
        )
        self.binary.increment_health_stats(success=True)

    @failed.enter
    def enter_failed(self):
        """Record a failed installation and bump health stats."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.FAILED,
        )
        self.binary.increment_health_stats(success=False)
- # =============================================================================
- # Process State Machine
- # =============================================================================
class ProcessMachine(BaseStateMachine, strict_states=True):
    """
    State machine driving the Process (OS subprocess) lifecycle.

    Flow:
        QUEUED   — ready to launch, waiting for resources
          | tick() when can_start()
        RUNNING  — enter_running() locks the record for process.timeout
                   seconds and sets started_at; the actual subprocess
                   spawning is still handled by run_hook() (placeholder:
                   exit_code is set to 0 immediately until that refactor)
          | tick() when is_exited()
        EXITED (final) — ended_at set, stdout/stderr captured elsewhere,
                   health stats incremented from the exit code

    Note: deliberately simpler than the ArchiveResult machine — Process
    only tracks execution lifecycle; ArchiveResult owns the archival
    semantics (status, output parsing, etc.).
    """
    model_attr_name = 'process'

    # States
    queued = State(value=Process.StatusChoices.QUEUED, initial=True)
    running = State(value=Process.StatusChoices.RUNNING)
    exited = State(value=Process.StatusChoices.EXITED, final=True)

    # Tick event — condition-gated transitions
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(running, cond='can_start') |
        running.to.itself(unless='is_exited') |
        running.to(exited, cond='is_exited')
    )

    # Events for explicit control
    launch = queued.to(running)
    kill = running.to(exited)

    def can_start(self) -> bool:
        """Launchable once both a command and a machine are assigned."""
        return bool(self.process.cmd and self.process.machine)

    def is_exited(self) -> bool:
        """The process is done once an exit code has been recorded."""
        return self.process.exit_code is not None

    @queued.enter
    def enter_queued(self):
        """Mark the process queued and eligible for immediate pickup."""
        self.process.update_and_requeue(
            retry_at=timezone.now(),
            status=Process.StatusChoices.QUEUED,
        )

    @running.enter
    def enter_running(self):
        """Lock the record for the duration of the run and start execution."""
        self.process.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=self.process.timeout),
            status=Process.StatusChoices.RUNNING,
            started_at=timezone.now(),
        )
        # Placeholder: run_hook() still spawns the actual subprocess; once
        # that is refactored, this will actually launch the process here.
        # For now Process is a data model tracking execution metadata, so
        # mark it as immediately exited.
        self.process.exit_code = 0
        self.process.save()

    @exited.enter
    def enter_exited(self):
        """Finalize the record and update health stats from the exit code."""
        success = self.process.exit_code == 0
        self.process.update_and_requeue(
            retry_at=None,
            status=Process.StatusChoices.EXITED,
            ended_at=timezone.now(),
        )
        self.process.increment_health_stats(success=success)
- # =============================================================================
- # State Machine Registration
- # =============================================================================
# Manually register the state machines with the python-statemachine registry
for machine_cls in (BinaryMachine, ProcessMachine):
    registry.register(machine_cls)
|