| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- """
- Tests for BinaryWorker processing of the Binary queue.
- Tests cover:
- - BinaryWorker is spawned by Orchestrator when Binary queue has work
- - Binary hooks (on_Binary__*) actually run and install binaries
- - Binary status transitions from QUEUED -> INSTALLED
- - BinaryWorker exits after idle timeout
- """
- import json
- import sqlite3
- import time
- from archivebox.tests.conftest import (
- run_archivebox_cmd,
- parse_jsonl_output,
- )
class TestBinaryWorkerSpawning:
    """Tests for BinaryWorker lifecycle.

    Each test pipes a single ``python3`` Binary record into ``archivebox run``
    and then inspects the ``machine_binary`` table of the archive's
    ``index.sqlite3`` directly.
    """

    def test_binary_worker_spawns_when_binary_queued(self, initialized_archive):
        """Orchestrator spawns BinaryWorker when Binary queue has work."""
        # Create a Binary record via CLI
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',  # Use env provider to detect system python
        }

        # Use `archivebox run` to create the Binary (this queues it)
        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=60,  # Increased timeout to allow for binary installation
        )
        assert code == 0, f"Failed to create Binary: {stderr}"

        # Verify Binary was created in DB.
        # try/finally so the connection is released even if the query raises.
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            binaries = conn.execute(
                "SELECT name, status, abspath FROM machine_binary WHERE name='python3'"
            ).fetchall()
        finally:
            conn.close()

        assert len(binaries) >= 1, "Binary was not created in database"
        name, status, _abspath = binaries[0]
        assert name == 'python3'

        # Status should be INSTALLED after BinaryWorker processed it
        # (or QUEUED if worker timed out before installing)
        assert status in ['installed', 'queued'], f"Unexpected Binary status: {status}"

    def test_binary_hooks_actually_run(self, initialized_archive):
        """Binary installation hooks (on_Binary__*) run and update abspath."""
        # Create a Binary for python3 (guaranteed to exist on system)
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }

        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0, f"Failed to process Binary: {stderr}"

        # Query database to check if hooks ran and populated abspath
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            result = conn.execute(
                "SELECT name, status, abspath, version FROM machine_binary WHERE name='python3'"
            ).fetchone()
        finally:
            conn.close()

        assert result is not None, "Binary not found in database"
        _name, status, abspath, version = result

        # If hooks ran successfully, abspath should be populated.
        # NOTE(review): when status is anything other than 'installed' the
        # checks below are skipped, so this test then only proves the row exists.
        if status == 'installed':
            assert abspath, f"Binary installed but abspath is empty: {abspath}"
            assert '/python3' in abspath or '\\python3' in abspath, \
                f"abspath doesn't look like a python3 path: {abspath}"
            # Version should also be populated
            assert version, f"Binary installed but version is empty: {version}"

    def test_binary_status_transitions(self, initialized_archive):
        """Binary status correctly transitions QUEUED -> INSTALLED."""
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }

        # Create and process the Binary
        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0, f"archivebox run failed: {stderr}"

        # Check final status
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            row = conn.execute(
                "SELECT status FROM machine_binary WHERE name='python3'"
            ).fetchone()
        finally:
            conn.close()

        assert row is not None, "Binary not found in database"
        # Should be installed (or queued if worker timed out)
        assert row[0] in ['installed', 'queued'], f"Unexpected Binary status: {row[0]}"
class TestBinaryWorkerHooks:
    """Tests for specific Binary hook providers.

    Records are piped into ``archivebox run`` and the resulting rows are read
    back from the ``machine_binary`` table of ``index.sqlite3``.
    """

    def test_env_provider_hook_detects_system_binary(self, initialized_archive):
        """on_Binary__15_env_install.py hook detects system binaries."""
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }

        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0, f"archivebox run failed: {stderr}"

        # Check that env provider hook populated the Binary.
        # try/finally so the connection is released even if the query raises.
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            result = conn.execute(
                "SELECT binprovider, abspath FROM machine_binary WHERE name='python3' AND status='installed'"
            ).fetchone()
        finally:
            conn.close()

        # NOTE(review): if no row with status='installed' exists, nothing is
        # asserted and the test passes vacuously.
        if result:
            binprovider, abspath = result
            assert binprovider == 'env', f"Expected env provider, got: {binprovider}"
            assert abspath, "abspath should be populated by env provider"

    def test_multiple_binaries_processed_in_batch(self, initialized_archive):
        """BinaryWorker processes multiple queued binaries."""
        # Create multiple Binary records
        binaries = [
            {'type': 'Binary', 'name': 'python3', 'binproviders': 'env'},
            {'type': 'Binary', 'name': 'curl', 'binproviders': 'env'},
        ]

        # One JSON object per line
        stdin = '\n'.join(json.dumps(b) for b in binaries)

        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=stdin,
            data_dir=initialized_archive,
            timeout=90,  # Need more time for multiple binaries
        )
        assert code == 0, f"archivebox run failed: {stderr}"

        # The query does not filter on status, so this counts rows that were
        # created, whether or not they ended up installed.
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            created = conn.execute(
                "SELECT name FROM machine_binary WHERE name IN ('python3', 'curl')"
            ).fetchall()
        finally:
            conn.close()

        assert len(created) >= 1, "At least one binary should be created"
class TestBinaryWorkerEdgeCases:
    """Tests for edge cases and error handling.

    Records are piped into ``archivebox run`` and the resulting rows are read
    back from the ``machine_binary`` table of ``index.sqlite3``.
    """

    def test_nonexistent_binary_stays_queued(self, initialized_archive):
        """Binary that doesn't exist stays queued (doesn't fail permanently)."""
        binary_record = {
            'type': 'Binary',
            'name': 'nonexistent-binary-xyz-12345',
            'binproviders': 'env',
        }

        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )

        # Command should still succeed (orchestrator doesn't fail on binary install failures)
        assert code == 0, f"archivebox run failed: {stderr}"

        # Binary should remain queued (not installed).
        # try/finally so the connection is released even if the query raises.
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            result = conn.execute(
                "SELECT status FROM machine_binary WHERE name='nonexistent-binary-xyz-12345'"
            ).fetchone()
        finally:
            conn.close()

        # NOTE(review): if no row was created at all, nothing is asserted and
        # the test passes vacuously.
        if result:
            status = result[0]
            # Should stay queued since installation failed
            assert status == 'queued', f"Expected queued, got: {status}"

    def test_binary_worker_respects_machine_isolation(self, initialized_archive):
        """BinaryWorker only processes binaries for current machine."""
        # This is implicitly tested by other tests - Binary.objects.filter(machine=current)
        # ensures only current machine's binaries are processed
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }

        _stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0, f"archivebox run failed: {stderr}"

        # Check that machine_id is set correctly.
        # NOTE(review): only a non-empty machine_id is checked here; the value
        # is not compared against the current machine.
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        try:
            result = conn.execute(
                "SELECT machine_id FROM machine_binary WHERE name='python3'"
            ).fetchone()
        finally:
            conn.close()

        assert result is not None, "Binary not found in database"
        machine_id = result[0]
        assert machine_id, "machine_id should be set on Binary"
|