Implement a consistent pattern across all models (Crawl, Snapshot, ArchiveResult, Dependency) for:
Status: COMPLETE
CrawlMachine:
- Crawl.run() - runs hooks, processes JSONL via process_hook_records(), creates snapshots
- Crawl.cleanup() - kills background hooks, runs on_CrawlEnd hooks
- Hooks run with OUTPUT_DIR/plugin_name/ as PWD
- queued -> started: calls crawl.run()
- started -> sealed: calls crawl.cleanup()
Status: COMPLETE
SnapshotMachine:
- Snapshot.run() - creates pending ArchiveResults
- Snapshot.cleanup() - kills background ArchiveResult hooks, calls update_from_output()
- Snapshot.has_running_background_hooks() - checks PID files using process_is_alive()
- Snapshot.from_jsonl() - simplified, filtering moved to caller
- queued -> started: calls snapshot.run()
- started -> sealed: calls snapshot.cleanup()
- is_finished(): uses has_running_background_hooks()
Status: COMPLETE - Major refactor completed
ArchiveResultMachine:
- ArchiveResult.run() - runs hook, calls update_from_output() for foreground hooks
- ArchiveResult.update_from_output() - unified method for foreground and background hooks
- Hooks run in snapshot.OUTPUT_DIR/plugin_name
- Uses process_hook_records() with URL/depth filtering
- check_background_completed() - replaced by process_is_alive() helper
- finalize_background_hook() - replaced by update_from_output()
- _populate_output_fields() - merged into update_from_output()
- queued -> started: calls archiveresult.run()
- started -> succeeded/failed/skipped: status set by update_from_output()
Status: COMPLETE - Replaced Dependency model entirely
BinaryMachine:
- Binary.run() - runs on_Binary_install* hooks, processes JSONL
- Binary.cleanup() - kills background installation hooks (for consistency)
- Binary.from_jsonl() - handles both binaries.jsonl and hook output
- Output dir: data/machines/{machine_id}/binaries/{name}/{id}/plugin_name/
- Reads plugins/*/binaries.jsonl files
- queued -> started: calls binary.run()
- started -> succeeded/failed: status set by hooks via JSONL
Dependency model status: Deleted entirely (replaced by Binary state machine)
- Moved to plugins/*/binaries.jsonl
- Renamed on_Dependency__install_* hooks to on_Binary__install_*

# 1. State Machine orchestrates transitions
class ModelMachine(StateMachine):
    @started.enter
    def enter_started(self):
        # Entering 'started' delegates the real work to the model itself.
        self.model.run()  # Do the work
        # Update status

    def is_finished(self):
        """Finished only once no background hooks nor pending children remain."""
        # Short-circuits: children are only checked when no hooks are running,
        # matching the original two-step guard ordering.
        return not (
            self.model.has_running_background_hooks()
            or self.model.has_pending_children()
        )

    @sealed.enter
    def enter_sealed(self):
        # Sealing tears down any still-running background hook processes.
        self.model.cleanup()  # Clean up background hooks
        # Update status
# 2. Model methods do the actual work
class Model:
    def run(self):
        """Run hooks, process JSONL records, then create child objects."""
        for hook in discover_hooks('ModelName'):
            hook_output_dir = self.OUTPUT_DIR / hook.parent.name
            result = run_hook(hook, ..., output_dir=hook_output_dir)
            # Background hooks return None; their output is collected later.
            if result is None:
                continue
            # Process JSONL records emitted by the foreground hook.
            process_hook_records(
                result.get('records', []),
                overrides={'model': self, 'created_by_id': self.created_by_id},
            )
        # Create children (e.g., ArchiveResults, Snapshots)
        self.create_children()

    def cleanup(self):
        """Kill background hooks, run cleanup hooks."""
        # Terminate any background hooks still tracked by */hook.pid files.
        if self.OUTPUT_DIR.exists():
            for pid_file in self.OUTPUT_DIR.glob('*/hook.pid'):
                kill_process(pid_file)
        # Run cleanup hooks (e.g., on_ModelEnd)
        for cleanup_hook in discover_hooks('ModelEnd'):
            run_hook(cleanup_hook, ...)

    def has_running_background_hooks(self) -> bool:
        """Return True while at least one background hook process is alive."""
        if not self.OUTPUT_DIR.exists():
            return False
        return any(
            process_is_alive(pid_file)
            for pid_file in self.OUTPUT_DIR.glob('*/hook.pid')
        )
