| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 |
- __package__ = 'archivebox.machine'
- import sys
- import os
- import signal
- import socket
- import subprocess
- import multiprocessing
- from datetime import timedelta
- from pathlib import Path
- from django.db import models
- from django.utils import timezone
- from django.utils.functional import cached_property
- import abx
- import archivebox
- from abx_pkg import Binary, BinProvider
- from archivebox.base_models.models import ABIDModel, ABIDField, AutoDateTimeField, ModelWithHealthStats
- from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
- _CURRENT_MACHINE = None # global cache for the current machine
- _CURRENT_INTERFACE = None # global cache for the current network interface
- _CURRENT_BINARIES = {} # global cache for the currently installed binaries
- MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60 # 1 week (how often should we check for OS/hardware changes?)
- NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60 # 1 hour (how often should we check for public IP/private IP/DNS changes?)
- INSTALLED_BINARY_RECHECK_INTERVAL = 1 * 30 * 60 # 30min (how often should we check for changes to locally installed binaries?)
class MachineManager(models.Manager):
    """Manager that exposes a convenience accessor for the host's Machine record."""

    def current(self) -> 'Machine':
        """Return the (cached) Machine row for the host ArchiveBox is running on."""
        return Machine.current()
class Machine(ABIDModel, ModelWithHealthStats):
    """Audit log entry for a physical machine that was used to do archiving."""

    # ABID configuration: timestamp from created_at, URI from the hardware guid
    abid_prefix = 'mcn_'
    abid_ts_src = 'self.created_at'
    abid_uri_src = 'self.guid'
    abid_subtype_src = '"01"'
    abid_rand_src = 'self.id'
    abid_drift_allowed = False

    read_only_fields = ('id', 'abid', 'created_at', 'guid', 'hw_in_docker', 'hw_in_vm', 'hw_manufacturer', 'hw_product', 'hw_uuid', 'os_arch', 'os_family')

    id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
    abid = ABIDField(prefix=abid_prefix)
    created_at = AutoDateTimeField(default=None, null=False, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # IMMUTABLE PROPERTIES
    guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)  # 64char sha256 hash of machine's unique hardware ID

    # MUTABLE PROPERTIES (refreshed by current() via get_os_info()/get_vm_info())
    hostname = models.CharField(max_length=63, default=None, null=False)          # e.g. somehost.subdomain.example.com
    hw_in_docker = models.BooleanField(default=False, null=False)                 # e.g. False
    hw_in_vm = models.BooleanField(default=False, null=False)                     # e.g. False
    hw_manufacturer = models.CharField(max_length=63, default=None, null=False)   # e.g. Apple
    hw_product = models.CharField(max_length=63, default=None, null=False)        # e.g. Mac Studio Mac13,1
    hw_uuid = models.CharField(max_length=255, default=None, null=False)          # e.g. 39A12B50-...-...-...-...

    os_arch = models.CharField(max_length=15, default=None, null=False)           # e.g. arm64
    os_family = models.CharField(max_length=15, default=None, null=False)         # e.g. darwin
    os_platform = models.CharField(max_length=63, default=None, null=False)       # e.g. macOS-14.6.1-arm64-arm-64bit
    os_release = models.CharField(max_length=63, default=None, null=False)        # e.g. macOS 14.6.1
    os_kernel = models.CharField(max_length=255, default=None, null=False)        # e.g. Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:30 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6000

    # STATS COUNTERS
    stats = models.JSONField(default=dict, null=False)  # e.g. {"cpu_load": [1.25, 2.4, 1.4], "mem_swap_used_pct": 56, ...}

    # num_uses_failed = models.PositiveIntegerField(default=0)   # from ModelWithHealthStats
    # num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: MachineManager = MachineManager()

    # reverse relation populated by NetworkInterface.machine FK
    networkinterface_set: models.Manager['NetworkInterface']

    @classmethod
    def current(cls) -> 'Machine':
        """Get the current machine that ArchiveBox is running on.

        Returns a process-wide cached Machine row keyed by the hardware guid.
        The cache is considered fresh for MACHINE_RECHECK_INTERVAL (1 week);
        after that the row is re-created/updated from the host's OS/VM info.
        """

        global _CURRENT_MACHINE
        if _CURRENT_MACHINE:
            expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
            if timezone.now() < expires_at:
                # assume current machine cant change *while archivebox is actively running on it*
                # it's not strictly impossible to swap hardware while code is running,
                # but its rare and unusual so we check only once per week
                # (e.g. VMWare can live-migrate a VM to a new host while it's running)
                return _CURRENT_MACHINE
            else:
                # cache entry is older than a week; drop it and re-detect below
                _CURRENT_MACHINE = None

        # look up (or create) the row by hardware guid, refreshing mutable host info
        _CURRENT_MACHINE, _created = cls.objects.update_or_create(
            guid=get_host_guid(),
            defaults={
                'hostname': socket.gethostname(),
                **get_os_info(),
                **get_vm_info(),
                'stats': get_host_stats(),
            },
        )
        _CURRENT_MACHINE.save()  # populate ABID

        return _CURRENT_MACHINE
