models.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770
  1. __package__ = 'archivebox.machine'
  2. import os
  3. import sys
  4. import socket
  5. from pathlib import Path
  6. from archivebox.uuid_compat import uuid7
  7. from datetime import timedelta, datetime
  8. from statemachine import State, registry
  9. from django.db import models
  10. from django.utils import timezone
  11. from django.utils.functional import cached_property
  12. from archivebox.base_models.models import ModelWithHealthStats
  13. from archivebox.workers.models import BaseStateMachine
  14. from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
# psutil is optional: PSUTIL_AVAILABLE records whether the import succeeded so
# callers can skip psutil-based checks instead of failing at import time.
try:
    import psutil
    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

# Module-level (per Python process) caches of the most recently resolved rows.
_CURRENT_MACHINE = None      # cached Machine, see Machine.current()
_CURRENT_INTERFACE = None    # cached NetworkInterface, see NetworkInterface.current()
_CURRENT_BINARIES = {}       # cached Binary rows keyed by binary name
_CURRENT_PROCESS = None      # presumably the cached Process for this OS process - not used in the visible part of this file

# How long (in seconds) a cached row is trusted before it is refreshed from the host/DB.
MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60             # 1 week
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60        # 1 hour
BINARY_RECHECK_INTERVAL = 1 * 30 * 60                   # 30 minutes
PROCESS_RECHECK_INTERVAL = 60                           # Re-validate every 60 seconds
PID_REUSE_WINDOW = timedelta(hours=24)                  # Max age for considering a PID match valid
START_TIME_TOLERANCE = 5.0                              # Seconds tolerance for start time matching
class MachineManager(models.Manager):
    """Manager for Machine that exposes ``Machine.objects.current()``."""

    def current(self) -> 'Machine':
        """Return the Machine row for the current host (delegates to ``Machine.current()``)."""
        return Machine.current()
  33. class Machine(ModelWithHealthStats):
  34. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  35. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  36. modified_at = models.DateTimeField(auto_now=True)
  37. guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
  38. hostname = models.CharField(max_length=63, default=None, null=False)
  39. hw_in_docker = models.BooleanField(default=False, null=False)
  40. hw_in_vm = models.BooleanField(default=False, null=False)
  41. hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
  42. hw_product = models.CharField(max_length=63, default=None, null=False)
  43. hw_uuid = models.CharField(max_length=255, default=None, null=False)
  44. os_arch = models.CharField(max_length=15, default=None, null=False)
  45. os_family = models.CharField(max_length=15, default=None, null=False)
  46. os_platform = models.CharField(max_length=63, default=None, null=False)
  47. os_release = models.CharField(max_length=63, default=None, null=False)
  48. os_kernel = models.CharField(max_length=255, default=None, null=False)
  49. stats = models.JSONField(default=dict, null=True, blank=True)
  50. config = models.JSONField(default=dict, null=True, blank=True,
  51. help_text="Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)")
  52. num_uses_failed = models.PositiveIntegerField(default=0)
  53. num_uses_succeeded = models.PositiveIntegerField(default=0)
  54. objects: MachineManager = MachineManager()
  55. networkinterface_set: models.Manager['NetworkInterface']
  56. class Meta:
  57. app_label = 'machine'
  58. @classmethod
  59. def current(cls) -> 'Machine':
  60. global _CURRENT_MACHINE
  61. if _CURRENT_MACHINE:
  62. if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
  63. return _CURRENT_MACHINE
  64. _CURRENT_MACHINE = None
  65. _CURRENT_MACHINE, _ = cls.objects.update_or_create(
  66. guid=get_host_guid(),
  67. defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
  68. )
  69. return _CURRENT_MACHINE
  70. @staticmethod
  71. def from_jsonl(record: dict, overrides: dict = None):
  72. """
  73. Update Machine config from JSONL record.
  74. Args:
  75. record: JSONL record with '_method': 'update', 'key': '...', 'value': '...'
  76. overrides: Not used
  77. Returns:
  78. Machine instance or None
  79. """
  80. method = record.get('_method')
  81. if method == 'update':
  82. key = record.get('key')
  83. value = record.get('value')
  84. if key and value:
  85. machine = Machine.current()
  86. if not machine.config:
  87. machine.config = {}
  88. machine.config[key] = value
  89. machine.save(update_fields=['config'])
  90. return machine
  91. return None