model.OUTPUT_DIR/plugin_name/
- Crawl: users/{user}/crawls/{date}/{crawl_id}/plugin_name/
- Snapshot: users/{user}/snapshots/{date}/{domain}/{snapshot_id}/plugin_name/
- ArchiveResult: users/{user}/snapshots/{date}/{domain}/{snapshot_id}/plugin_name/ (same as Snapshot)
- Dependency: dependencies/{dependency_id}/plugin_name/ (set output_dir field directly)

File: archivebox/hooks.py
Status: COMPLETE - Added three helper functions:
- process_hook_records(records, overrides) - lines 1258-1323
- process_is_alive(pid_file) - lines 1326-1344
- kill_process(pid_file, sig) - lines 1347-1362
def process_hook_records(records: List[Dict], overrides: Optional[Dict] = None) -> Dict[str, int]:
    """
    Process JSONL records from hook output.

    Dispatches each record to the matching Model.from_jsonl() based on the
    record's 'type' field. Records with an unknown or missing type are
    silently skipped (unchanged from the original branch-per-type version).

    Args:
        records: List of JSONL record dicts from result['records']
        overrides: Dict with 'snapshot', 'crawl', 'dependency', 'created_by_id', etc.
            Passed through unchanged to each from_jsonl() call.

    Returns:
        Dict with counts of processed records by record type
    """
    stats: Dict[str, int] = {}
    for record in records:
        record_type = record.get('type')
        model = _resolve_jsonl_model(record_type)
        if model is None:
            # Unknown/missing type: skip without counting, as before.
            continue
        model.from_jsonl(record, overrides)
        stats[record_type] = stats.get(record_type, 0) + 1
    return stats


def _resolve_jsonl_model(record_type):
    """Return the model class that handles *record_type*, or None if unknown."""
    # Imports are deferred to call time to avoid circular imports at module load.
    if record_type == 'Snapshot':
        from archivebox.core.models import Snapshot
        return Snapshot
    if record_type == 'Tag':
        from archivebox.core.models import Tag
        return Tag
    if record_type == 'Binary':
        from archivebox.machine.models import Binary
        return Binary
    # ... etc
    return None
def process_is_alive(pid_file: Path) -> bool:
    """Check if the process recorded in *pid_file* is still running.

    Returns False when the PID file is missing, unreadable, or malformed.

    Fix: os.kill(pid, 0) raises PermissionError (EPERM, an OSError subclass)
    when the process exists but is owned by another user — the old blanket
    `except OSError` wrongly reported such live processes as dead.
    """
    if not pid_file.exists():
        return False
    try:
        pid = int(pid_file.read_text().strip())
    except (OSError, ValueError):
        # Unreadable or corrupt PID file -> treat as not running.
        return False
    try:
        os.kill(pid, 0)  # Signal 0 = existence check, sends no signal
    except PermissionError:
        return True  # Process exists but belongs to another user
    except OSError:
        return False  # ESRCH: no such process
    return True
def kill_process(pid_file: Path, signal=SIGTERM):
    """Send *signal* (default SIGTERM) to the process recorded in *pid_file*.

    Best-effort: a missing file, a corrupt PID, or an already-dead process
    are all silently ignored.
    """
    if not pid_file.exists():
        return
    try:
        raw = pid_file.read_text()
        os.kill(int(raw.strip()), signal)
    except (OSError, ValueError):
        # Process already gone or PID file unreadable/corrupt — ignore.
        pass
Files: archivebox/core/models.py, archivebox/machine/models.py, archivebox/crawls/models.py
Status: COMPLETE - Added from_jsonl() to:
- Tag.from_jsonl() - core/models.py lines 93-116
- Snapshot.from_jsonl() - core/models.py lines 1144-1189
- Machine.from_jsonl() - machine/models.py lines 66-89
- Dependency.from_jsonl() - machine/models.py lines 203-227
- Binary.from_jsonl() - machine/models.py lines 401-434

