supervisord_util.py

__package__ = 'archivebox.workers'

import sys
import time
import signal
import psutil
import shutil
import subprocess

from typing import Dict, cast, Iterator
from pathlib import Path
from functools import cache

from rich import print

from supervisor.xmlrpc import SupervisorTransport
from xmlrpc.client import ServerProxy

from archivebox.config import CONSTANTS
from archivebox.config.paths import get_or_create_working_tmp_dir
from archivebox.config.permissions import ARCHIVEBOX_USER
from archivebox.misc.logging import STDERR
from archivebox.misc.logging_util import pretty_path
LOG_FILE_NAME = "supervisord.log"
CONFIG_FILE_NAME = "supervisord.conf"
PID_FILE_NAME = "supervisord.pid"
WORKERS_DIR_NAME = "workers"
SCHEDULER_WORKER = {
    "name": "worker_scheduler",
    "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks",
    "autostart": "true",
    "autorestart": "true",
    "stdout_logfile": "logs/worker_scheduler.log",
    "redirect_stderr": "true",
}
COMMAND_WORKER = {
    "name": "worker_commands",
    "command": "archivebox manage djangohuey --queue commands -w 4 -k thread --no-periodic --disable-health-check",
    "autostart": "true",
    "autorestart": "true",
    "stdout_logfile": "logs/worker_commands.log",
    "redirect_stderr": "true",
}
ORCHESTRATOR_WORKER = {
    "name": "worker_orchestrator",
    "command": "archivebox manage orchestrator",
    "autostart": "true",
    "autorestart": "true",
    "stdout_logfile": "logs/worker_orchestrator.log",
    "redirect_stderr": "true",
}
SERVER_WORKER = lambda host, port: {
    "name": "worker_daphne",
    "command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application",
    "autostart": "false",
    "autorestart": "true",
    "stdout_logfile": "logs/worker_daphne.log",
    "redirect_stderr": "true",
}
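
# Note: unlike the static worker definitions above, SERVER_WORKER is a factory because the bind
# host/port are only known at runtime. For illustration, start_server_workers() below calls it
# roughly as SERVER_WORKER(host='0.0.0.0', port='8000') to get a dict in the same shape.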

@cache
def get_sock_file():
    """Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits"""
    TMP_DIR = get_or_create_working_tmp_dir(autofix=True, quiet=False)
    assert TMP_DIR, "Failed to find or create a writable TMP_DIR!"
    socket_file = TMP_DIR / "supervisord.sock"
    return socket_file

def follow(file, sleep_sec=0.1) -> Iterator[str]:
    """Yield each line from a file as they are written.
    `sleep_sec` is the time to sleep after empty reads."""
    line = ''
    while True:
        tmp = file.readline()
        if tmp is not None and tmp != "":
            line += tmp
            if line.endswith("\n"):
                yield line
                line = ''
        elif sleep_sec:
            time.sleep(sleep_sec)
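
# Rough usage sketch for follow(), assuming the worker log file already exists:
#   with open('logs/worker_daphne.log', 'r') as f:
#       for line in follow(f):
#           print(line, end='')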

def create_supervisord_config():
    SOCK_FILE = get_sock_file()
    WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
    CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
    PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
    LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME

    config_content = f"""
[supervisord]
nodaemon = true
environment = IS_SUPERVISORD_PARENT="true"
pidfile = {PID_FILE}
logfile = {LOG_FILE}
childlogdir = {CONSTANTS.LOGS_DIR}
directory = {CONSTANTS.DATA_DIR}
strip_ansi = true
nocleanup = true
user = {ARCHIVEBOX_USER}
[unix_http_server]
file = {SOCK_FILE}
chmod = 0700
[supervisorctl]
serverurl = unix://{SOCK_FILE}
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[include]
files = {WORKERS_DIR}/*.conf
"""
    CONFIG_FILE.write_text(config_content)
    Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)
    (WORKERS_DIR / 'initial_startup.conf').write_text('')  # hides error about "no files found to include" when supervisord starts

def create_worker_config(daemon):
    """Create a supervisord worker config file for a given daemon"""
    SOCK_FILE = get_sock_file()
    WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
    Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)

    name = daemon['name']
    worker_conf = WORKERS_DIR / f"{name}.conf"

    worker_str = f"[program:{name}]\n"
    for key, value in daemon.items():
        if key == 'name':
            continue
        worker_str += f"{key}={value}\n"
    worker_str += "\n"

    worker_conf.write_text(worker_str)
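
# For illustration, create_worker_config(ORCHESTRATOR_WORKER) writes roughly the following
# to <TMP_DIR>/workers/worker_orchestrator.conf (every key except 'name' becomes a key=value line):
#
#   [program:worker_orchestrator]
#   command=archivebox manage orchestrator
#   autostart=true
#   autorestart=true
#   stdout_logfile=logs/worker_orchestrator.log
#   redirect_stderr=true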

def get_existing_supervisord_process():
    SOCK_FILE = get_sock_file()
    try:
        transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
        server = ServerProxy("http://localhost", transport=transport)  # user:pass@localhost doesn't work for some reason with unix://.sock, can't seem to silence CRIT no-auth warning
        current_state = cast(Dict[str, int | str], server.supervisor.getState())
        if current_state["statename"] == "RUNNING":
            pid = server.supervisor.getPID()
            print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{pretty_path(SOCK_FILE)}.")
            return server.supervisor
    except FileNotFoundError:
        return None
    except Exception as e:
        print(f"Error connecting to existing supervisord: {str(e)}")
        return None
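
# Note: supervisor's XML-RPC getState() returns a dict along the lines of
# {'statecode': 1, 'statename': 'RUNNING'}, which is why only 'statename' is checked above.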

def stop_existing_supervisord_process():
    SOCK_FILE = get_sock_file()
    PID_FILE = SOCK_FILE.parent / PID_FILE_NAME

    try:
        # if pid file exists, load PID int
        try:
            pid = int(PID_FILE.read_text())
        except (FileNotFoundError, ValueError):
            return

        try:
            print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
            proc = psutil.Process(pid)
            proc.terminate()
            proc.wait(timeout=5)
        except (BaseException, BrokenPipeError, IOError, KeyboardInterrupt):
            pass
    finally:
        try:
            # clear PID file and socket file
            PID_FILE.unlink(missing_ok=True)
            get_sock_file().unlink(missing_ok=True)
        except BaseException:
            pass

def start_new_supervisord_process(daemonize=False):
    SOCK_FILE = get_sock_file()
    WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
    LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
    CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
    PID_FILE = SOCK_FILE.parent / PID_FILE_NAME

    print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
    pretty_log_path = pretty_path(LOG_FILE)
    print(f" > Writing supervisord logs to: {pretty_log_path}")
    print(f" > Writing task worker logs to: {pretty_log_path.replace('supervisord.log', 'worker_*.log')}")
    print(f' > Using supervisord config file: {pretty_path(CONFIG_FILE)}')
    print(f" > Using supervisord UNIX socket: {pretty_path(SOCK_FILE)}")
    print()

    # clear out existing stale state files
    shutil.rmtree(WORKERS_DIR, ignore_errors=True)
    PID_FILE.unlink(missing_ok=True)
    get_sock_file().unlink(missing_ok=True)
    CONFIG_FILE.unlink(missing_ok=True)

    # create the supervisord config file
    create_supervisord_config()

    # Start supervisord
    # panel = Panel(f"Starting supervisord with config: {SUPERVISORD_CONFIG_FILE}")
    # with Live(panel, refresh_per_second=1) as live:
    subprocess.Popen(
        f"supervisord --configuration={CONFIG_FILE}",
        stdin=None,
        shell=True,
        start_new_session=daemonize,
    )

    def exit_signal_handler(signum, frame):
        if signum == 2:
            STDERR.print("\n[🛑] Got Ctrl+C. Terminating child processes...")
        elif signum != 13:
            STDERR.print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
        stop_existing_supervisord_process()
        raise SystemExit(0)

    # Monitor for termination signals and cleanup child processes
    if not daemonize:
        try:
            signal.signal(signal.SIGINT, exit_signal_handler)
            signal.signal(signal.SIGHUP, exit_signal_handler)
            signal.signal(signal.SIGPIPE, exit_signal_handler)
            signal.signal(signal.SIGTERM, exit_signal_handler)
        except Exception:
            # signal handlers only work in the main thread
            pass
    # otherwise supervisord will continue in the background even if the parent process ends (aka daemon mode)

    time.sleep(2)

    return get_existing_supervisord_process()

def get_or_create_supervisord_process(daemonize=False):
    SOCK_FILE = get_sock_file()
    WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME

    supervisor = get_existing_supervisord_process()
    if supervisor is None:
        stop_existing_supervisord_process()
        supervisor = start_new_supervisord_process(daemonize=daemonize)
        time.sleep(0.5)

    # wait up to 5s in case supervisord is slow to start
    if not supervisor:
        for _ in range(10):
            if supervisor is not None:
                print()
                break
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(0.5)
            supervisor = get_existing_supervisord_process()
        else:
            print()

    assert supervisor, "Failed to start supervisord or connect to it!"
    supervisor.getPID()  # make sure it doesn't throw an exception

    (WORKERS_DIR / 'initial_startup.conf').unlink(missing_ok=True)
    return supervisor
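
# Typical usage (mirrors what start_cli_workers() below does):
#   supervisor = get_or_create_supervisord_process(daemonize=False)
#   start_worker(supervisor, COMMAND_WORKER)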

def start_worker(supervisor, daemon, lazy=False):
    assert supervisor.getPID()

    print(f"[🦸‍♂️] Supervisord starting new subprocess worker: {daemon['name']}...")

    # write the worker's config file, then have supervisord re-read its config
    create_worker_config(daemon)
    result = supervisor.reloadConfig()
    added, changed, removed = result[0]
    # print(f"Added: {added}, Changed: {changed}, Removed: {removed}")

    for removed_name in removed:
        supervisor.stopProcessGroup(removed_name)
        supervisor.removeProcessGroup(removed_name)
    for changed_name in changed:
        supervisor.stopProcessGroup(changed_name)
        supervisor.removeProcessGroup(changed_name)
        supervisor.addProcessGroup(changed_name)
    for added_name in added:
        supervisor.addProcessGroup(added_name)

    time.sleep(1)

    for _ in range(10):
        procs = supervisor.getAllProcessInfo()
        for proc in procs:
            if proc['name'] == daemon["name"]:
                # See process state diagram here: http://supervisord.org/subprocess.html
                if proc['statename'] == 'RUNNING':
                    print(f" - Worker {daemon['name']}: already {proc['statename']} ({proc['description']})")
                    return proc
                else:
                    if not lazy:
                        supervisor.startProcessGroup(daemon["name"], True)
                    proc = supervisor.getProcessInfo(daemon["name"])
                    print(f" - Worker {daemon['name']}: started {proc['statename']} ({proc['description']})")
                    return proc

        # retry in case it's slow to launch
        time.sleep(0.5)

    raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
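
# Each entry returned by getAllProcessInfo()/getProcessInfo() is a dict that includes (among
# other fields) 'name', 'statename' (e.g. 'STARTING', 'RUNNING', 'FATAL'), and a human-readable
# 'description'; those three fields are all this module relies on.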

def get_worker(supervisor, daemon_name):
    try:
        return supervisor.getProcessInfo(daemon_name)
    except Exception:
        pass
    return None

def stop_worker(supervisor, daemon_name):
    proc = get_worker(supervisor, daemon_name)

    for _ in range(10):
        if not proc:
            # worker does not exist (was never running or configured in the first place)
            return True

        # See process state diagram here: http://supervisord.org/subprocess.html
        if proc['statename'] == 'STOPPED':
            # worker was configured but has already stopped for some reason
            supervisor.removeProcessGroup(daemon_name)
            return True
        else:
            # worker was configured and is running, stop it now
            supervisor.stopProcessGroup(daemon_name)

        # wait 500ms and then re-check to make sure it's really stopped
        time.sleep(0.5)
        proc = get_worker(supervisor, daemon_name)

    raise Exception(f"Failed to stop worker {daemon_name}!")

def tail_worker_logs(log_path: str):
    get_or_create_supervisord_process(daemonize=False)

    from rich.live import Live
    from rich.table import Table

    table = Table()
    table.add_column("TS")
    table.add_column("URL")

    try:
        with Live(table, refresh_per_second=1) as live:  # refresh the live display once per second
            with open(log_path, 'r') as f:
                for line in follow(f):
                    if '://' in line:
                        live.console.print(f"Working on: {line.strip()}")
                        # table.add_row("123124234", line.strip())
    except (KeyboardInterrupt, BrokenPipeError, IOError):
        STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
    except SystemExit:
        pass
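
# Rough usage sketch, assuming CONSTANTS.LOGS_DIR is where the workers' relative
# "logs/worker_*.log" paths end up:
#   tail_worker_logs(str(CONSTANTS.LOGS_DIR / 'worker_orchestrator.log'))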

def watch_worker(supervisor, daemon_name, interval=5):
    """loop continuously and monitor worker's health"""
    while True:
        proc = get_worker(supervisor, daemon_name)
        if not proc:
            raise Exception("Worker disappeared while running! " + daemon_name)

        if proc['statename'] == 'STOPPED':
            return proc

        if proc['statename'] == 'RUNNING':
            time.sleep(1)
            continue

        if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'):
            print(f'[🦸‍♂️] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}')
            time.sleep(interval)
            continue

def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
    supervisor = get_or_create_supervisord_process(daemonize=daemonize)

    bg_workers = [
        SCHEDULER_WORKER,
        COMMAND_WORKER,
        ORCHESTRATOR_WORKER,
    ]

    print()
    start_worker(supervisor, SERVER_WORKER(host=host, port=port))
    print()
    for worker in bg_workers:
        start_worker(supervisor, worker)
    print()

    if not daemonize:
        try:
            watch_worker(supervisor, "worker_daphne")
        except (KeyboardInterrupt, BrokenPipeError, IOError):
            STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
        except SystemExit:
            pass
        except BaseException as e:
            STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
            raise
        finally:
            stop_worker(supervisor, "worker_daphne")
            time.sleep(0.5)

def start_cli_workers(watch=False):
    supervisor = get_or_create_supervisord_process(daemonize=False)

    start_worker(supervisor, COMMAND_WORKER)
    start_worker(supervisor, ORCHESTRATOR_WORKER)

    if watch:
        try:
            watch_worker(supervisor, ORCHESTRATOR_WORKER['name'])
        except (KeyboardInterrupt, BrokenPipeError, IOError):
            STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
        except SystemExit:
            pass
        except BaseException as e:
            STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping CLI workers gracefully...")
            raise
        finally:
            stop_worker(supervisor, COMMAND_WORKER['name'])
            stop_worker(supervisor, ORCHESTRATOR_WORKER['name'])
            time.sleep(0.5)

    return [COMMAND_WORKER, ORCHESTRATOR_WORKER]

# def main(daemons):
#     supervisor = get_or_create_supervisord_process(daemonize=False)
#     worker = start_worker(supervisor, daemons["webworker"])
#     pprint(worker)
#     print("All processes started in background.")
#     # Optionally you can block the main thread until an exit signal is received:
#     try:
#         signal.pause()
#     except KeyboardInterrupt:
#         pass
#     finally:
#         stop_existing_supervisord_process()

# if __name__ == "__main__":
#     DAEMONS = {
#         "webworker": {
#             "name": "webworker",
#             "command": "python3 -m http.server 9000",
#             "directory": str(cwd),
#             "autostart": "true",
#             "autorestart": "true",
#             "stdout_logfile": cwd / "webworker.log",
#             "stderr_logfile": cwd / "webworker_error.log",
#         },
#     }
#     main(DAEMONS, cwd)