2
0

models.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896
  1. __package__ = 'archivebox.machine'
  2. import socket
  3. from archivebox.uuid_compat import uuid7
  4. from datetime import timedelta
  5. from statemachine import State, registry
  6. from django.db import models
  7. from django.utils import timezone
  8. from django.utils.functional import cached_property
  9. from archivebox.base_models.models import ModelWithHealthStats
  10. from archivebox.workers.models import BaseStateMachine
  11. from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
  12. _CURRENT_MACHINE = None
  13. _CURRENT_INTERFACE = None
  14. _CURRENT_BINARIES = {}
  15. MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60
  16. NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60
  17. BINARY_RECHECK_INTERVAL = 1 * 30 * 60
  18. class MachineManager(models.Manager):
  19. def current(self) -> 'Machine':
  20. return Machine.current()
  21. class Machine(ModelWithHealthStats):
  22. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  23. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  24. modified_at = models.DateTimeField(auto_now=True)
  25. guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
  26. hostname = models.CharField(max_length=63, default=None, null=False)
  27. hw_in_docker = models.BooleanField(default=False, null=False)
  28. hw_in_vm = models.BooleanField(default=False, null=False)
  29. hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
  30. hw_product = models.CharField(max_length=63, default=None, null=False)
  31. hw_uuid = models.CharField(max_length=255, default=None, null=False)
  32. os_arch = models.CharField(max_length=15, default=None, null=False)
  33. os_family = models.CharField(max_length=15, default=None, null=False)
  34. os_platform = models.CharField(max_length=63, default=None, null=False)
  35. os_release = models.CharField(max_length=63, default=None, null=False)
  36. os_kernel = models.CharField(max_length=255, default=None, null=False)
  37. stats = models.JSONField(default=dict, null=True, blank=True)
  38. config = models.JSONField(default=dict, null=True, blank=True,
  39. help_text="Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)")
  40. num_uses_failed = models.PositiveIntegerField(default=0)
  41. num_uses_succeeded = models.PositiveIntegerField(default=0)
  42. objects: MachineManager = MachineManager()
  43. networkinterface_set: models.Manager['NetworkInterface']
  44. class Meta:
  45. app_label = 'machine'
  46. @classmethod
  47. def current(cls) -> 'Machine':
  48. global _CURRENT_MACHINE
  49. if _CURRENT_MACHINE:
  50. if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
  51. return _CURRENT_MACHINE
  52. _CURRENT_MACHINE = None
  53. _CURRENT_MACHINE, _ = cls.objects.update_or_create(
  54. guid=get_host_guid(),
  55. defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
  56. )
  57. return _CURRENT_MACHINE
  58. @staticmethod
  59. def from_jsonl(record: dict, overrides: dict = None):
  60. """
  61. Update Machine config from JSONL record.
  62. Args:
  63. record: JSONL record with '_method': 'update', 'key': '...', 'value': '...'
  64. overrides: Not used
  65. Returns:
  66. Machine instance or None
  67. """
  68. method = record.get('_method')
  69. if method == 'update':
  70. key = record.get('key')
  71. value = record.get('value')
  72. if key and value:
  73. machine = Machine.current()
  74. if not machine.config:
  75. machine.config = {}
  76. machine.config[key] = value
  77. machine.save(update_fields=['config'])
  78. return machine
  79. return None
  80. class NetworkInterfaceManager(models.Manager):
  81. def current(self) -> 'NetworkInterface':
  82. return NetworkInterface.current()
  83. class NetworkInterface(ModelWithHealthStats):
  84. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  85. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  86. modified_at = models.DateTimeField(auto_now=True)
  87. machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)
  88. mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
  89. ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
  90. ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
  91. dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)
  92. hostname = models.CharField(max_length=63, default=None, null=False)
  93. iface = models.CharField(max_length=15, default=None, null=False)
  94. isp = models.CharField(max_length=63, default=None, null=False)
  95. city = models.CharField(max_length=63, default=None, null=False)
  96. region = models.CharField(max_length=63, default=None, null=False)
  97. country = models.CharField(max_length=63, default=None, null=False)
  98. num_uses_failed = models.PositiveIntegerField(default=0)
  99. num_uses_succeeded = models.PositiveIntegerField(default=0)
  100. objects: NetworkInterfaceManager = NetworkInterfaceManager()
  101. class Meta:
  102. app_label = 'machine'
  103. unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)
  104. @classmethod
  105. def current(cls) -> 'NetworkInterface':
  106. global _CURRENT_INTERFACE
  107. if _CURRENT_INTERFACE:
  108. if timezone.now() < _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL):
  109. return _CURRENT_INTERFACE
  110. _CURRENT_INTERFACE = None
  111. machine = Machine.objects.current()
  112. net_info = get_host_network()
  113. _CURRENT_INTERFACE, _ = cls.objects.update_or_create(
  114. machine=machine, ip_public=net_info.pop('ip_public'), ip_local=net_info.pop('ip_local'),
  115. mac_address=net_info.pop('mac_address'), dns_server=net_info.pop('dns_server'), defaults=net_info,
  116. )
  117. return _CURRENT_INTERFACE
  118. class BinaryManager(models.Manager):
  119. def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'Binary':
  120. """Get or create an Binary record from the database or cache."""
  121. global _CURRENT_BINARIES
  122. cached = _CURRENT_BINARIES.get(name)
  123. if cached and timezone.now() < cached.modified_at + timedelta(seconds=BINARY_RECHECK_INTERVAL):
  124. return cached
  125. _CURRENT_BINARIES[name], _ = self.update_or_create(
  126. machine=Machine.objects.current(), name=name, binprovider=binprovider,
  127. version=version, abspath=abspath, sha256=sha256,
  128. )
  129. return _CURRENT_BINARIES[name]
  130. def get_valid_binary(self, name: str, machine: 'Machine | None' = None) -> 'Binary | None':
  131. """Get a valid Binary for the given name on the current machine, or None if not found."""
  132. machine = machine or Machine.current()
  133. return self.filter(
  134. machine=machine,
  135. name__iexact=name,
  136. ).exclude(abspath='').exclude(abspath__isnull=True).order_by('-modified_at').first()
  137. class Binary(ModelWithHealthStats):
  138. """
  139. Tracks an binary on a specific machine.
  140. Follows the unified state machine pattern:
  141. - queued: Binary needs to be installed
  142. - started: Installation in progress
  143. - succeeded: Binary installed successfully (abspath, version, sha256 populated)
  144. - failed: Installation failed
  145. State machine calls run() which executes on_Binary__install_* hooks
  146. to install the binary using the specified providers.
  147. """
  148. class StatusChoices(models.TextChoices):
  149. QUEUED = 'queued', 'Queued'
  150. STARTED = 'started', 'Started'
  151. SUCCEEDED = 'succeeded', 'Succeeded'
  152. FAILED = 'failed', 'Failed'
  153. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  154. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  155. modified_at = models.DateTimeField(auto_now=True)
  156. machine = models.ForeignKey(Machine, on_delete=models.CASCADE, null=False)
  157. # Binary metadata
  158. name = models.CharField(max_length=63, default='', null=False, blank=True, db_index=True)
  159. binproviders = models.CharField(max_length=127, default='env', null=False, blank=True,
  160. help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,env")
  161. overrides = models.JSONField(default=dict, blank=True,
  162. help_text="Provider-specific overrides: {'apt': {'packages': ['pkg']}, ...}")
  163. # Installation results (populated after installation)
  164. binprovider = models.CharField(max_length=31, default='', null=False, blank=True,
  165. help_text="Provider that successfully installed this binary")
  166. abspath = models.CharField(max_length=255, default='', null=False, blank=True)
  167. version = models.CharField(max_length=32, default='', null=False, blank=True)
  168. sha256 = models.CharField(max_length=64, default='', null=False, blank=True)
  169. # State machine fields
  170. status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True)
  171. retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True,
  172. help_text="When to retry this binary installation")
  173. output_dir = models.CharField(max_length=255, default='', null=False, blank=True,
  174. help_text="Directory where installation hook logs are stored")
  175. # Health stats
  176. num_uses_failed = models.PositiveIntegerField(default=0)
  177. num_uses_succeeded = models.PositiveIntegerField(default=0)
  178. state_machine_name: str = 'archivebox.machine.models.BinaryMachine'
  179. objects: BinaryManager = BinaryManager()
  180. class Meta:
  181. app_label = 'machine'
  182. verbose_name = 'Binary'
  183. verbose_name_plural = 'Binaries'
  184. unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)
  185. def __str__(self) -> str:
  186. return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'
  187. @property
  188. def is_valid(self) -> bool:
  189. """A binary is valid if it has both abspath and version set."""
  190. return bool(self.abspath) and bool(self.version)
  191. @cached_property
  192. def binary_info(self) -> dict:
  193. """Return info about the binary."""
  194. return {
  195. 'name': self.name,
  196. 'abspath': self.abspath,
  197. 'version': self.version,
  198. 'binprovider': self.binprovider,
  199. 'is_valid': self.is_valid,
  200. }
  201. def to_jsonl(self) -> dict:
  202. """
  203. Convert Binary model instance to a JSONL record.
  204. """
  205. from archivebox.config import VERSION
  206. return {
  207. 'type': 'Binary',
  208. 'schema_version': VERSION,
  209. 'id': str(self.id),
  210. 'machine_id': str(self.machine_id),
  211. 'name': self.name,
  212. 'binprovider': self.binprovider,
  213. 'abspath': self.abspath,
  214. 'version': self.version,
  215. 'sha256': self.sha256,
  216. 'status': self.status,
  217. }
  218. @staticmethod
  219. def from_jsonl(record: dict, overrides: dict = None):
  220. """
  221. Create/update Binary from JSONL record.
  222. Handles two cases:
  223. 1. From binaries.jsonl: creates queued binary with name, binproviders, overrides
  224. 2. From hook output: updates binary with abspath, version, sha256, binprovider
  225. Args:
  226. record: JSONL record with 'name' and either:
  227. - 'binproviders', 'overrides' (from binaries.jsonl)
  228. - 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
  229. overrides: Not used
  230. Returns:
  231. Binary instance or None
  232. """
  233. name = record.get('name')
  234. if not name:
  235. return None
  236. machine = Machine.current()
  237. overrides = overrides or {}
  238. # Case 1: From binaries.jsonl - create queued binary
  239. if 'binproviders' in record or ('overrides' in record and not record.get('abspath')):
  240. binary, created = Binary.objects.get_or_create(
  241. machine=machine,
  242. name=name,
  243. defaults={
  244. 'binproviders': record.get('binproviders', 'env'),
  245. 'overrides': record.get('overrides', {}),
  246. 'status': Binary.StatusChoices.QUEUED,
  247. 'retry_at': timezone.now(),
  248. }
  249. )
  250. return binary
  251. # Case 2: From hook output - update with installation results
  252. abspath = record.get('abspath')
  253. version = record.get('version')
  254. if not abspath or not version:
  255. return None
  256. binary, _ = Binary.objects.update_or_create(
  257. machine=machine,
  258. name=name,
  259. defaults={
  260. 'abspath': abspath,
  261. 'version': version,
  262. 'sha256': record.get('sha256', ''),
  263. 'binprovider': record.get('binprovider', 'env'),
  264. 'status': Binary.StatusChoices.SUCCEEDED,
  265. 'retry_at': None,
  266. }
  267. )
  268. return binary
  269. @property
  270. def OUTPUT_DIR(self):
  271. """Return the output directory for this binary installation."""
  272. from pathlib import Path
  273. from django.conf import settings
  274. DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
  275. return Path(DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id)
  276. def update_and_requeue(self, **kwargs):
  277. """
  278. Update binary fields and requeue for worker state machine.
  279. Sets modified_at to ensure workers pick up changes.
  280. Always saves the model after updating.
  281. """
  282. for key, value in kwargs.items():
  283. setattr(self, key, value)
  284. self.modified_at = timezone.now()
  285. self.save()
  286. def run(self):
  287. """
  288. Execute binary installation by running on_Binary__install_* hooks.
  289. Called by BinaryMachine when entering 'started' state.
  290. Runs ALL on_Binary__install_* hooks - each hook checks binproviders
  291. and decides if it can handle this binary. First hook to succeed wins.
  292. Updates status to SUCCEEDED or FAILED based on hook output.
  293. """
  294. import json
  295. from archivebox.hooks import discover_hooks, run_hook
  296. from archivebox.config.configset import get_config
  297. # Get merged config (Binary doesn't have crawl/snapshot context)
  298. config = get_config(scope='global')
  299. # Create output directory
  300. output_dir = self.OUTPUT_DIR
  301. output_dir.mkdir(parents=True, exist_ok=True)
  302. self.output_dir = str(output_dir)
  303. self.save()
  304. # Discover ALL on_Binary__install_* hooks
  305. hooks = discover_hooks('Binary', config=config)
  306. if not hooks:
  307. self.status = self.StatusChoices.FAILED
  308. self.save()
  309. return
  310. # Run each hook - they decide if they can handle this binary
  311. for hook in hooks:
  312. plugin_name = hook.parent.name
  313. plugin_output_dir = output_dir / plugin_name
  314. plugin_output_dir.mkdir(parents=True, exist_ok=True)
  315. # Build kwargs for hook
  316. hook_kwargs = {
  317. 'binary_id': str(self.id),
  318. 'machine_id': str(self.machine_id),
  319. 'name': self.name,
  320. 'binproviders': self.binproviders,
  321. }
  322. # Add overrides as JSON string if present
  323. if self.overrides:
  324. hook_kwargs['overrides'] = json.dumps(self.overrides)
  325. # Run the hook
  326. result = run_hook(
  327. hook,
  328. output_dir=plugin_output_dir,
  329. config=config,
  330. timeout=600, # 10 min timeout for binary installation
  331. **hook_kwargs
  332. )
  333. # Background hook (unlikely for binary installation, but handle it)
  334. if result is None:
  335. continue
  336. # Failed or skipped hook - try next one
  337. if result['returncode'] != 0:
  338. continue
  339. # Parse JSONL output to check for successful installation
  340. stdout_file = plugin_output_dir / 'stdout.log'
  341. if stdout_file.exists():
  342. stdout = stdout_file.read_text()
  343. for line in stdout.splitlines():
  344. if line.strip() and line.strip().startswith('{'):
  345. try:
  346. record = json.loads(line)
  347. if record.get('type') == 'Binary' and record.get('abspath'):
  348. # Update self from successful installation
  349. self.abspath = record['abspath']
  350. self.version = record.get('version', '')
  351. self.sha256 = record.get('sha256', '')
  352. self.binprovider = record.get('binprovider', 'env')
  353. self.status = self.StatusChoices.SUCCEEDED
  354. self.save()
  355. return
  356. except json.JSONDecodeError:
  357. continue
  358. # No hook succeeded
  359. self.status = self.StatusChoices.FAILED
  360. self.save()
  361. def cleanup(self):
  362. """
  363. Clean up background binary installation hooks.
  364. Called by state machine if needed (not typically used for binaries
  365. since installations are foreground, but included for consistency).
  366. """
  367. from pathlib import Path
  368. from archivebox.hooks import kill_process
  369. output_dir = self.OUTPUT_DIR
  370. if not output_dir.exists():
  371. return
  372. # Kill any background hooks
  373. for plugin_dir in output_dir.iterdir():
  374. if not plugin_dir.is_dir():
  375. continue
  376. pid_file = plugin_dir / 'hook.pid'
  377. if pid_file.exists():
  378. kill_process(pid_file)
  379. # =============================================================================
  380. # Process Model
  381. # =============================================================================
  382. class ProcessManager(models.Manager):
  383. """Manager for Process model."""
  384. def create_for_archiveresult(self, archiveresult, **kwargs):
  385. """
  386. Create a Process record for an ArchiveResult.
  387. Called during migration and when creating new ArchiveResults.
  388. """
  389. # Defaults from ArchiveResult if not provided
  390. defaults = {
  391. 'machine': Machine.current(),
  392. 'pwd': kwargs.get('pwd') or str(archiveresult.snapshot.output_dir / archiveresult.plugin),
  393. 'cmd': kwargs.get('cmd') or [],
  394. 'status': 'queued',
  395. 'timeout': kwargs.get('timeout', 120),
  396. 'env': kwargs.get('env', {}),
  397. }
  398. defaults.update(kwargs)
  399. process = self.create(**defaults)
  400. return process
  401. class Process(ModelWithHealthStats):
  402. """
  403. Tracks a single OS process execution.
  404. Process represents the actual subprocess spawned to execute a hook.
  405. One Process can optionally be associated with an ArchiveResult (via OneToOne),
  406. but Process can also exist standalone for internal operations.
  407. Follows the unified state machine pattern:
  408. - queued: Process ready to launch
  409. - running: Process actively executing
  410. - exited: Process completed (check exit_code for success/failure)
  411. State machine calls launch() to spawn the process and monitors its lifecycle.
  412. """
  413. class StatusChoices(models.TextChoices):
  414. QUEUED = 'queued', 'Queued'
  415. RUNNING = 'running', 'Running'
  416. EXITED = 'exited', 'Exited'
  417. # Primary fields
  418. id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
  419. created_at = models.DateTimeField(default=timezone.now, db_index=True)
  420. modified_at = models.DateTimeField(auto_now=True)
  421. # Machine FK - required (every process runs on a machine)
  422. machine = models.ForeignKey(
  423. Machine,
  424. on_delete=models.CASCADE,
  425. null=False,
  426. related_name='process_set',
  427. help_text='Machine where this process executed'
  428. )
  429. # Execution metadata
  430. pwd = models.CharField(max_length=512, default='', null=False, blank=True,
  431. help_text='Working directory for process execution')
  432. cmd = models.JSONField(default=list, null=False, blank=True,
  433. help_text='Command as array of arguments')
  434. env = models.JSONField(default=dict, null=False, blank=True,
  435. help_text='Environment variables for process')
  436. timeout = models.IntegerField(default=120, null=False,
  437. help_text='Timeout in seconds')
  438. # Process results
  439. pid = models.IntegerField(default=None, null=True, blank=True,
  440. help_text='OS process ID')
  441. exit_code = models.IntegerField(default=None, null=True, blank=True,
  442. help_text='Process exit code (0 = success)')
  443. stdout = models.TextField(default='', null=False, blank=True,
  444. help_text='Standard output from process')
  445. stderr = models.TextField(default='', null=False, blank=True,
  446. help_text='Standard error from process')
  447. # Timing
  448. started_at = models.DateTimeField(default=None, null=True, blank=True,
  449. help_text='When process was launched')
  450. ended_at = models.DateTimeField(default=None, null=True, blank=True,
  451. help_text='When process completed/terminated')
  452. # Optional FKs
  453. binary = models.ForeignKey(
  454. Binary,
  455. on_delete=models.SET_NULL,
  456. null=True, blank=True,
  457. related_name='process_set',
  458. help_text='Binary used by this process'
  459. )
  460. iface = models.ForeignKey(
  461. NetworkInterface,
  462. on_delete=models.SET_NULL,
  463. null=True, blank=True,
  464. related_name='process_set',
  465. help_text='Network interface used by this process'
  466. )
  467. # Optional connection URL (for CDP, sonic, etc.)
  468. url = models.URLField(max_length=2048, default=None, null=True, blank=True,
  469. help_text='Connection URL (CDP endpoint, sonic server, etc.)')
  470. # Reverse relation to ArchiveResult (OneToOne from AR side)
  471. # archiveresult: OneToOneField defined on ArchiveResult model
  472. # State machine fields
  473. status = models.CharField(
  474. max_length=16,
  475. choices=StatusChoices.choices,
  476. default=StatusChoices.QUEUED,
  477. db_index=True
  478. )
  479. retry_at = models.DateTimeField(
  480. default=timezone.now,
  481. null=True, blank=True,
  482. db_index=True,
  483. help_text='When to retry this process'
  484. )
  485. # Health stats
  486. num_uses_failed = models.PositiveIntegerField(default=0)
  487. num_uses_succeeded = models.PositiveIntegerField(default=0)
  488. state_machine_name: str = 'archivebox.machine.models.ProcessMachine'
  489. objects: ProcessManager = ProcessManager()
  490. class Meta:
  491. app_label = 'machine'
  492. verbose_name = 'Process'
  493. verbose_name_plural = 'Processes'
  494. indexes = [
  495. models.Index(fields=['machine', 'status', 'retry_at']),
  496. models.Index(fields=['binary', 'exit_code']),
  497. ]
  498. def __str__(self) -> str:
  499. cmd_str = ' '.join(self.cmd[:3]) if self.cmd else '(no cmd)'
  500. return f'Process[{self.id}] {cmd_str} ({self.status})'
  501. # Properties that delegate to related objects
  502. @property
  503. def cmd_version(self) -> str:
  504. """Get version from associated binary."""
  505. return self.binary.version if self.binary else ''
  506. @property
  507. def bin_abspath(self) -> str:
  508. """Get absolute path from associated binary."""
  509. return self.binary.abspath if self.binary else ''
  510. @property
  511. def plugin(self) -> str:
  512. """Get plugin name from associated ArchiveResult (if any)."""
  513. if hasattr(self, 'archiveresult'):
  514. # Inline import to avoid circular dependency
  515. return self.archiveresult.plugin
  516. return ''
  517. @property
  518. def hook_name(self) -> str:
  519. """Get hook name from associated ArchiveResult (if any)."""
  520. if hasattr(self, 'archiveresult'):
  521. return self.archiveresult.hook_name
  522. return ''
  523. def to_jsonl(self) -> dict:
  524. """
  525. Convert Process model instance to a JSONL record.
  526. """
  527. from archivebox.config import VERSION
  528. record = {
  529. 'type': 'Process',
  530. 'schema_version': VERSION,
  531. 'id': str(self.id),
  532. 'machine_id': str(self.machine_id),
  533. 'cmd': self.cmd,
  534. 'pwd': self.pwd,
  535. 'status': self.status,
  536. 'exit_code': self.exit_code,
  537. 'started_at': self.started_at.isoformat() if self.started_at else None,
  538. 'ended_at': self.ended_at.isoformat() if self.ended_at else None,
  539. }
  540. # Include optional fields if set
  541. if self.binary_id:
  542. record['binary_id'] = str(self.binary_id)
  543. if self.pid:
  544. record['pid'] = self.pid
  545. if self.timeout:
  546. record['timeout'] = self.timeout
  547. return record
  548. def update_and_requeue(self, **kwargs):
  549. """
  550. Update process fields and requeue for worker state machine.
  551. Sets modified_at to ensure workers pick up changes.
  552. """
  553. for key, value in kwargs.items():
  554. setattr(self, key, value)
  555. self.modified_at = timezone.now()
  556. self.save()
  557. # =============================================================================
  558. # Binary State Machine
  559. # =============================================================================
class BinaryMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing Binary installation lifecycle.

    Each state's value is the matching Binary.StatusChoices string. The state is
    presumably bound to ``binary.status`` by BaseStateMachine (via model_attr_name)
    — confirm there.

    Hook Lifecycle:
    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │  • Binary needs to be installed                             │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ STARTED State → enter_started()                             │
    │  1. binary.run()                                            │
    │     • discover_hooks('Binary') → all on_Binary__install_*   │
    │     • Try each provider hook in sequence:                   │
    │       - run_hook(script, output_dir, ...)                   │
    │       - If returncode == 0:                                 │
    │         * Read stdout.log                                   │
    │         * Parse JSONL for 'Binary' record with abspath      │
    │         * Update self: abspath, version, sha256, provider   │
    │         * Set status=SUCCEEDED, RETURN                      │
    │     • If no hook succeeds: set status=FAILED                │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() checks status
    ┌─────────────────────────────────────────────────────────────┐
    │ SUCCEEDED / FAILED                                          │
    │  • Set by binary.run() based on hook results                │
    │  • Health stats incremented (num_uses_succeeded/failed)     │
    └─────────────────────────────────────────────────────────────┘
    """

    # Name under which the Binary instance is exposed on the machine (self.binary).
    model_attr_name = 'binary'

    # States
    queued = State(value=Binary.StatusChoices.QUEUED, initial=True)
    started = State(value=Binary.StatusChoices.STARTED)
    succeeded = State(value=Binary.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=Binary.StatusChoices.FAILED, final=True)

    # Tick Event - transitions based on conditions.
    # For each source state the guards are complementary: queued stays put unless
    # can_start. Started stays put until is_finished, i.e. until run() has set
    # status to SUCCEEDED or FAILED, which then selects the matching final state.
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed')
    )

    def can_start(self) -> bool:
        """Check if binary installation can start (needs a name and at least one allowed provider)."""
        return bool(self.binary.name and self.binary.binproviders)

    def is_succeeded(self) -> bool:
        """Check if installation succeeded (status was set by run())."""
        return self.binary.status == Binary.StatusChoices.SUCCEEDED

    def is_failed(self) -> bool:
        """Check if installation failed (status was set by run())."""
        return self.binary.status == Binary.StatusChoices.FAILED

    def is_finished(self) -> bool:
        """Check if installation has completed (success or failure)."""
        return self.binary.status in (
            Binary.StatusChoices.SUCCEEDED,
            Binary.StatusChoices.FAILED,
        )

    @queued.enter
    def enter_queued(self):
        """Binary is queued for installation; make it eligible for pickup immediately."""
        self.binary.update_and_requeue(
            retry_at=timezone.now(),
            status=Binary.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        """Start binary installation: mark STARTED, then run the install hooks synchronously."""
        # Lock the binary while installation runs by pushing retry_at into the future.
        # NOTE(review): this 300s lease is shorter than the 600s per-hook timeout used
        # in Binary.run(), so a long install could presumably be re-claimed — confirm.
        self.binary.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=300),  # 5 min timeout for installation
            status=Binary.StatusChoices.STARTED,
        )

        # Run installation hooks
        self.binary.run()

        # Save updated status (run() updates status to succeeded/failed).
        # run() already saves on every exit path, so this extra save is redundant but harmless.
        self.binary.save()

    @succeeded.enter
    def enter_succeeded(self):
        """Binary installed successfully: clear retry_at and record a health-stat success."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.SUCCEEDED,
        )

        # Increment health stats
        self.binary.increment_health_stats(success=True)

    @failed.enter
    def enter_failed(self):
        """Binary installation failed: clear retry_at and record a health-stat failure."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.FAILED,
        )

        # Increment health stats
        self.binary.increment_health_stats(success=False)
  655. # =============================================================================
  656. # Process State Machine
  657. # =============================================================================
  class ProcessMachine(BaseStateMachine, strict_states=True):
      """
      State machine for managing Process (OS subprocess) lifecycle.

      Process Lifecycle:

          QUEUED
            • Process ready to launch, waiting for resources.
            • tick() leaves it here until can_start() (cmd and machine set).
              │
              ↓ tick() when can_start()   (or the explicit `launch` event)
          RUNNING  → enter_running()
            • Sets status, started_at, and a retry_at deadline
              `process.timeout` seconds after started_at (locks the process
              while it runs).
            • Subprocess spawning is NOT implemented here yet: it is still
              handled by run_hook(), so enter_running() records a placeholder
              exit_code of 0.
              │
              ↓ tick() when is_exited()   (or the explicit `kill` event)
          EXITED (final)  → enter_exited()
            • Sets ended_at and clears retry_at.
            • Increments health stats; success means exit_code == 0.
              `kill` does not set exit_code itself, so a killed process only
              counts as a success if exit_code was already 0.

      The explicit `launch` / `kill` events have no cond/unless guards, so
      they bypass the checks that tick() applies.

      Note: This is a simpler state machine than ArchiveResult.
      Process is just about execution lifecycle. ArchiveResult handles
      the archival-specific logic (status, output parsing, etc.).
      """

      model_attr_name = 'process'

      # States
      queued = State(value=Process.StatusChoices.QUEUED, initial=True)
      running = State(value=Process.StatusChoices.RUNNING)
      exited = State(value=Process.StatusChoices.EXITED, final=True)

      # Tick Event - transitions based on conditions
      tick = (
          queued.to.itself(unless='can_start') |
          queued.to(running, cond='can_start') |
          running.to.itself(unless='is_exited') |
          running.to(exited, cond='is_exited')
      )

      # Additional events (for explicit control; no guards)
      launch = queued.to(running)
      kill = running.to(exited)

      def can_start(self) -> bool:
          """Check if process can start (has cmd and machine)."""
          return bool(self.process.cmd and self.process.machine)

      def is_exited(self) -> bool:
          """Check if process has exited (exit_code is set)."""
          return self.process.exit_code is not None

      @queued.enter
      def enter_queued(self):
          """Process is queued for execution: set status QUEUED with retry_at = now."""
          self.process.update_and_requeue(
              retry_at=timezone.now(),
              status=Process.StatusChoices.QUEUED,
          )

      @running.enter
      def enter_running(self):
          """Mark the process RUNNING and record when it started.

          retry_at is set `process.timeout` seconds after started_at, locking
          the process while it runs. Both timestamps are derived from a single
          timezone.now() call, so the deadline is measured exactly from
          started_at rather than from a slightly later instant.
          """
          now = timezone.now()
          # NOTE(review): assumes process.timeout is a number of seconds and is
          # never None (timedelta would raise TypeError) — confirm on the model.
          self.process.update_and_requeue(
              retry_at=now + timedelta(seconds=self.process.timeout),
              status=Process.StatusChoices.RUNNING,
              started_at=now,
          )
          # NOTE: Launching the subprocess is a placeholder. Process is currently
          # a data model that tracks execution metadata; actual spawning is still
          # handled by run_hook(). Until run_hook is refactored, the process is
          # marked as immediately exited with exit code 0, so the next tick()
          # moves it to EXITED.
          self.process.exit_code = 0  # Placeholder
          self.process.save()

      @exited.enter
      def enter_exited(self):
          """Process has exited: clear retry_at, stamp ended_at, update health stats."""
          # Read the exit code before the status update below.
          success = self.process.exit_code == 0
          self.process.update_and_requeue(
              retry_at=None,
              status=Process.StatusChoices.EXITED,
              ended_at=timezone.now(),
          )
          # Increment health stats based on exit code
          self.process.increment_health_stats(success=success)
  744. # =============================================================================
  745. # State Machine Registration
  746. # =============================================================================
  747. # Manually register state machines with python-statemachine registry
  # Register each state machine class, in order, with the python-statemachine
  # registry; the loop variable is removed so no extra module-level name remains.
  for _machine_cls in (BinaryMachine, ProcessMachine):
      registry.register(_machine_cls)
  del _machine_cls