Example implementations added:
class Snapshot:
    @staticmethod
    def from_jsonl(record: Dict, overrides: Dict = None):
        """Create/update Snapshot from JSONL record."""
        from archivebox.misc.jsonl import get_or_create_snapshot

        overrides = overrides or {}
        crawl = overrides.get('crawl')
        parent = overrides.get('snapshot')  # parent snapshot, if any

        if crawl:
            # Depth defaults to one level below the parent (or 1 at the root).
            default_depth = parent.depth + 1 if parent else 1
            depth = record.get('depth', default_depth)
            # Enforce the crawl's recursion limit before creating anything.
            if depth > crawl.max_depth:
                return None
            record.setdefault('crawl_id', str(crawl.id))
            record.setdefault('depth', depth)
        if parent:
            record.setdefault('parent_snapshot_id', str(parent.id))

        new_snapshot = get_or_create_snapshot(
            record,
            created_by_id=overrides.get('created_by_id'),
        )
        # Re-queue the snapshot so a worker picks it up immediately.
        new_snapshot.status = Snapshot.StatusChoices.QUEUED
        new_snapshot.retry_at = timezone.now()
        new_snapshot.save()
        return new_snapshot
class Tag:
    @staticmethod
    def from_jsonl(record: Dict, overrides: Dict = None):
        """Create/update Tag from JSONL record."""
        from archivebox.misc.jsonl import get_or_create_tag

        tag = get_or_create_tag(record)
        # When a parent snapshot was supplied, attach the new tag to it.
        if overrides is not None and 'snapshot' in overrides:
            overrides['snapshot'].tags.add(tag)
        return tag
class Binary:
    @staticmethod
    def from_jsonl(record: Dict, overrides: Dict = None):
        """Create/update Binary from JSONL record."""
        # NOTE: illustrative stub only — the real implementation lives in
        # archivebox/machine/models.py (lines 401-434 per the status above).
        # Implementation similar to existing create_model_record()
        ...
# Etc for other models
File: archivebox/core/models.py
Status: COMPLETE
Changes made:
✅ Replaced inline JSONL processing (lines 1912-1950):
- Uses self._url_passes_filters(url) with the parent snapshot's config for proper hierarchy
- Delegates record dispatch to process_hook_records()

✅ Simplified Snapshot.from_jsonl() (lines 1144-1189):
✅ Preserved ArchiveResult self-update logic:
✅ Key insight: Filtering happens in ArchiveResult.run() where we have parent snapshot context, NOT in from_jsonl() where we'd lose config hierarchy
Note: Did NOT delete special background hook methods (check_background_completed, finalize_background_hook) - that's Phase 6
File: archivebox/core/models.py
Status: COMPLETE
Changes made:
✅ Added Snapshot.cleanup() (lines 1144-1175):
*/hook.pid filesfinalize_background_hook() (temporary until Phase 6)✅ Added Snapshot.has_running_background_hooks() (lines 1177-1195):
process_is_alive()is_finished() checkFile: archivebox/core/statemachines.py
Status: COMPLETE
Changes made:
✅ Simplified is_finished() (lines 58-72):
self.snapshot.has_running_background_hooks() (line 68)✅ Added cleanup() to sealed.enter (lines 102-111):
self.snapshot.cleanup() to kill background hooks (line 105)File: archivebox/core/models.py
Status: COMPLETE - The BIG refactor (removed ~200 lines of duplication)
Changes made:
✅ Added ArchiveResult.update_from_output() (lines 1908-2061):
process_hook_records()✅ Simplified ArchiveResult.run() (lines 1841-1906):
update_from_output()update_from_output() to do all the work✅ Updated Snapshot.cleanup() (line 1172):
ar.finalize_background_hook() to ar.update_from_output()✅ Deleted _populate_output_fields() (was ~45 lines):
update_from_output()✅ Deleted check_background_completed() (was ~20 lines):
process_is_alive(pid_file) from hooks.py✅ Deleted finalize_background_hook() (was ~85 lines):
update_from_output()Total lines removed: ~280 lines of duplicate code Total lines added: ~160 lines of unified code Net reduction: ~120 lines (-43%)
Status: Intentionally skipped - Dependency doesn't need a state machine
Why no state machine for Dependency?
Wrong Granularity: Dependency is a GLOBAL singleton (one record per binary name)
status/retry_at fieldsWrong Timing: Installation should be SYNCHRONOUS, not queued
State Lives Elsewhere: Binary records are the actual state
Correct Architecture:
Dependency (global, no state machine):
├─ Configuration: bin_name, bin_providers, overrides
├─ run() method: synchronous installation attempt
└─ NO status, NO retry_at, NO state_machine_name
Binary (per-machine, has machine FK):
├─ State: is this binary installed on this specific machine?
├─ Created via JSONL output from on_Dependency hooks
└─ unique_together = (machine, name, abspath, version, sha256)
What was implemented:
Dependency.run() (lines 249-324):
discover_hooks() and process_hook_records() for consistencyALL core functionality is now complete! The unified pattern is consistently implemented across Crawl, Snapshot, and ArchiveResult. Dependency intentionally kept simple (no state machine needed).
✅ DONE archivebox/hooks.py - Add unified helpers:
- process_hook_records(records, overrides) - dispatcher (lines 1258-1323)
- process_is_alive(pid_file) - check if PID still running (lines 1326-1344)
- kill_process(pid_file) - kill process (lines 1347-1362)