class NetworkInterfaceManager(models.Manager):
    """Manager for NetworkInterface that exposes ``NetworkInterface.objects.current()``."""

    def current(self) -> 'NetworkInterface':
        """Return the current NetworkInterface row (delegates to ``NetworkInterface.current()``)."""
        return NetworkInterface.current()
  95. class NetworkInterface(ModelWithHealthStats):
  96. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  97. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  98. modified_at = models.DateTimeField(auto_now=True)
  99. machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)
  100. mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
  101. ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
  102. ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
  103. dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)
  104. hostname = models.CharField(max_length=63, default=None, null=False)
  105. iface = models.CharField(max_length=15, default=None, null=False)
  106. isp = models.CharField(max_length=63, default=None, null=False)
  107. city = models.CharField(max_length=63, default=None, null=False)
  108. region = models.CharField(max_length=63, default=None, null=False)
  109. country = models.CharField(max_length=63, default=None, null=False)
  110. num_uses_failed = models.PositiveIntegerField(default=0)
  111. num_uses_succeeded = models.PositiveIntegerField(default=0)
  112. objects: NetworkInterfaceManager = NetworkInterfaceManager()
  113. class Meta:
  114. app_label = 'machine'
  115. unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)
  116. @classmethod
  117. def current(cls) -> 'NetworkInterface':
  118. global _CURRENT_INTERFACE
  119. if _CURRENT_INTERFACE:
  120. if timezone.now() < _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL):
  121. return _CURRENT_INTERFACE
  122. _CURRENT_INTERFACE = None
  123. machine = Machine.objects.current()
  124. net_info = get_host_network()
  125. _CURRENT_INTERFACE, _ = cls.objects.update_or_create(
  126. machine=machine, ip_public=net_info.pop('ip_public'), ip_local=net_info.pop('ip_local'),
  127. mac_address=net_info.pop('mac_address'), dns_server=net_info.pop('dns_server'), defaults=net_info,
  128. )
  129. return _CURRENT_INTERFACE
  130. class BinaryManager(models.Manager):
  131. def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'Binary':
  132. """Get or create an Binary record from the database or cache."""
  133. global _CURRENT_BINARIES
  134. cached = _CURRENT_BINARIES.get(name)
  135. if cached and timezone.now() < cached.modified_at + timedelta(seconds=BINARY_RECHECK_INTERVAL):
  136. return cached
  137. _CURRENT_BINARIES[name], _ = self.update_or_create(
  138. machine=Machine.objects.current(), name=name, binprovider=binprovider,
  139. version=version, abspath=abspath, sha256=sha256,
  140. )
  141. return _CURRENT_BINARIES[name]
  142. def get_valid_binary(self, name: str, machine: 'Machine | None' = None) -> 'Binary | None':
  143. """Get a valid Binary for the given name on the current machine, or None if not found."""
  144. machine = machine or Machine.current()
  145. return self.filter(
  146. machine=machine,
  147. name__iexact=name,
  148. ).exclude(abspath='').exclude(abspath__isnull=True).order_by('-modified_at').first()
  149. class Binary(ModelWithHealthStats):
  150. """
  151. Tracks an binary on a specific machine.
  152. Follows the unified state machine pattern:
  153. - queued: Binary needs to be installed
  154. - started: Installation in progress
  155. - succeeded: Binary installed successfully (abspath, version, sha256 populated)
  156. - failed: Installation failed
  157. State machine calls run() which executes on_Binary__install_* hooks
  158. to install the binary using the specified providers.
  159. """
  160. class StatusChoices(models.TextChoices):
  161. QUEUED = 'queued', 'Queued'
  162. STARTED = 'started', 'Started'
  163. SUCCEEDED = 'succeeded', 'Succeeded'
  164. FAILED = 'failed', 'Failed'
  165. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  166. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  167. modified_at = models.DateTimeField(auto_now=True)
  168. machine = models.ForeignKey(Machine, on_delete=models.CASCADE, null=False)
  169. # Binary metadata
  170. name = models.CharField(max_length=63, default='', null=False, blank=True, db_index=True)
  171. binproviders = models.CharField(max_length=127, default='env', null=False, blank=True,
  172. help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,env")
  173. overrides = models.JSONField(default=dict, blank=True,
  174. help_text="Provider-specific overrides: {'apt': {'packages': ['pkg']}, ...}")
  175. # Installation results (populated after installation)
  176. binprovider = models.CharField(max_length=31, default='', null=False, blank=True,
  177. help_text="Provider that successfully installed this binary")
  178. abspath = models.CharField(max_length=255, default='', null=False, blank=True)
  179. version = models.CharField(max_length=32, default='', null=False, blank=True)
  180. sha256 = models.CharField(max_length=64, default='', null=False, blank=True)
  181. # State machine fields
  182. status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True)
  183. retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True,
  184. help_text="When to retry this binary installation")
  185. output_dir = models.CharField(max_length=255, default='', null=False, blank=True,
  186. help_text="Directory where installation hook logs are stored")
  187. # Health stats
  188. num_uses_failed = models.PositiveIntegerField(default=0)
  189. num_uses_succeeded = models.PositiveIntegerField(default=0)
  190. state_machine_name: str = 'archivebox.machine.models.BinaryMachine'
  191. objects: BinaryManager = BinaryManager()
  192. class Meta:
  193. app_label = 'machine'
  194. verbose_name = 'Binary'
  195. verbose_name_plural = 'Binaries'
  196. unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)
  197. def __str__(self) -> str:
  198. return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'
  199. @property
  200. def is_valid(self) -> bool:
  201. """A binary is valid if it has both abspath and version set."""
  202. return bool(self.abspath) and bool(self.version)
  203. @cached_property
  204. def binary_info(self) -> dict:
  205. """Return info about the binary."""
  206. return {
  207. 'name': self.name,
  208. 'abspath': self.abspath,
  209. 'version': self.version,
  210. 'binprovider': self.binprovider,
  211. 'is_valid': self.is_valid,
  212. }
  213. def to_jsonl(self) -> dict:
  214. """
  215. Convert Binary model instance to a JSONL record.
  216. """
  217. from archivebox.config import VERSION
  218. return {
  219. 'type': 'Binary',
  220. 'schema_version': VERSION,
  221. 'id': str(self.id),
  222. 'machine_id': str(self.machine_id),
  223. 'name': self.name,
  224. 'binprovider': self.binprovider,
  225. 'abspath': self.abspath,
  226. 'version': self.version,
  227. 'sha256': self.sha256,
  228. 'status': self.status,
  229. }
  230. @staticmethod
  231. def from_jsonl(record: dict, overrides: dict = None):
  232. """
  233. Create/update Binary from JSONL record.
  234. Handles two cases:
  235. 1. From binaries.jsonl: creates queued binary with name, binproviders, overrides
  236. 2. From hook output: updates binary with abspath, version, sha256, binprovider
  237. Args:
  238. record: JSONL record with 'name' and either:
  239. - 'binproviders', 'overrides' (from binaries.jsonl)
  240. - 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
  241. overrides: Not used
  242. Returns:
  243. Binary instance or None
  244. """
  245. name = record.get('name')
  246. if not name:
  247. return None
  248. machine = Machine.current()
  249. overrides = overrides or {}
  250. # Case 1: From binaries.jsonl - create queued binary
  251. if 'binproviders' in record or ('overrides' in record and not record.get('abspath')):
  252. binary, created = Binary.objects.get_or_create(
  253. machine=machine,
  254. name=name,
  255. defaults={
  256. 'binproviders': record.get('binproviders', 'env'),
  257. 'overrides': record.get('overrides', {}),
  258. 'status': Binary.StatusChoices.QUEUED,
  259. 'retry_at': timezone.now(),
  260. }
  261. )
  262. return binary
  263. # Case 2: From hook output - update with installation results
  264. abspath = record.get('abspath')
  265. version = record.get('version')
  266. if not abspath or not version:
  267. return None
  268. binary, _ = Binary.objects.update_or_create(
  269. machine=machine,
  270. name=name,
  271. defaults={
  272. 'abspath': abspath,
  273. 'version': version,
  274. 'sha256': record.get('sha256', ''),
  275. 'binprovider': record.get('binprovider', 'env'),
  276. 'status': Binary.StatusChoices.SUCCEEDED,
  277. 'retry_at': None,
  278. }
  279. )
  280. return binary
  281. @property
  282. def OUTPUT_DIR(self):
  283. """Return the output directory for this binary installation."""
  284. from pathlib import Path
  285. from django.conf import settings
  286. DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
  287. return Path(DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id)
  288. def update_and_requeue(self, **kwargs):
  289. """
  290. Update binary fields and requeue for worker state machine.
  291. Sets modified_at to ensure workers pick up changes.
  292. Always saves the model after updating.
  293. """
  294. for key, value in kwargs.items():
  295. setattr(self, key, value)
  296. self.modified_at = timezone.now()
  297. self.save()
  298. def run(self):
  299. """
  300. Execute binary installation by running on_Binary__install_* hooks.
  301. Called by BinaryMachine when entering 'started' state.
  302. Runs ALL on_Binary__install_* hooks - each hook checks binproviders
  303. and decides if it can handle this binary. First hook to succeed wins.
  304. Updates status to SUCCEEDED or FAILED based on hook output.
  305. """
  306. import json
  307. from archivebox.hooks import discover_hooks, run_hook
  308. from archivebox.config.configset import get_config
  309. # Get merged config (Binary doesn't have crawl/snapshot context)
  310. config = get_config(scope='global')
  311. # Create output directory
  312. output_dir = self.OUTPUT_DIR
  313. output_dir.mkdir(parents=True, exist_ok=True)
  314. self.output_dir = str(output_dir)
  315. self.save()
  316. # Discover ALL on_Binary__install_* hooks
  317. hooks = discover_hooks('Binary', config=config)
  318. if not hooks:
  319. self.status = self.StatusChoices.FAILED
  320. self.save()
  321. return
  322. # Run each hook - they decide if they can handle this binary
  323. for hook in hooks:
  324. plugin_name = hook.parent.name
  325. plugin_output_dir = output_dir / plugin_name
  326. plugin_output_dir.mkdir(parents=True, exist_ok=True)
  327. # Build kwargs for hook
  328. hook_kwargs = {
  329. 'binary_id': str(self.id),
  330. 'machine_id': str(self.machine_id),
  331. 'name': self.name,
  332. 'binproviders': self.binproviders,
  333. }
  334. # Add overrides as JSON string if present
  335. if self.overrides:
  336. hook_kwargs['overrides'] = json.dumps(self.overrides)
  337. # Run the hook
  338. result = run_hook(
  339. hook,
  340. output_dir=plugin_output_dir,
  341. config=config,
  342. timeout=600, # 10 min timeout for binary installation
  343. **hook_kwargs
  344. )
  345. # Background hook (unlikely for binary installation, but handle it)
  346. if result is None:
  347. continue
  348. # Failed or skipped hook - try next one
  349. if result['returncode'] != 0:
  350. continue
  351. # Parse JSONL output to check for successful installation
  352. stdout_file = plugin_output_dir / 'stdout.log'
  353. if stdout_file.exists():
  354. stdout = stdout_file.read_text()
  355. for line in stdout.splitlines():
  356. if line.strip() and line.strip().startswith('{'):
  357. try:
  358. record = json.loads(line)
  359. if record.get('type') == 'Binary' and record.get('abspath'):
  360. # Update self from successful installation
  361. self.abspath = record['abspath']
  362. self.version = record.get('version', '')
  363. self.sha256 = record.get('sha256', '')
  364. self.binprovider = record.get('binprovider', 'env')
  365. self.status = self.StatusChoices.SUCCEEDED
  366. self.save()
  367. return
  368. except json.JSONDecodeError:
  369. continue
  370. # No hook succeeded
  371. self.status = self.StatusChoices.FAILED
  372. self.save()
  373. def cleanup(self):
  374. """
  375. Clean up background binary installation hooks.
  376. Called by state machine if needed (not typically used for binaries
  377. since installations are foreground, but included for consistency).
  378. """
  379. from pathlib import Path
  380. from archivebox.misc.process_utils import safe_kill_process
  381. output_dir = self.OUTPUT_DIR
  382. if not output_dir.exists():
  383. return
  384. # Kill any background hooks
  385. for plugin_dir in output_dir.iterdir():
  386. if not plugin_dir.is_dir():
  387. continue
  388. pid_file = plugin_dir / 'hook.pid'
  389. cmd_file = plugin_dir / 'cmd.sh'
  390. safe_kill_process(pid_file, cmd_file)
  391. # =============================================================================
  392. # Process Model
  393. # =============================================================================
  394. class ProcessManager(models.Manager):
  395. """Manager for Process model."""
  396. def current(self) -> 'Process':
  397. """Get the Process record for the current OS process."""
  398. return Process.current()
  399. def get_by_pid(self, pid: int, machine: 'Machine' = None) -> 'Process | None':
  400. """
  401. Find a Process by PID with proper validation against PID reuse.
  402. IMPORTANT: PIDs are reused by the OS! This method:
  403. 1. Filters by machine (required - PIDs are only unique per machine)
  404. 2. Filters by time window (processes older than 24h are stale)
  405. 3. Validates via psutil that start times match
  406. Args:
  407. pid: OS process ID
  408. machine: Machine instance (defaults to current machine)
  409. Returns:
  410. Process if found and validated, None otherwise
  411. """
  412. if not PSUTIL_AVAILABLE:
  413. return None
  414. machine = machine or Machine.current()
  415. # Get the actual process start time from OS
  416. try:
  417. os_proc = psutil.Process(pid)
  418. os_start_time = os_proc.create_time()
  419. except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
  420. # Process doesn't exist - any DB record with this PID is stale
  421. return None
  422. # Query candidates: same machine, same PID, recent, still RUNNING
  423. candidates = self.filter(
  424. machine=machine,
  425. pid=pid,
  426. status=Process.StatusChoices.RUNNING,
  427. started_at__gte=timezone.now() - PID_REUSE_WINDOW,
  428. ).order_by('-started_at')
  429. for candidate in candidates:
  430. # Validate start time matches (within tolerance)
  431. if candidate.started_at:
  432. db_start_time = candidate.started_at.timestamp()
  433. if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
  434. return candidate
  435. return None
  436. def create_for_archiveresult(self, archiveresult, **kwargs):
  437. """
  438. Create a Process record for an ArchiveResult.
  439. Called during migration and when creating new ArchiveResults.
  440. """
  441. # Defaults from ArchiveResult if not provided
  442. defaults = {
  443. 'machine': Machine.current(),
  444. 'pwd': kwargs.get('pwd') or str(archiveresult.snapshot.output_dir / archiveresult.plugin),
  445. 'cmd': kwargs.get('cmd') or [],
  446. 'status': 'queued',
  447. 'timeout': kwargs.get('timeout', 120),
  448. 'env': kwargs.get('env', {}),
  449. }
  450. defaults.update(kwargs)
  451. process = self.create(**defaults)
  452. return process
  453. class Process(models.Model):
  454. """
  455. Tracks a single OS process execution.
  456. Process represents the actual subprocess spawned to execute a hook.
  457. One Process can optionally be associated with an ArchiveResult (via OneToOne),
  458. but Process can also exist standalone for internal operations.
  459. Follows the unified state machine pattern:
  460. - queued: Process ready to launch
  461. - running: Process actively executing
  462. - exited: Process completed (check exit_code for success/failure)
  463. State machine calls launch() to spawn the process and monitors its lifecycle.
  464. """
  465. class StatusChoices(models.TextChoices):
  466. QUEUED = 'queued', 'Queued'
  467. RUNNING = 'running', 'Running'
  468. EXITED = 'exited', 'Exited'
  469. class TypeChoices(models.TextChoices):
  470. CLI = 'cli', 'CLI Command'
  471. SUPERVISORD = 'supervisord', 'Supervisord Daemon'
  472. ORCHESTRATOR = 'orchestrator', 'Orchestrator'
  473. WORKER = 'worker', 'Worker Process'
  474. HOOK = 'hook', 'Hook Script'
  475. BINARY = 'binary', 'Binary Execution'
  476. # Primary fields
  477. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  478. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  479. modified_at = models.DateTimeField(auto_now=True)
  480. # Parent process FK for hierarchy tracking
  481. parent = models.ForeignKey(
  482. 'self',
  483. on_delete=models.SET_NULL,
  484. null=True,
  485. blank=True,
  486. related_name='children',
  487. help_text='Parent process that spawned this one'
  488. )
  489. # Process type for distinguishing in hierarchy
  490. process_type = models.CharField(
  491. max_length=16,
  492. choices=TypeChoices.choices,
  493. default=TypeChoices.BINARY,
  494. db_index=True,
  495. help_text='Type of process in the execution hierarchy'
  496. )
  497. # Machine FK - required (every process runs on a machine)
  498. machine = models.ForeignKey(
  499. Machine,
  500. on_delete=models.CASCADE,
  501. null=False,
  502. related_name='process_set',
  503. help_text='Machine where this process executed'
  504. )
  505. # Execution metadata
  506. pwd = models.CharField(max_length=512, default='', null=False, blank=True,
  507. help_text='Working directory for process execution')
  508. cmd = models.JSONField(default=list, null=False, blank=True,
  509. help_text='Command as array of arguments')
  510. env = models.JSONField(default=dict, null=False, blank=True,
  511. help_text='Environment variables for process')
  512. timeout = models.IntegerField(default=120, null=False,
  513. help_text='Timeout in seconds')
  514. # Process results
  515. pid = models.IntegerField(default=None, null=True, blank=True,
  516. help_text='OS process ID')
  517. exit_code = models.IntegerField(default=None, null=True, blank=True,
  518. help_text='Process exit code (0 = success)')
  519. stdout = models.TextField(default='', null=False, blank=True,
  520. help_text='Standard output from process')
  521. stderr = models.TextField(default='', null=False, blank=True,
  522. help_text='Standard error from process')
  523. # Timing
  524. started_at = models.DateTimeField(default=None, null=True, blank=True,
  525. help_text='When process was launched')
  526. ended_at = models.DateTimeField(default=None, null=True, blank=True,
  527. help_text='When process completed/terminated')
  528. # Optional FKs
  529. binary = models.ForeignKey(
  530. Binary,
  531. on_delete=models.SET_NULL,
  532. null=True, blank=True,
  533. related_name='process_set',
  534. help_text='Binary used by this process'
  535. )
  536. iface = models.ForeignKey(
  537. NetworkInterface,
  538. on_delete=models.SET_NULL,
  539. null=True, blank=True,
  540. related_name='process_set',
  541. help_text='Network interface used by this process'
  542. )
  543. # Optional connection URL (for CDP, sonic, etc.)
  544. url = models.URLField(max_length=2048, default=None, null=True, blank=True,
  545. help_text='Connection URL (CDP endpoint, sonic server, etc.)')
  546. # Reverse relation to ArchiveResult (OneToOne from AR side)
  547. # archiveresult: OneToOneField defined on ArchiveResult model
  548. # State machine fields
  549. status = models.CharField(
  550. max_length=16,
  551. choices=StatusChoices.choices,
  552. default=StatusChoices.QUEUED,
  553. db_index=True
  554. )
  555. retry_at = models.DateTimeField(
  556. default=timezone.now,
  557. null=True, blank=True,
  558. db_index=True,
  559. help_text='When to retry this process'
  560. )
  561. state_machine_name: str = 'archivebox.machine.models.ProcessMachine'
  562. objects: ProcessManager = ProcessManager()
  563. class Meta:
  564. app_label = 'machine'
  565. verbose_name = 'Process'
  566. verbose_name_plural = 'Processes'
  567. indexes = [
  568. models.Index(fields=['machine', 'status', 'retry_at']),
  569. models.Index(fields=['binary', 'exit_code']),
  570. models.Index(fields=['parent', 'status']),
  571. models.Index(fields=['machine', 'pid', 'started_at']),
  572. ]
  573. def __str__(self) -> str:
  574. cmd_str = ' '.join(self.cmd[:3]) if self.cmd else '(no cmd)'
  575. return f'Process[{self.id}] {cmd_str} ({self.status})'
  576. # Properties that delegate to related objects
  577. @property
  578. def cmd_version(self) -> str:
  579. """Get version from associated binary."""
  580. return self.binary.version if self.binary else ''
  @property
  def bin_abspath(self) -> str:
      """Absolute path of the linked Binary, or '' when no binary is attached."""
      linked = self.binary
      if not linked:
          return ''
      return linked.abspath
  @property
  def plugin(self) -> str:
      """Plugin name of the associated ArchiveResult, or '' if there is none."""
      # `archiveresult` is the reverse side of a OneToOne declared on
      # ArchiveResult; hasattr() is False when no such row is attached.
      if not hasattr(self, 'archiveresult'):
          return ''
      return self.archiveresult.plugin
  @property
  def hook_name(self) -> str:
      """Hook name of the associated ArchiveResult, or '' if there is none."""
      if not hasattr(self, 'archiveresult'):
          return ''
      return self.archiveresult.hook_name
  def to_jsonl(self) -> dict:
      """
      Serialize this Process into a JSONL-ready dict.

      Always emits type, schema_version, id, machine_id, cmd, pwd, status,
      exit_code, started_at and ended_at (timestamps as ISO strings or None).
      binary_id, pid and timeout are added only when they are truthy.
      """
      from archivebox.config import VERSION

      def _iso(moment):
          # ISO-8601 text for a set timestamp, None otherwise.
          return moment.isoformat() if moment else None

      payload = {
          'type': 'Process',
          'schema_version': VERSION,
          'id': str(self.id),
          'machine_id': str(self.machine_id),
          'cmd': self.cmd,
          'pwd': self.pwd,
          'status': self.status,
          'exit_code': self.exit_code,
          'started_at': _iso(self.started_at),
          'ended_at': _iso(self.ended_at),
      }

      # Optional fields, appended in a fixed order when set
      if self.binary_id:
          payload['binary_id'] = str(self.binary_id)
      for optional_key in ('pid', 'timeout'):
          optional_value = getattr(self, optional_key)
          if optional_value:
              payload[optional_key] = optional_value

      return payload
  def update_and_requeue(self, **kwargs):
      """
      Assign the given field values, stamp modified_at and save.

      Every keyword argument is set as an attribute on this instance, then
      modified_at is refreshed to the current time before the row is saved.
      """
      for field_name in kwargs:
          setattr(self, field_name, kwargs[field_name])
      self.modified_at = timezone.now()
      self.save()
  632. # =========================================================================
  633. # Process.current() and hierarchy methods
  634. # =========================================================================
  @classmethod
  def current(cls) -> 'Process':
      """
      Get or create the Process record for the current OS process.

      Similar to Machine.current(), this:
      1. Checks cache for existing Process with matching PID
      2. Validates the cached Process is still valid (PID not reused)
      3. Creates new Process if needed

      IMPORTANT: Uses psutil to validate PID hasn't been reused.
      PIDs are recycled by OS, so we compare start times.

      Returns:
          The cached, looked-up, or newly created Process row. As a side
          effect the module-level _CURRENT_PROCESS cache is updated.
      """
      global _CURRENT_PROCESS

      current_pid = os.getpid()
      machine = Machine.current()

      # Check cache validity
      if _CURRENT_PROCESS:
          # Verify: same PID, same machine, cache not expired
          # (freshness is measured from the cached row's modified_at)
          if (_CURRENT_PROCESS.pid == current_pid and
                  _CURRENT_PROCESS.machine_id == machine.id and
                  timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
              return _CURRENT_PROCESS
          # Any mismatch or expiry drops the cache and falls through to the DB lookup
          _CURRENT_PROCESS = None

      # Get actual process start time from OS for validation
      os_start_time = None
      if PSUTIL_AVAILABLE:
          try:
              os_proc = psutil.Process(current_pid)
              os_start_time = os_proc.create_time()
          except (psutil.NoSuchProcess, psutil.AccessDenied):
              # Best effort: without a start time the DB lookup below is skipped
              pass

      # Try to find existing Process for this PID on this machine
      # Filter by: machine + PID + RUNNING + recent + start time matches
      if os_start_time:
          existing = cls.objects.filter(
              machine=machine,
              pid=current_pid,
              status=cls.StatusChoices.RUNNING,
              started_at__gte=timezone.now() - PID_REUSE_WINDOW,
          ).order_by('-started_at').first()

          # Only the most recent candidate is compared against the OS start time
          if existing and existing.started_at:
              db_start_time = existing.started_at.timestamp()
              if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
                  _CURRENT_PROCESS = existing
                  return existing

      # No valid existing record - create new one
      parent = cls._find_parent_process(machine)
      process_type = cls._detect_process_type()

      # Use psutil cmdline if available (matches what proc() will validate against)
      # Otherwise fall back to sys.argv
      cmd = sys.argv
      if PSUTIL_AVAILABLE:
          try:
              os_proc = psutil.Process(current_pid)
              cmd = os_proc.cmdline()
          except (psutil.NoSuchProcess, psutil.AccessDenied):
              pass

      # Use psutil start time if available (more accurate than timezone.now())
      if os_start_time:
          started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
      else:
          started_at = timezone.now()

      _CURRENT_PROCESS = cls.objects.create(
          machine=machine,
          parent=parent,
          process_type=process_type,
          cmd=cmd,
          pwd=os.getcwd(),
          pid=current_pid,
          started_at=started_at,
          status=cls.StatusChoices.RUNNING,
      )
      return _CURRENT_PROCESS
  @classmethod
  def _find_parent_process(cls, machine: 'Machine' = None) -> 'Process | None':
      """
      Look up the Process row that corresponds to this process's parent PID.

      A row only qualifies when it is on the same machine, has the parent's
      PID, is RUNNING, started within PID_REUSE_WINDOW, and its started_at
      agrees with the OS-reported start time within START_TIME_TOLERANCE.
      These checks guard against matching a recycled PID.

      Returns None when psutil is unavailable, the parent cannot be inspected,
      or no row matches (i.e. the parent is not a tracked ArchiveBox process).
      """
      if not PSUTIL_AVAILABLE:
          return None

      parent_pid = os.getppid()
      machine = machine or Machine.current()

      # Ask the OS when the parent started; bail out if it cannot be inspected
      try:
          parent_started = psutil.Process(parent_pid).create_time()
      except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
          return None

      recent_running = cls.objects.filter(
          machine=machine,
          pid=parent_pid,
          status=cls.StatusChoices.RUNNING,
          started_at__gte=timezone.now() - PID_REUSE_WINDOW,
      ).order_by('-started_at')

      # Newest first: return the first row whose start time matches the OS
      return next(
          (
              row for row in recent_running
              if row.started_at
              and abs(row.started_at.timestamp() - parent_started) < START_TIME_TOLERANCE
          ),
          None,
      )
  @classmethod
  def _detect_process_type(cls) -> str:
      """
      Classify the current process by keywords in its lower-cased sys.argv.

      Checks run in priority order: supervisord, orchestrator, worker names,
      then a generic 'archivebox' match (CLI). Anything else is BINARY.
      """
      joined_argv = ' '.join(sys.argv).lower()
      worker_markers = ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']

      if 'supervisord' in joined_argv:
          return cls.TypeChoices.SUPERVISORD
      if 'orchestrator' in joined_argv:
          return cls.TypeChoices.ORCHESTRATOR
      for marker in worker_markers:
          if marker in joined_argv:
              return cls.TypeChoices.WORKER
      if 'archivebox' in joined_argv:
          return cls.TypeChoices.CLI
      return cls.TypeChoices.BINARY
  @classmethod
  def cleanup_stale_running(cls, machine: 'Machine' = None) -> int:
      """
      Flip stale RUNNING rows on a machine to EXITED.

      A RUNNING row is considered stale when either:
      - its started_at is older than PID_REUSE_WINDOW, or
      - psutil is available and the PID is gone / not inspectable, or the
        OS start time differs from started_at by more than START_TIME_TOLERANCE.

      Stale rows get status=EXITED, keep any existing ended_at/exit_code, and
      otherwise receive ended_at=now and exit_code=-1.

      Args:
          machine: Machine to scan (defaults to Machine.current()).

      Returns:
          Number of rows that were marked EXITED.
      """
      machine = machine or Machine.current()
      cleaned = 0

      running_rows = cls.objects.filter(
          machine=machine,
          status=cls.StatusChoices.RUNNING,
      )

      for record in running_rows:
          began = record.started_at

          if began and began < timezone.now() - PID_REUSE_WINDOW:
              # Too old: the PID has certainly been recycled by now
              stale = True
          elif PSUTIL_AVAILABLE and record.pid is not None:
              try:
                  live = psutil.Process(record.pid)
                  # A start-time mismatch means the PID belongs to another process
                  stale = bool(began) and abs(began.timestamp() - live.create_time()) > START_TIME_TOLERANCE
              except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                  stale = True
          else:
              stale = False

          if not stale:
              continue

          record.status = cls.StatusChoices.EXITED
          record.ended_at = record.ended_at or timezone.now()
          if record.exit_code is None:
              record.exit_code = -1
          record.save(update_fields=['status', 'ended_at', 'exit_code'])
          cleaned += 1

      return cleaned
  794. # =========================================================================
  795. # Tree traversal properties
  796. # =========================================================================
  @property
  def root(self) -> 'Process':
      """Follow parent links upward and return the topmost Process."""
      node = self
      while True:
          if not node.parent_id:
              return node
          node = node.parent
  @property
  def ancestors(self) -> list['Process']:
      """List every ancestor, nearest parent first and root last."""
      lineage = []
      cursor = self
      while cursor := cursor.parent:
          lineage.append(cursor)
      return lineage
  @property
  def depth(self) -> int:
      """Number of ancestors above this process (a root has depth 0)."""
      lineage = self.ancestors
      return len(lineage)
  def get_descendants(self, include_self: bool = False):
      """
      Return a queryset of all DB descendants, walked level by level.

      Args:
          include_self: When True, this process's own pk is included.

      Returns:
          Process queryset filtered to the collected primary keys.
      """
      collected = [self.pk] if include_self else []

      # Breadth-first: each pass fetches the pks of the next generation
      frontier = list(self.children.values_list('pk', flat=True))
      while frontier:
          collected += frontier
          frontier = list(Process.objects.filter(parent_id__in=frontier).values_list('pk', flat=True))

      return Process.objects.filter(pk__in=collected)
  828. # =========================================================================
  829. # Validated psutil access via .proc property
  830. # =========================================================================
  @property
  def proc(self) -> 'psutil.Process | None':
      """
      Get validated psutil.Process for this record.

      Returns psutil.Process ONLY if:
      1. Process with this PID exists in OS
      2. OS process start time matches our started_at (within tolerance)
      3. Process is on current machine

      Returns None if:
      - PID doesn't exist (process exited), including when it exits while
        it is being validated
      - PID was reused by a different process (start times don't match, or
        the executable basename differs from cmd[0])
      - We're on a different machine than where process ran
      - psutil is not available

      This prevents accidentally matching a stale/recycled PID.
      """
      if not PSUTIL_AVAILABLE:
          return None

      # Can't get psutil.Process if we don't have a PID
      if not self.pid:
          return None

      # Can't validate processes on other machines
      if self.machine_id != Machine.current().id:
          return None

      try:
          os_proc = psutil.Process(self.pid)

          # Validate start time matches to prevent PID reuse confusion.
          # create_time() is inside the try so a process that vanishes (or
          # denies access) mid-check yields None instead of an exception.
          if self.started_at:
              os_start_time = os_proc.create_time()
              db_start_time = self.started_at.timestamp()
              if abs(os_start_time - db_start_time) > START_TIME_TOLERANCE:
                  # PID has been reused by a different process!
                  return None
      except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
          return None  # Process no longer exists / cannot be inspected

      # Optionally validate command matches (extra safety)
      if self.cmd:
          try:
              os_cmdline = os_proc.cmdline()
          except (psutil.AccessDenied, psutil.ZombieProcess):
              # Can't check cmdline, trust start time match.
              # Must precede NoSuchProcess: ZombieProcess is its subclass.
              os_cmdline = []
          except psutil.NoSuchProcess:
              return None  # Exited between the start-time check and here

          if os_cmdline:
              os_binary = os_cmdline[0]
              db_binary = self.cmd[0]
              # Match by basename (handles /usr/bin/python3 vs python3)
              if os_binary and db_binary and Path(os_binary).name != Path(db_binary).name:
                  return None  # Different binary, PID reused

      return os_proc
  @property
  def is_running(self) -> bool:
      """
      Report whether the validated OS process for this record is running.

      Relies on self.proc, so a missing or recycled PID counts as not
      running regardless of what the status field says.
      """
      handle = self.proc
      if handle is None:
          return False
      return handle.is_running()
  def is_alive(self) -> bool:
      """Method-style alias of the is_running property (Popen-like API)."""
      running = self.is_running
      return running
  894. def get_memory_info(self) -> dict | None:
  895. """Get memory usage if process is running."""
  896. proc = self.proc
  897. if proc:
  898. try:
  899. mem = proc.memory_info()
  900. return {'rss': mem.rss, 'vms': mem.vms}
  901. except (psutil.NoSuchProcess, psutil.AccessDenied):
  902. pass
  903. return None
  904. def get_cpu_percent(self) -> float | None:
  905. """Get CPU usage percentage if process is running."""
  906. proc = self.proc
  907. if proc:
  908. try:
  909. return proc.cpu_percent(interval=0.1)
  910. except (psutil.NoSuchProcess, psutil.AccessDenied):
  911. pass
  912. return None
  def get_children_pids(self) -> list[int]:
      """
      List PIDs of all OS-level descendants (recursive), not DB children.

      Returns an empty list when there is no validated OS process or psutil
      cannot enumerate its children.
      """
      handle = self.proc
      if not handle:
          return []
      try:
          descendants = handle.children(recursive=True)
          return [descendant.pid for descendant in descendants]
      except (psutil.NoSuchProcess, psutil.AccessDenied):
          return []
  922. # =========================================================================
  923. # Lifecycle methods (launch, kill, poll, wait)
  924. # =========================================================================
  925. @property
  926. def pid_file(self) -> Path:
  927. """Path to PID file for this process."""
  928. return Path(self.pwd) / 'process.pid' if self.pwd else None
  929. @property
  930. def cmd_file(self) -> Path:
  931. """Path to cmd.sh script for this process."""
  932. return Path(self.pwd) / 'cmd.sh' if self.pwd else None
  933. @property
  934. def stdout_file(self) -> Path:
  935. """Path to stdout log."""
  936. return Path(self.pwd) / 'stdout.log' if self.pwd else None
  937. @property
  938. def stderr_file(self) -> Path:
  939. """Path to stderr log."""
  940. return Path(self.pwd) / 'stderr.log' if self.pwd else None
  def _write_pid_file(self) -> None:
      """
      Write the PID file, passing started_at as the mtime to record.

      Does nothing unless pid, started_at and pid_file are all set.
      """
      from archivebox.misc.process_utils import write_pid_file_with_mtime

      if not (self.pid and self.started_at and self.pid_file):
          return
      write_pid_file_with_mtime(self.pid_file, self.pid, self.started_at.timestamp())
  def _write_cmd_file(self) -> None:
      """
      Write the cmd.sh script used for debugging/validation.

      Does nothing unless both cmd and cmd_file are set.
      """
      from archivebox.misc.process_utils import write_cmd_file

      if not (self.cmd and self.cmd_file):
          return
      write_cmd_file(self.cmd_file, self.cmd)
  def _build_env(self) -> dict:
      """Return the subprocess environment: os.environ overlaid with self.env."""
      overrides = self.env or {}
      # Later entries win, so stored values override inherited ones
      return {**os.environ, **overrides}
  def launch(self, background: bool = False) -> 'Process':
      """
      Spawn the subprocess and update this Process record.

      Creates pwd if needed, writes cmd.sh, starts the command with stdout and
      stderr redirected to stdout.log / stderr.log inside pwd, then saves
      pid, started_at and status=RUNNING and writes the PID file. In foreground
      mode it also waits for exit and stores exit_code, ended_at, the captured
      stdout/stderr text and status=EXITED.

      Args:
          background: If True, don't wait for completion (for daemons/bg hooks)

      Returns:
          self (updated with pid, started_at, etc.)

      Raises:
          ValueError: if pwd is not set.
      """
      import subprocess
      import time  # NOTE(review): not referenced anywhere in this method

      # Validate pwd is set (required for output files)
      if not self.pwd:
          raise ValueError("Process.pwd must be set before calling launch()")

      # Ensure output directory exists
      Path(self.pwd).mkdir(parents=True, exist_ok=True)

      # Write cmd.sh for debugging
      self._write_cmd_file()

      stdout_path = self.stdout_file
      stderr_path = self.stderr_file

      # Mode 'w' truncates any log files left over from a previous run
      with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
          proc = subprocess.Popen(
              self.cmd,
              cwd=self.pwd,
              stdout=out,
              stderr=err,
              env=self._build_env(),
          )

          # Get accurate start time from psutil if available
          # (the same OS start time is what later PID-reuse checks compare against)
          if PSUTIL_AVAILABLE:
              try:
                  ps_proc = psutil.Process(proc.pid)
                  self.started_at = datetime.fromtimestamp(
                      ps_proc.create_time(),
                      tz=timezone.get_current_timezone()
                  )
              except (psutil.NoSuchProcess, psutil.AccessDenied):
                  self.started_at = timezone.now()
          else:
              self.started_at = timezone.now()

          self.pid = proc.pid
          self.status = self.StatusChoices.RUNNING
          self.save()

          self._write_pid_file()

          if not background:
              try:
                  proc.wait(timeout=self.timeout)
                  self.exit_code = proc.returncode
              except subprocess.TimeoutExpired:
                  # Timed out: force-kill, reap, and record -1 as the exit code
                  proc.kill()
                  proc.wait()
                  self.exit_code = -1

              self.ended_at = timezone.now()
              if stdout_path.exists():
                  self.stdout = stdout_path.read_text()
              if stderr_path.exists():
                  self.stderr = stderr_path.read_text()
              self.status = self.StatusChoices.EXITED
              self.save()

      return self
  def kill(self, signal_num: int = 15) -> bool:
      """
      Kill this process and update status.

      Uses self.proc for safe killing - only kills if PID matches
      our recorded process (prevents killing recycled PIDs).

      The record is marked EXITED as soon as the signal has been sent; this
      method does not wait for the OS process to actually terminate.

      Args:
          signal_num: Signal to send (default SIGTERM=15)

      Returns:
          True if killed successfully, False otherwise
      """
      # Use validated psutil.Process to ensure we're killing the right process
      proc = self.proc
      if proc is None:
          # Process doesn't exist or PID was recycled - just update status
          if self.status != self.StatusChoices.EXITED:
              self.status = self.StatusChoices.EXITED
              self.ended_at = self.ended_at or timezone.now()
              self.save()
          return False

      try:
          # Safe to kill - we validated it's our process via start time match
          proc.send_signal(signal_num)

          # Update our record
          # Use standard Unix convention: 128 + signal number
          self.exit_code = 128 + signal_num
          self.ended_at = timezone.now()
          self.status = self.StatusChoices.EXITED
          self.save()

          # Clean up PID file
          if self.pid_file and self.pid_file.exists():
              self.pid_file.unlink(missing_ok=True)

          return True
      except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
          # Process already exited between proc check and kill
          # (exit_code is left untouched on this path)
          self.status = self.StatusChoices.EXITED
          self.ended_at = self.ended_at or timezone.now()
          self.save()
          return False
  1057. def poll(self) -> int | None:
  1058. """
  1059. Check if process has exited and update status if so.
  1060. Returns:
  1061. exit_code if exited, None if still running
  1062. """
  1063. if self.status == self.StatusChoices.EXITED:
  1064. return self.exit_code
  1065. if not self.is_running:
  1066. # Process exited - read output and update status
  1067. if self.stdout_file and self.stdout_file.exists():
  1068. self.stdout = self.stdout_file.read_text()
  1069. if self.stderr_file and self.stderr_file.exists():
  1070. self.stderr = self.stderr_file.read_text()
  1071. # Try to get exit code from proc or default to unknown
  1072. self.exit_code = self.exit_code if self.exit_code is not None else -1
  1073. self.ended_at = timezone.now()
  1074. self.status = self.StatusChoices.EXITED
  1075. self.save()
  1076. return self.exit_code
  1077. return None # Still running
  1078. def wait(self, timeout: int | None = None) -> int:
  1079. """
  1080. Wait for process to exit, polling periodically.
  1081. Args:
  1082. timeout: Max seconds to wait (None = use self.timeout)
  1083. Returns:
  1084. exit_code
  1085. Raises:
  1086. TimeoutError if process doesn't exit in time
  1087. """
  1088. import time
  1089. timeout = timeout or self.timeout
  1090. start = time.time()
  1091. while True:
  1092. exit_code = self.poll()
  1093. if exit_code is not None:
  1094. return exit_code
  1095. if time.time() - start > timeout:
  1096. raise TimeoutError(f"Process {self.id} did not exit within {timeout}s")
  1097. time.sleep(0.1)
  def terminate(self, graceful_timeout: float = 5.0) -> bool:
      """
      Gracefully terminate process: SIGTERM → wait → SIGKILL.

      This consolidates the scattered SIGTERM/SIGKILL logic from:
      - crawls/models.py Crawl.cleanup()
      - workers/pid_utils.py stop_worker()
      - supervisord_util.py stop_existing_supervisord_process()

      Args:
          graceful_timeout: Seconds to wait after SIGTERM before SIGKILL

      Returns:
          True if process was terminated, False if already dead
      """
      import time  # NOTE(review): not referenced anywhere in this method
      import signal

      proc = self.proc
      if proc is None:
          # Already dead - just update status
          if self.status != self.StatusChoices.EXITED:
              self.status = self.StatusChoices.EXITED
              self.ended_at = self.ended_at or timezone.now()
              self.save()
          return False

      try:
          # Step 1: Send SIGTERM for graceful shutdown
          proc.terminate()

          # Step 2: Wait for graceful exit
          try:
              exit_status = proc.wait(timeout=graceful_timeout)
              # Process exited gracefully
              # psutil.Process.wait() returns the exit status
              # (None is recorded as 0 here)
              self.exit_code = exit_status if exit_status is not None else 0
              self.status = self.StatusChoices.EXITED
              self.ended_at = timezone.now()
              self.save()
              return True
          except psutil.TimeoutExpired:
              pass  # Still running, need to force kill

          # Step 3: Force kill with SIGKILL
          proc.kill()
          # NOTE(review): a psutil.TimeoutExpired raised by this second wait is
          # not caught by the handler below, so it would propagate to the
          # caller with the record still unsaved - confirm this is intended.
          proc.wait(timeout=2)

          # Use standard Unix convention: 128 + signal number
          self.exit_code = 128 + signal.SIGKILL
          self.status = self.StatusChoices.EXITED
          self.ended_at = timezone.now()
          self.save()
          return True

      except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
          # Process already dead
          self.status = self.StatusChoices.EXITED
          self.ended_at = self.ended_at or timezone.now()
          self.save()
          return False
  def kill_tree(self, graceful_timeout: float = 2.0) -> int:
      """
      Kill this process and all its children (OS children, not DB children).

      This consolidates the scattered child-killing logic from:
      - crawls/models.py Crawl.cleanup() os.killpg()
      - supervisord_util.py stop_existing_supervisord_process()

      Children are sent SIGTERM first, given graceful_timeout seconds to exit,
      and the survivors are force-killed. This process itself is then stopped
      via self.terminate() with the same graceful_timeout.

      Args:
          graceful_timeout: Seconds to wait after SIGTERM before SIGKILL

      Returns:
          Number of processes killed (including self)
      """
      import signal  # NOTE(review): not referenced anywhere in this method

      killed_count = 0
      proc = self.proc
      if proc is None:
          # Already dead
          if self.status != self.StatusChoices.EXITED:
              self.status = self.StatusChoices.EXITED
              self.ended_at = self.ended_at or timezone.now()
              self.save()
          return 0

      try:
          # Get all children before killing parent
          children = proc.children(recursive=True)

          # Kill children first (reverse order - deepest first)
          for child in reversed(children):
              try:
                  child.terminate()
              except (psutil.NoSuchProcess, psutil.AccessDenied):
                  # Child already dead or we don't have permission - continue
                  pass

          # Wait briefly for children to exit
          gone, alive = psutil.wait_procs(children, timeout=graceful_timeout)
          killed_count += len(gone)

          # Force kill remaining children
          for child in alive:
              try:
                  child.kill()
                  killed_count += 1
              except (psutil.NoSuchProcess, psutil.AccessDenied):
                  # Child exited or we don't have permission - continue
                  pass

          # Now kill self (terminate() also updates and saves this record)
          if self.terminate(graceful_timeout=graceful_timeout):
              killed_count += 1

          return killed_count

      except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
          # Process tree already dead
          self.status = self.StatusChoices.EXITED
          self.ended_at = self.ended_at or timezone.now()
          self.save()
          return killed_count
  def kill_children_db(self) -> int:
      """
      Terminate every RUNNING child linked to this process through the parent FK.

      Unlike kill_tree(), which walks OS-level children, this only considers
      direct children recorded in the database.

      Returns:
          Number of child Process records whose terminate() returned True
      """
      running_children = self.children.filter(status=self.StatusChoices.RUNNING)
      return sum(1 for child in running_children if child.terminate())
  1215. # =========================================================================
  1216. # Class methods for querying processes
  1217. # =========================================================================
  @classmethod
  def get_running(cls, process_type: str = None, machine: 'Machine' = None) -> 'QuerySet[Process]':
      """
      Query Process rows with status RUNNING on one machine.

      Replaces:
      - workers/pid_utils.py get_all_worker_pids()
      - workers/orchestrator.py get_total_worker_count()

      Args:
          process_type: Optional TypeChoices value to narrow the result
          machine: Machine to query (defaults to Machine.current())

      Returns:
          QuerySet of running Process records
      """
      scope = machine or Machine.current()
      running = cls.objects.filter(
          machine=scope,
          status=cls.StatusChoices.RUNNING,
      )
      return running.filter(process_type=process_type) if process_type else running
  @classmethod
  def get_running_count(cls, process_type: str = None, machine: 'Machine' = None) -> int:
      """
      Count the rows returned by get_running() for the same arguments.

      Replaces:
      - workers/pid_utils.py get_running_worker_count()
      """
      running = cls.get_running(process_type=process_type, machine=machine)
      return running.count()
  @classmethod
  def stop_all(cls, process_type: str = None, machine: 'Machine' = None, graceful: bool = True) -> int:
      """
      Stop every running process matched by get_running().

      Args:
          process_type: Filter by TypeChoices
          machine: Filter by machine
          graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill()

      Returns:
          Number of processes whose stop call returned True
      """
      halted = 0
      for record in cls.get_running(process_type=process_type, machine=machine):
          # Pick the stop strategy per record, then count successful stops
          stop = record.terminate if graceful else record.kill
          if stop():
              halted += 1
      return halted
  @classmethod
  def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine' = None) -> int:
      """
      Return the ID to assign to the next spawned worker.

      Replaces workers/pid_utils.py get_next_worker_id().
      The ID is simply the current count of running processes of this type.

      Args:
          process_type: Worker type to count
          machine: Machine to scope query

      Returns:
          Next available worker ID (0-indexed)
      """
      running_total = cls.get_running_count(process_type=process_type, machine=machine)
      return running_total
  1280. # =============================================================================
  1281. # Binary State Machine
  1282. # =============================================================================
  class BinaryMachine(BaseStateMachine, strict_states=True):
      """
      State machine for managing Binary installation lifecycle.

      Hook Lifecycle:
      ┌─────────────────────────────────────────────────────────────┐
      │ QUEUED State                                                │
      │ • Binary needs to be installed                              │
      └─────────────────────────────────────────────────────────────┘
                      ↓ tick() when can_start()
      ┌─────────────────────────────────────────────────────────────┐
      │ STARTED State → enter_started()                             │
      │ 1. binary.run()                                             │
      │    • discover_hooks('Binary') → all on_Binary__install_*    │
      │    • Try each provider hook in sequence:                    │
      │      - run_hook(script, output_dir, ...)                    │
      │      - If returncode == 0:                                  │
      │        * Read stdout.log                                    │
      │        * Parse JSONL for 'Binary' record with abspath       │
      │        * Update self: abspath, version, sha256, provider    │
      │        * Set status=SUCCEEDED, RETURN                       │
      │    • If no hook succeeds: set status=FAILED                 │
      └─────────────────────────────────────────────────────────────┘
                      ↓ tick() checks status
      ┌─────────────────────────────────────────────────────────────┐
      │ SUCCEEDED / FAILED                                          │
      │ • Set by binary.run() based on hook results                 │
      │ • Health stats incremented (num_uses_succeeded/failed)      │
      └─────────────────────────────────────────────────────────────┘
      """

      # Name of the attribute through which the managed model is reached
      # (the methods below use it as self.binary).
      model_attr_name = 'binary'

      # States
      queued = State(value=Binary.StatusChoices.QUEUED, initial=True)
      started = State(value=Binary.StatusChoices.STARTED)
      succeeded = State(value=Binary.StatusChoices.SUCCEEDED, final=True)
      failed = State(value=Binary.StatusChoices.FAILED, final=True)

      # Tick Event - transitions based on conditions
      # Each state has a self-loop for "not ready yet" plus guarded exits.
      tick = (
          queued.to.itself(unless='can_start') |
          queued.to(started, cond='can_start') |
          started.to.itself(unless='is_finished') |
          started.to(succeeded, cond='is_succeeded') |
          started.to(failed, cond='is_failed')
      )

      def can_start(self) -> bool:
          """Check if binary installation can start (name and binproviders both set)."""
          return bool(self.binary.name and self.binary.binproviders)

      def is_succeeded(self) -> bool:
          """Check if installation succeeded (status was set by run())."""
          return self.binary.status == Binary.StatusChoices.SUCCEEDED

      def is_failed(self) -> bool:
          """Check if installation failed (status was set by run())."""
          return self.binary.status == Binary.StatusChoices.FAILED

      def is_finished(self) -> bool:
          """Check if installation has completed (success or failure)."""
          return self.binary.status in (
              Binary.StatusChoices.SUCCEEDED,
              Binary.StatusChoices.FAILED,
          )

      @queued.enter
      def enter_queued(self):
          """Binary is queued for installation."""
          self.binary.update_and_requeue(
              retry_at=timezone.now(),
              status=Binary.StatusChoices.QUEUED,
          )

      @started.enter
      def enter_started(self):
          """Start binary installation."""
          # Lock the binary while installation runs
          self.binary.update_and_requeue(
              retry_at=timezone.now() + timedelta(seconds=300),  # 5 min timeout for installation
              status=Binary.StatusChoices.STARTED,
          )

          # Run installation hooks
          self.binary.run()

          # Save updated status (run() updates status to succeeded/failed)
          self.binary.save()

      @succeeded.enter
      def enter_succeeded(self):
          """Binary installed successfully."""
          # retry_at=None clears the retry timestamp for this final state
          self.binary.update_and_requeue(
              retry_at=None,
              status=Binary.StatusChoices.SUCCEEDED,
          )

          # Increment health stats
          self.binary.increment_health_stats(success=True)

      @failed.enter
      def enter_failed(self):
          """Binary installation failed."""
          # retry_at=None clears the retry timestamp for this final state
          self.binary.update_and_requeue(
              retry_at=None,
              status=Binary.StatusChoices.FAILED,
          )

          # Increment health stats
          self.binary.increment_health_stats(success=False)
  1378. # =============================================================================
  1379. # Process State Machine
  1380. # =============================================================================
  class ProcessMachine(BaseStateMachine, strict_states=True):
      """
      State machine for managing the lifecycle of a Process (OS subprocess) record.

      Lifecycle:

          QUEUED --tick(), can_start()--> RUNNING --tick(), is_exited()--> EXITED

      QUEUED (initial)
          The process is waiting to be launched. tick() keeps it here until
          can_start() is true (both cmd and machine are set).

      RUNNING -> enter_running()
          Sets status, started_at and a retry_at deadline of now + timeout.
          NOTE: the actual subprocess launch is currently a placeholder; the
          exit code is set to 0 immediately and real spawning is still done
          by run_hook().

      EXITED (final) -> enter_exited()
          Sets status and ended_at, clears retry_at, and increments health
          stats (success means exit_code == 0).

      The explicit `launch` and `kill` events perform the same two
      transitions without checking the tick() conditions.

      Note: This is a simpler state machine than ArchiveResult. Process is
      only about the execution lifecycle; ArchiveResult handles the
      archival-specific logic (status, output parsing, etc.).
      """

      model_attr_name = 'process'

      # --- States ---------------------------------------------------------
      queued = State(value=Process.StatusChoices.QUEUED, initial=True)
      running = State(value=Process.StatusChoices.RUNNING)
      exited = State(value=Process.StatusChoices.EXITED, final=True)

      # --- Events ---------------------------------------------------------
      # tick: condition-driven; each state either stays put or advances.
      tick = (
          queued.to.itself(unless='can_start')
          | queued.to(running, cond='can_start')
          | running.to.itself(unless='is_exited')
          | running.to(exited, cond='is_exited')
      )

      # Explicit-control events (no conditions attached).
      launch = queued.to(running)
      kill = running.to(exited)

      # --- Conditions -----------------------------------------------------
      def can_start(self) -> bool:
          """Return True when the process has both a cmd and a machine set."""
          if not self.process.cmd:
              return False
          return bool(self.process.machine)

      def is_exited(self) -> bool:
          """Return True once an exit code has been recorded for the process."""
          recorded_exit_code = self.process.exit_code
          return recorded_exit_code is not None

      # --- State entry actions --------------------------------------------
      @queued.enter
      def enter_queued(self):
          """Mark the process as QUEUED and make it eligible for pickup right away."""
          requeue_at = timezone.now()
          self.process.update_and_requeue(
              retry_at=requeue_at,
              status=Process.StatusChoices.QUEUED,
          )

      @running.enter
      def enter_running(self):
          """Mark the process as RUNNING and (for now) immediately record exit code 0."""
          # Lock the process for the length of its timeout while it runs.
          lock_until = timezone.now() + timedelta(seconds=self.process.timeout)
          launched_at = timezone.now()
          self.process.update_and_requeue(
              retry_at=lock_until,
              status=Process.StatusChoices.RUNNING,
              started_at=launched_at,
          )

          # NOTE: placeholder. Process is currently only a data model that
          # tracks execution metadata; the real subprocess spawning is still
          # handled by run_hook(). Until run_hook is refactored, the process
          # is marked as exited right away instead of being launched here.
          self.process.exit_code = 0  # Placeholder
          self.process.save()

      @exited.enter
      def enter_exited(self):
          """Mark the process as EXITED and record success/failure in health stats."""
          # Determine the outcome before touching the record.
          exited_cleanly = self.process.exit_code == 0

          finished_at = timezone.now()
          self.process.update_and_requeue(
              retry_at=None,
              status=Process.StatusChoices.EXITED,
              ended_at=finished_at,
          )

          # A zero exit code counts as success, anything else as failure.
          self.process.increment_health_stats(success=exited_cleanly)
  1467. # =============================================================================
  1468. # State Machine Registration
  1469. # =============================================================================
  # Manually register both state machines defined in this module with the
  # python-statemachine registry. This runs at import time.
  # NOTE(review): presumably needed so the machines can be resolved through the
  # registry (e.g. by name) — confirm against how `registry` is used elsewhere.
  registry.register(BinaryMachine)
  registry.register(ProcessMachine)