server.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. __package__ = 'archivebox.mcp'
  2. """
  3. Model Context Protocol (MCP) server implementation for ArchiveBox.
  4. Dynamically exposes all ArchiveBox CLI commands as MCP tools by introspecting
  5. Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport.
  6. """
  7. import sys
  8. import json
  9. import traceback
  10. from typing import Any, Dict, List, Optional
  11. from io import StringIO
  12. from contextlib import redirect_stdout, redirect_stderr
  13. import click
  14. from click.testing import CliRunner
  15. from archivebox.config.version import VERSION
  16. class MCPJSONEncoder(json.JSONEncoder):
  17. """Custom JSON encoder that handles Click sentinel values and other special types"""
  18. def default(self, obj):
  19. # Handle Click's sentinel values
  20. if hasattr(click, 'core') and hasattr(click.core, '_SentinelClass'):
  21. if isinstance(obj, click.core._SentinelClass):
  22. return None
  23. # Handle tuples (convert to lists)
  24. if isinstance(obj, tuple):
  25. return list(obj)
  26. # Handle any other non-serializable objects
  27. try:
  28. return super().default(obj)
  29. except TypeError:
  30. return str(obj)
  31. # Type mapping from Click types to JSON Schema types
  32. def click_type_to_json_schema_type(click_type) -> dict:
  33. """Convert a Click parameter type to JSON Schema type definition"""
  34. if isinstance(click_type, click.types.StringParamType):
  35. return {"type": "string"}
  36. elif isinstance(click_type, click.types.IntParamType):
  37. return {"type": "integer"}
  38. elif isinstance(click_type, click.types.FloatParamType):
  39. return {"type": "number"}
  40. elif isinstance(click_type, click.types.BoolParamType):
  41. return {"type": "boolean"}
  42. elif isinstance(click_type, click.types.Choice):
  43. return {"type": "string", "enum": click_type.choices}
  44. elif isinstance(click_type, click.types.Path):
  45. return {"type": "string", "description": "File or directory path"}
  46. elif isinstance(click_type, click.types.File):
  47. return {"type": "string", "description": "File path"}
  48. elif isinstance(click_type, click.types.Tuple):
  49. # Multiple arguments of same type
  50. return {"type": "array", "items": {"type": "string"}}
  51. else:
  52. # Default to string for unknown types
  53. return {"type": "string"}
  54. def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict:
  55. """
  56. Convert a Click command to an MCP tool definition with JSON Schema.
  57. Introspects the Click command's parameters to automatically generate
  58. the input schema without manual definition.
  59. """
  60. properties = {}
  61. required = []
  62. # Extract parameters from Click command
  63. for param in click_command.params:
  64. # Skip internal parameters
  65. if param.name in ('help', 'version'):
  66. continue
  67. param_schema = click_type_to_json_schema_type(param.type)
  68. # Add description from Click help text
  69. if param.help:
  70. param_schema["description"] = param.help
  71. # Handle default values
  72. if param.default is not None and param.default != ():
  73. param_schema["default"] = param.default
  74. # Handle multiple values (like multiple URLs)
  75. if param.multiple:
  76. properties[param.name] = {
  77. "type": "array",
  78. "items": param_schema,
  79. "description": param_schema.get("description", f"Multiple {param.name} values")
  80. }
  81. else:
  82. properties[param.name] = param_schema
  83. # Mark as required if Click requires it
  84. if param.required:
  85. required.append(param.name)
  86. return {
  87. "name": cmd_name,
  88. "description": click_command.help or click_command.short_help or f"Run archivebox {cmd_name} command",
  89. "inputSchema": {
  90. "type": "object",
  91. "properties": properties,
  92. "required": required
  93. }
  94. }
  95. def execute_click_command(cmd_name: str, click_command: click.Command, arguments: dict) -> dict:
  96. """
  97. Execute a Click command programmatically with given arguments.
  98. Returns MCP-formatted result with captured output and error status.
  99. """
  100. # Setup Django for archive commands (commands that need database access)
  101. from archivebox.cli import ArchiveBoxGroup
  102. if cmd_name in ArchiveBoxGroup.archive_commands:
  103. try:
  104. from archivebox.config.django import setup_django
  105. from archivebox.misc.checks import check_data_folder
  106. setup_django()
  107. check_data_folder()
  108. except Exception as e:
  109. # If Django setup fails, return error (unless it's manage/shell which handle this themselves)
  110. if cmd_name not in ('manage', 'shell'):
  111. return {
  112. "content": [{
  113. "type": "text",
  114. "text": f"Error setting up Django: {str(e)}\n\nMake sure you're running the MCP server from inside an ArchiveBox data directory."
  115. }],
  116. "isError": True
  117. }
  118. # Use Click's test runner to invoke command programmatically
  119. runner = CliRunner()
  120. # Build a map of parameter names to their Click types (Argument vs Option)
  121. param_map = {param.name: param for param in click_command.params}
  122. # Convert arguments dict to CLI args list
  123. args = []
  124. positional_args = []
  125. for key, value in arguments.items():
  126. param_name = key.replace('_', '-') # Click uses dashes
  127. param = param_map.get(key)
  128. # Check if this is a positional Argument (not an Option)
  129. is_argument = isinstance(param, click.Argument)
  130. if is_argument:
  131. # Positional arguments - add them without dashes
  132. if isinstance(value, list):
  133. positional_args.extend([str(v) for v in value])
  134. elif value is not None:
  135. positional_args.append(str(value))
  136. else:
  137. # Options - add with dashes
  138. if isinstance(value, bool):
  139. if value:
  140. args.append(f'--{param_name}')
  141. elif isinstance(value, list):
  142. # Multiple values for an option (rare)
  143. for item in value:
  144. args.append(f'--{param_name}')
  145. args.append(str(item))
  146. elif value is not None:
  147. args.append(f'--{param_name}')
  148. args.append(str(value))
  149. # Add positional arguments at the end
  150. args.extend(positional_args)
  151. # Execute the command
  152. try:
  153. result = runner.invoke(click_command, args, catch_exceptions=False)
  154. # Format output as MCP content
  155. content = []
  156. if result.output:
  157. content.append({
  158. "type": "text",
  159. "text": result.output
  160. })
  161. if result.stderr_bytes:
  162. stderr_text = result.stderr_bytes.decode('utf-8', errors='replace')
  163. if stderr_text.strip():
  164. content.append({
  165. "type": "text",
  166. "text": f"[stderr]\n{stderr_text}"
  167. })
  168. # Check exit code
  169. is_error = result.exit_code != 0
  170. if is_error and not content:
  171. content.append({
  172. "type": "text",
  173. "text": f"Command failed with exit code {result.exit_code}"
  174. })
  175. return {
  176. "content": content or [{"type": "text", "text": "(no output)"}],
  177. "isError": is_error
  178. }
  179. except Exception as e:
  180. # Capture any exceptions during execution
  181. error_trace = traceback.format_exc()
  182. return {
  183. "content": [{
  184. "type": "text",
  185. "text": f"Error executing {cmd_name}: {str(e)}\n\n{error_trace}"
  186. }],
  187. "isError": True
  188. }
  189. class MCPServer:
  190. """
  191. Model Context Protocol server for ArchiveBox.
  192. Provides JSON-RPC 2.0 interface over stdio, dynamically exposing
  193. all Click commands as MCP tools.
  194. """
  195. def __init__(self):
  196. # Import here to avoid circular imports
  197. from archivebox.cli import ArchiveBoxGroup
  198. self.cli_group = ArchiveBoxGroup()
  199. self.protocol_version = "2025-11-25"
  200. self._tool_cache = {} # Cache loaded Click commands
  201. def get_click_command(self, cmd_name: str) -> Optional[click.Command]:
  202. """Get a Click command by name, with caching"""
  203. if cmd_name not in self._tool_cache:
  204. if cmd_name not in self.cli_group.all_subcommands:
  205. return None
  206. self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name)
  207. return self._tool_cache[cmd_name]
  208. def handle_initialize(self, params: dict) -> dict:
  209. """Handle MCP initialize request"""
  210. return {
  211. "protocolVersion": self.protocol_version,
  212. "capabilities": {
  213. "tools": {}
  214. },
  215. "serverInfo": {
  216. "name": "archivebox-mcp",
  217. "version": VERSION
  218. }
  219. }
  220. def handle_tools_list(self, params: dict) -> dict:
  221. """Handle MCP tools/list request - returns all available CLI commands as tools"""
  222. tools = []
  223. for cmd_name in self.cli_group.all_subcommands.keys():
  224. click_cmd = self.get_click_command(cmd_name)
  225. if click_cmd:
  226. try:
  227. tool_def = click_command_to_mcp_tool(cmd_name, click_cmd)
  228. tools.append(tool_def)
  229. except Exception as e:
  230. # Log but don't fail - skip problematic commands
  231. print(f"Warning: Could not generate tool for {cmd_name}: {e}", file=sys.stderr)
  232. return {"tools": tools}
  233. def handle_tools_call(self, params: dict) -> dict:
  234. """Handle MCP tools/call request - executes a CLI command"""
  235. tool_name = params.get('name')
  236. arguments = params.get('arguments', {})
  237. if not tool_name:
  238. raise ValueError("Missing required parameter: name")
  239. click_cmd = self.get_click_command(tool_name)
  240. if not click_cmd:
  241. raise ValueError(f"Unknown tool: {tool_name}")
  242. # Execute the command and return MCP-formatted result
  243. return execute_click_command(tool_name, click_cmd, arguments)
  244. def handle_request(self, request: dict) -> dict:
  245. """
  246. Handle a JSON-RPC 2.0 request and return response.
  247. Supports MCP methods: initialize, tools/list, tools/call
  248. """
  249. method = request.get('method')
  250. params = request.get('params', {})
  251. request_id = request.get('id')
  252. try:
  253. # Route to appropriate handler
  254. if method == 'initialize':
  255. result = self.handle_initialize(params)
  256. elif method == 'tools/list':
  257. result = self.handle_tools_list(params)
  258. elif method == 'tools/call':
  259. result = self.handle_tools_call(params)
  260. else:
  261. # Method not found
  262. return {
  263. "jsonrpc": "2.0",
  264. "id": request_id,
  265. "error": {
  266. "code": -32601,
  267. "message": f"Method not found: {method}"
  268. }
  269. }
  270. # Success response
  271. return {
  272. "jsonrpc": "2.0",
  273. "id": request_id,
  274. "result": result
  275. }
  276. except Exception as e:
  277. # Error response
  278. error_trace = traceback.format_exc()
  279. return {
  280. "jsonrpc": "2.0",
  281. "id": request_id,
  282. "error": {
  283. "code": -32603,
  284. "message": str(e),
  285. "data": error_trace
  286. }
  287. }
  288. def run_stdio_server(self):
  289. """
  290. Run the MCP server in stdio mode.
  291. Reads JSON-RPC requests from stdin (one per line),
  292. writes JSON-RPC responses to stdout (one per line).
  293. """
  294. # Read requests from stdin line by line
  295. for line in sys.stdin:
  296. line = line.strip()
  297. if not line:
  298. continue
  299. try:
  300. # Parse JSON-RPC request
  301. request = json.loads(line)
  302. # Handle request
  303. response = self.handle_request(request)
  304. # Write response to stdout (use custom encoder for Click types)
  305. print(json.dumps(response, cls=MCPJSONEncoder), flush=True)
  306. except json.JSONDecodeError as e:
  307. # Invalid JSON
  308. error_response = {
  309. "jsonrpc": "2.0",
  310. "id": None,
  311. "error": {
  312. "code": -32700,
  313. "message": "Parse error",
  314. "data": str(e)
  315. }
  316. }
  317. print(json.dumps(error_response, cls=MCPJSONEncoder), flush=True)
  318. def run_mcp_server():
  319. """Main entry point for MCP server"""
  320. server = MCPServer()
  321. server.run_stdio_server()