✅ DONE archivebox/crawls/models.py - Already updated:
Crawl.run() - runs hooks, processes JSONL, creates snapshotsCrawl.cleanup() - kills background hooks, runs on_CrawlEnd✅ DONE archivebox/core/models.py:
Tag.from_jsonl() - lines 93-116Snapshot.from_jsonl() - lines 1197-1234 (simplified, removed filtering)Snapshot.cleanup() - lines 1144-1172 (kill background hooks, calls ar.update_from_output())Snapshot.has_running_background_hooks() - lines 1174-1193 (check PIDs)ArchiveResult.run() - simplified, uses update_from_output() (lines 1841-1906)ArchiveResult.update_from_output() - unified filesystem reading (lines 1908-2061)ArchiveResult.check_background_completed() - replaced by process_is_alive()ArchiveResult.finalize_background_hook() - replaced by update_from_output()ArchiveResult._populate_output_fields() - merged into update_from_output()✅ DONE archivebox/core/statemachines.py:
SnapshotMachine.is_finished() - uses has_running_background_hooks() (line 68)SnapshotMachine.sealed.enter (line 105)✅ DONE archivebox/machine/models.py:
Machine.from_jsonl() - lines 66-89Dependency.from_jsonl() - lines 203-227Binary.from_jsonl() - lines 401-434Dependency.run() to use unified pattern (lines 249-324)process_hook_records() dispatcher for all JSONL processingAll models now follow this consistent architecture:
class ModelMachine(StateMachine):
    # Canonical lifecycle shared by the Crawl/Snapshot/ArchiveResult/Binary machines.
    queued = State(initial=True)
    started = State()
    sealed/succeeded/failed = State(final=True)  # pseudocode: each machine declares ONE final state

    @started.enter
    def enter_started(self):
        self.model.run()  # Execute the work

    @sealed.enter  # or @succeeded.enter
    def enter_sealed(self):
        self.model.cleanup()  # Clean up background hooks