class NetworkInterfaceManager(models.Manager):
    """Manager that exposes a convenience accessor for the host's active NetworkInterface."""

    def current(self) -> 'NetworkInterface':
        """Return the (cached) NetworkInterface row for the current machine's active connection."""
        return NetworkInterface.current()
class NetworkInterface(ABIDModel, ModelWithHealthStats):
    """Audit log entry for a physical network interface / internet connection that was used to do archiving."""

    # ABID configuration: timestamp/URI inherited from the owning Machine, subtype from iface name
    abid_prefix = 'net_'
    abid_ts_src = 'self.machine.created_at'
    abid_uri_src = 'self.machine.guid'
    abid_subtype_src = 'self.iface'
    abid_rand_src = 'self.id'
    abid_drift_allowed = False

    read_only_fields = ('id', 'abid', 'created_at', 'machine', 'mac_address', 'ip_public', 'ip_local', 'dns_server')

    id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
    abid = ABIDField(prefix=abid_prefix)
    created_at = AutoDateTimeField(default=None, null=False, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)  # e.g. Machine(id=...)

    # IMMUTABLE PROPERTIES (any change here means it's treated as a *new* interface row)
    mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)  # e.g. ab:cd:ef:12:34:56
    ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)       # e.g. 123.123.123.123 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334
    ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)        # e.g. 192.168.2.18 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334
    dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)      # e.g. 8.8.8.8 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334

    # MUTABLE PROPERTIES (refreshed from get_host_network() on each re-check)
    hostname = models.CharField(max_length=63, default=None, null=False)   # e.g. somehost.sub.example.com
    iface = models.CharField(max_length=15, default=None, null=False)      # e.g. en0
    isp = models.CharField(max_length=63, default=None, null=False)        # e.g. AS-SONICTELECOM
    city = models.CharField(max_length=63, default=None, null=False)       # e.g. Berkeley
    region = models.CharField(max_length=63, default=None, null=False)     # e.g. California
    country = models.CharField(max_length=63, default=None, null=False)    # e.g. United States

    # STATS COUNTERS (inherited from ModelWithHealthStats)
    # num_uses_failed = models.PositiveIntegerField(default=0)
    # num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: NetworkInterfaceManager = NetworkInterfaceManager()

    class Meta:
        unique_together = (
            # if *any* of these change, it's considered a different interface
            # because we might get different downloaded content as a result,
            # this forces us to store an audit trail whenever these things change
            ('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),
        )

    @classmethod
    def current(cls) -> 'NetworkInterface':
        """Get the current network interface for the current machine.

        Process-wide cached; refreshed from get_host_network() every
        NETWORK_INTERFACE_RECHECK_INTERVAL (1 hour).
        """

        global _CURRENT_INTERFACE
        if _CURRENT_INTERFACE:
            # assume the current network interface (public IP, DNS servers, etc.) wont change more than once per hour
            expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
            if timezone.now() < expires_at:
                return _CURRENT_INTERFACE
            else:
                # cache entry expired; re-detect the network below
                _CURRENT_INTERFACE = None

        machine = Machine.objects.current()
        net_info = get_host_network()
        # the immutable identity fields are popped out as lookup keys;
        # whatever remains in net_info (hostname/iface/isp/geo) is mutable metadata
        _CURRENT_INTERFACE, _created = cls.objects.update_or_create(
            machine=machine,
            ip_public=net_info.pop('ip_public'),
            ip_local=net_info.pop('ip_local'),
            mac_address=net_info.pop('mac_address'),
            dns_server=net_info.pop('dns_server'),
            defaults=net_info,
        )
        _CURRENT_INTERFACE.save()  # populate ABID
        return _CURRENT_INTERFACE
class InstalledBinaryManager(models.Manager):
    def get_from_db_or_cache(self, binary: Binary) -> 'InstalledBinary':
        """Get or create an InstalledBinary record for a Binary on the local machine.

        Consults the process-wide _CURRENT_BINARIES cache first (entries are
        considered fresh for INSTALLED_BINARY_RECHECK_INTERVAL = 30min); on a
        miss or stale/diverged entry, loads the binary from the filesystem and
        upserts a matching DB row.
        """

        global _CURRENT_BINARIES
        cached_binary = _CURRENT_BINARIES.get(binary.name)
        if cached_binary:
            expires_at = cached_binary.modified_at + timedelta(seconds=INSTALLED_BINARY_RECHECK_INTERVAL)
            if timezone.now() < expires_at:
                is_loaded = binary.abspath and binary.version and binary.sha256
                if is_loaded:
                    # if the caller did the (expensive) job of loading the binary from the filesystem already
                    # then their in-memory version is certainly more up-to-date than any potential cached version
                    # use this opportunity to invalidate the cache in case anything has changed
                    is_different_from_cache = (
                        binary.abspath != cached_binary.abspath
                        or binary.version != cached_binary.version
                        or binary.sha256 != cached_binary.sha256
                    )
                    if is_different_from_cache:
                        _CURRENT_BINARIES.pop(binary.name)
                    else:
                        return cached_binary
                else:
                    # if they have not yet loaded the binary
                    # but our cache is recent enough and not expired, assume cached version is good enough
                    # it will automatically reload when the cache expires
                    # cached_binary will be stale/bad for up to 30min if binary was updated/removed on host system
                    return cached_binary
            else:
                # cached binary is too old, reload it from scratch
                _CURRENT_BINARIES.pop(binary.name)

        if not binary.abspath or not binary.version or not binary.sha256:
            # if binary was not yet loaded from filesystem, do it now
            # this is expensive: we have to find its abspath, version, and sha256, but it's necessary
            # to make sure we have a good, up-to-date record of it in the DB & in-memory cache
            binary = archivebox.pm.hook.binary_load(binary=binary, fresh=True)
            assert binary.loaded_binprovider and binary.loaded_abspath and binary.loaded_version and binary.loaded_sha256, f'Failed to load binary {binary.name} abspath, version, and sha256'

        # upsert the DB row and repopulate the in-memory cache entry
        _CURRENT_BINARIES[binary.name], _created = self.update_or_create(
            machine=Machine.objects.current(),
            name=binary.name,
            binprovider=binary.loaded_binprovider.name,
            version=str(binary.loaded_version),
            abspath=str(binary.loaded_abspath),
            sha256=str(binary.loaded_sha256),
        )
        cached_binary = _CURRENT_BINARIES[binary.name]
        cached_binary.save()  # populate ABID

        # if we get this far make sure DB record matches in-memory cache
        assert str(cached_binary.binprovider) == str(binary.loaded_binprovider.name)
        assert str(cached_binary.abspath) == str(binary.loaded_abspath)
        assert str(cached_binary.version) == str(binary.loaded_version)
        assert str(cached_binary.sha256) == str(binary.loaded_sha256)

        return cached_binary
-
class InstalledBinary(ABIDModel, ModelWithHealthStats):
    """Audit record of a specific binary (name + provider + abspath + version + sha256) present on a Machine."""

    # ABID configuration: timestamp/URI inherited from the owning Machine, subtype from the provider name
    abid_prefix = 'bin_'
    abid_ts_src = 'self.machine.created_at'
    abid_uri_src = 'self.machine.guid'
    abid_subtype_src = 'self.binprovider'
    abid_rand_src = 'self.id'
    abid_drift_allowed = False

    read_only_fields = ('id', 'abid', 'created_at', 'machine', 'name', 'binprovider', 'abspath', 'version', 'sha256')

    id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
    abid = ABIDField(prefix=abid_prefix)
    created_at = AutoDateTimeField(default=None, null=False, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # IMMUTABLE PROPERTIES (blank=True because clean() can auto-fill them)
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False, blank=True)
    name = models.CharField(max_length=63, default=None, null=False, blank=True)        # binary name, e.g. wget
    binprovider = models.CharField(max_length=31, default=None, null=False, blank=True) # provider name, e.g. apt/brew/pip
    abspath = models.CharField(max_length=255, default=None, null=False, blank=True)    # resolved path on disk
    version = models.CharField(max_length=32, default=None, null=False, blank=True)
    sha256 = models.CharField(max_length=64, default=None, null=False, blank=True)

    # MUTABLE PROPERTIES (TODO)
    # is_pinned = models.BooleanField(default=False)   # i.e. should this binary supersede other binaries with the same name on the host?
    # is_valid = models.BooleanField(default=True)     # i.e. is this binary still available on the host?

    # STATS COUNTERS (inherited from ModelWithHealthStats)
    # num_uses_failed = models.PositiveIntegerField(default=0)
    # num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: InstalledBinaryManager = InstalledBinaryManager()

    class Meta:
        verbose_name = 'Installed Binary'
        verbose_name_plural = 'Installed Binaries'
        unique_together = (
            ('machine', 'name', 'abspath', 'version', 'sha256'),
        )

    def __str__(self) -> str:
        return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'

    def clean(self, *args, **kwargs) -> None:
        """Auto-fill any missing identity fields (machine/binprovider/abspath/version/sha256) before validation."""
        assert self.name or self.abspath
        self.name = str(self.name or self.abspath)
        assert self.name

        if not hasattr(self, 'machine'):
            self.machine = Machine.objects.current()
        if not self.binprovider:
            # ask every registered plugin BinProvider whether it can load this binary
            all_known_binproviders = list(abx.as_dict(archivebox.pm.hook.get_BINPROVIDERS()).values())
            binary = archivebox.pm.hook.binary_load(binary=Binary(name=self.name, binproviders=all_known_binproviders), fresh=True)
            # NOTE(review): this can assign None to a null=False field if the load fails — confirm load always succeeds here
            self.binprovider = binary.loaded_binprovider.name if binary.loaded_binprovider else None
        if not self.abspath:
            self.abspath = self.BINPROVIDER.get_abspath(self.name)
        if not self.version:
            self.version = self.BINPROVIDER.get_version(self.name, abspath=self.abspath)
        if not self.sha256:
            self.sha256 = self.BINPROVIDER.get_sha256(self.name, abspath=self.abspath)

        super().clean(*args, **kwargs)

    @cached_property
    def BINARY(self) -> Binary:
        """Look up the plugin-defined Binary object matching this record's name."""
        for binary in abx.as_dict(archivebox.pm.hook.get_BINARIES()).values():
            if binary.name == self.name:
                return binary
        raise Exception(f'Orphaned InstalledBinary {self.name} {self.binprovider} was found in DB, could not find any plugin that defines it')
        # TODO: we could technically reconstruct it from scratch, but why would we ever want to do that?

    @cached_property
    def BINPROVIDER(self) -> BinProvider:
        """Look up the plugin-defined BinProvider object matching this record's binprovider name."""
        for binprovider in abx.as_dict(archivebox.pm.hook.get_BINPROVIDERS()).values():
            if binprovider.name == self.binprovider:
                return binprovider
        raise Exception(f'Orphaned InstalledBinary(name={self.name}) was found in DB, could not find any plugin that defines BinProvider(name={self.binprovider})')

    # maybe not a good idea to provide this? Binary in DB is a record of the binary's config
    # whereas a loaded binary is a not-yet saved instance that may not have the same config
    # why would we want to load a binary record from the db when it could be freshly loaded?
    def load_from_db(self) -> Binary:
        """Reconstruct an abx_pkg Binary from this DB record without touching the filesystem."""
        # TODO: implement defaults arg in abx_pkg
        # return self.BINARY.load(defaults={
        #     'binprovider': self.BINPROVIDER,
        #     'abspath': Path(self.abspath),
        #     'version': self.version,
        #     'sha256': self.sha256,
        # })

        return Binary.model_validate({
            **self.BINARY.model_dump(),
            'abspath': self.abspath and Path(self.abspath),
            'version': self.version,
            'sha256': self.sha256,
            'loaded_binprovider': self.BINPROVIDER,
            'binproviders_supported': self.BINARY.binproviders_supported,
            'overrides': self.BINARY.overrides,
        })

    def load_fresh(self) -> Binary:
        """Re-detect this binary from the filesystem (expensive; bypasses DB/cache)."""
        return archivebox.pm.hook.binary_load(binary=self.BINARY, fresh=True)
def spawn_process(proc_id: str):
    """Entry point for a forked worker: look up the Process row by id and run it in the foreground."""
    Process.objects.get(id=proc_id).spawn()
-
class ProcessManager(models.Manager):
    """Default manager for Process; all custom filtering helpers live on ProcessQuerySet."""
class ProcessQuerySet(models.QuerySet):
    """
    Enhanced QuerySet for Process model, usage:
        Process.objects.queued() -> QuerySet[Process] [Process(pid=None, returncode=None), Process(pid=None, returncode=None)]
        Process.objects.running() -> QuerySet[Process] [Process(pid=123, returncode=None), Process(pid=456, returncode=None)]
        Process.objects.exited() -> QuerySet[Process] [Process(pid=789, returncode=0), Process(pid=101, returncode=1)]
        Process.objects.running().pids() -> [456]
        Process.objects.kill() -> 1
    """

    def queued(self):
        """Processes that have not been launched yet (no pid, no returncode)."""
        return self.filter(pid__isnull=True, returncode__isnull=True)

    def running(self):
        """Processes that have a pid but have not exited yet."""
        return self.filter(pid__isnull=False, returncode__isnull=True)

    def exited(self):
        """Processes that have finished (returncode is set)."""
        return self.filter(returncode__isnull=False)

    def kill(self):
        """Kill every running process in this queryset; returns how many were killed."""
        victims = list(self.running())
        for victim in victims:
            victim.kill()
        return len(victims)

    def pids(self):
        """Flat list of the pid values in this queryset."""
        return self.values_list('pid', flat=True)
class Process(ABIDModel):
    """Audit log entry for a single OS process that ArchiveBox launched (or queued to launch)."""

    abid_prefix = 'pid_'
    abid_ts_src = 'self.created_at'
    abid_uri_src = 'self.cmd'
    abid_subtype_src = 'self.actor_type or "00"'
    abid_rand_src = 'self.id'
    abid_drift_allowed = False

    read_only_fields = ('id', 'abid', 'created_at', 'cmd', 'cwd', 'actor_type', 'timeout')

    id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
    abid = ABIDField(prefix=abid_prefix)

    # immutable state
    cmd = models.JSONField(default=list)                            # shell argv (a list of strings, not a shell string)
    cwd = models.CharField(max_length=255)                          # working directory
    actor_type = models.CharField(max_length=255, null=True)        # python ActorType that this process is running
    timeout = models.PositiveIntegerField(null=True, default=None)  # seconds to wait before killing the process if it's still running

    created_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
    modified_at = models.DateTimeField(null=False, default=timezone.now, editable=False)

    # mutable fields
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE)  # machine the process ran on
    pid = models.IntegerField(null=True)                            # OS pid while running, None when queued or exited
    launched_at = models.DateTimeField(null=True)
    finished_at = models.DateTimeField(null=True)
    returncode = models.IntegerField(null=True)                     # None until the process exits
    stdout = models.TextField(default='', null=False)
    stderr = models.TextField(default='', null=False)

    machine_id: str

    # optional mutable state that can be used to trace what the process is doing
    # active_event = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL)

    emitted_events: models.RelatedManager['Event']
    claimed_events: models.RelatedManager['Event']

    objects: ProcessManager = ProcessManager.from_queryset(ProcessQuerySet)()

    @classmethod
    def current(cls) -> 'Process':
        """Get (or create) the Process record for the currently-running python process.

        Uses the PROCESS_ID env var to find a record created by a parent process;
        if unset, creates a fresh record from sys.argv/os.getpid() and exports
        PROCESS_ID so child processes can find it.
        """
        proc_id = os.environ.get('PROCESS_ID', '').strip()
        if not proc_id:
            proc = cls.objects.create(
                cmd=sys.argv,
                cwd=os.getcwd(),
                actor_type=None,
                timeout=None,
                machine=Machine.objects.current(),
                pid=os.getpid(),
                launched_at=timezone.now(),
                finished_at=None,
                returncode=None,
                stdout='',
                stderr='',
            )
            os.environ['PROCESS_ID'] = str(proc.id)
            return proc

        proc = cls.objects.get(id=proc_id)
        if proc.pid:
            # record already claimed by a running process: it must be us
            assert os.getpid() == proc.pid, f'Process ID mismatch: {proc.pid} != {os.getpid()}'
        else:
            # claim the queued record for this process and fill in runtime details
            proc.pid = os.getpid()
            proc.machine = Machine.current()
            proc.cwd = os.getcwd()
            proc.cmd = sys.argv
        proc.launched_at = proc.launched_at or timezone.now()
        proc.save()

        return proc

    @classmethod
    def create_and_fork(cls, **kwargs):
        """Create a Process record and immediately launch it in a background child."""
        proc = cls.objects.create(**kwargs)
        proc.fork()
        return proc

    def fork(self):
        """Launch this process in the background (non-blocking) via a child worker."""
        if self.pid:
            raise Exception(f'Process is already running, cannot fork again: {self}')

        # fork the process in the background; the child runs spawn_process(self.id)
        multiprocessing.Process(target=spawn_process, args=(self.id,)).start()

    def spawn(self):
        """Run self.cmd in the foreground, blocking until it exits, and record pid/output/returncode."""
        if self.pid:
            raise Exception(f'Process already running, cannot spawn again: {self}')

        # BUGFIX: cmd is an argv *list*, so it must run with shell=False.
        # With shell=True on POSIX, only cmd[0] is handed to the shell and the
        # remaining items become arguments to the shell itself, silently dropping
        # the command's arguments (and shell=True would be an injection hazard).
        proc = subprocess.Popen(self.cmd, cwd=self.cwd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        self.pid = proc.pid
        self.launched_at = timezone.now()
        self.save()
        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})

        # communicate() both waits for exit and drains the pipes, avoiding the
        # deadlock that wait()-then-read() can hit when the child fills a PIPE buffer
        stdout, stderr = proc.communicate()
        self.finished_at = timezone.now()
        self.returncode = proc.returncode
        self.stdout = stdout
        self.stderr = stderr
        self.pid = None
        self.save()
        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})

    def kill(self):
        """Force-kill this process with SIGKILL (only allowed on the machine it is running on)."""
        if not self.is_running: return
        assert self.machine == Machine.current(), f'Cannot kill actor on another machine: {self.machine_id} != {Machine.current().id}'

        os.kill(self.pid, signal.SIGKILL)
        # NOTE: returncode stays None here; the spawn()-ing parent records the
        # final exit status when its blocking wait returns
        self.pid = None
        self.save()
        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})

    @property
    def is_pending(self):
        """Queued but never launched: no pid and no returncode yet."""
        return (self.pid is None) and (self.returncode is None)

    @property
    def is_running(self):
        """Launched and not yet exited: has a pid but no returncode."""
        return (self.pid is not None) and (self.returncode is None)

    @property
    def is_failed(self):
        """Exited with a nonzero returncode."""
        return self.returncode not in (None, 0)

    @property
    def is_succeeded(self):
        """Exited cleanly with returncode 0."""
        return self.returncode == 0

    # @property
    # def is_idle(self):
    #     if not self.actor_type:
    #         raise Exception(f'Process {self.id} has no actor_type set, can only introspect active events if Process.actor_type is set to the Actor its running')
    #     return self.active_event is None
|