class Model:
    # State machine fields
    status = CharField(default='queued')
    retry_at = DateTimeField(default=timezone.now)
    output_dir = CharField(default='', blank=True)
    state_machine_name = 'app.statemachines.ModelMachine'

    def run(self):
        """Run hooks, process JSONL, create children."""
        hooks = discover_hooks('EventName')
        for hook in hooks:
            # Each hook gets its own subdirectory named after its plugin.
            output_dir = self.OUTPUT_DIR / hook.parent.name
            result = run_hook(hook, output_dir=output_dir, ...)  # pseudocode: '...' = elided extra args
            if result is None:  # Background hook
                continue
            # Process JSONL records
            overrides = {'model': self, 'created_by_id': self.created_by_id}
            process_hook_records(result['records'], overrides=overrides)

    def cleanup(self):
        """Kill background hooks, run cleanup hooks."""
        for pid_file in self.OUTPUT_DIR.glob('*/hook.pid'):
            kill_process(pid_file)
        # Update children from filesystem
        # NOTE(review): 'child' is a pseudocode placeholder — the real
        # implementation presumably iterates the model's child records.
        child.update_from_output()

    def update_and_requeue(self, **fields):
        """Update fields and bump modified_at."""
        for field, value in fields.items():
            setattr(self, field, value)
        # Include modified_at so workers notice the change.
        self.save(update_fields=[*fields.keys(), 'modified_at'])

    @staticmethod
    def from_jsonl(record: dict, overrides: dict = None):
        """Create/update model from JSONL record."""
        # Implementation specific to model
        # Called by process_hook_records()
1. Model.run() discovers hooks
2. Hooks execute and output JSONL to stdout
3. JSONL records dispatched via process_hook_records()
4. Each record type handled by Model.from_jsonl()
5. Background hooks tracked via hook.pid files
6. Model.cleanup() kills background hooks on seal
7. Children updated via update_from_output()
All core models (Crawl, Snapshot, ArchiveResult) now follow the unified pattern:
- .run() methods execute hooks and process JSONL
- .cleanup() methods kill background hooks
- .update_and_requeue() methods update state for worker coordination
- process_hook_records() for JSONL dispatching

Key Decision: Eliminated Dependency model entirely and made Binary the state machine.
Static Configuration: plugins/{plugin}/dependencies.jsonl files define binary requirements
{"type": "Binary", "name": "yt-dlp", "bin_providers": "pip,brew,apt,env"}
{"type": "Binary", "name": "node", "bin_providers": "apt,brew,env", "overrides": {"apt": {"packages": ["nodejs"]}}}
{"type": "Binary", "name": "ffmpeg", "bin_providers": "apt,brew,env"}
Dynamic State: Binary model tracks per-machine installation state
- Fields: machine, name, bin_providers, overrides, abspath, version, sha256, binprovider
- Lifecycle: queued → started → succeeded/failed
- Output dir: data/machines/{machine_id}/binaries/{binary_name}/{binary_id}/

class BinaryMachine(StateMachine):
queued → started → succeeded/failed
@started.enter
def enter_started(self):
self.binary.run() # Runs on_Binary__install_* hooks
class Binary(models.Model):
    def run(self):
        """
        Runs ALL on_Binary__install_* hooks.
        Each hook checks bin_providers and decides if it can handle this binary.
        First hook to succeed wins.
        Outputs JSONL with abspath, version, sha256, binprovider.
        """
        hooks = discover_hooks('Binary')
        for hook in hooks:
            # NOTE(review): 'plugin_name' is a free name in this sketch —
            # presumably hook.parent.name in the real implementation; confirm.
            result = run_hook(hook, output_dir=self.OUTPUT_DIR/plugin_name,
                              binary_id=self.id, machine_id=self.machine_id,
                              name=self.name, bin_providers=self.bin_providers,
                              overrides=json.dumps(self.overrides))
            # Hook outputs: {"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget", "version": "1.21", "binprovider": "apt"}
            # Binary.from_jsonl() updates self with installation results
Renamed on_Dependency__install_using_pip_provider.py to on_Binary__install_using_pip_provider.py.
Each hook checks the --bin-providers CLI argument:
if 'pip' not in bin_providers.split(','):
sys.exit(0) # Skip this binary
All models now follow identical patterns:
Crawl(queued) → CrawlMachine → Crawl.run() → sealed
Snapshot(queued) → SnapshotMachine → Snapshot.run() → sealed
ArchiveResult(queued) → ArchiveResultMachine → ArchiveResult.run() → succeeded/failed
Binary(queued) → BinaryMachine → Binary.run() → succeeded/failed
plugins/*/dependencies.jsonl