cherry-studio/src/main/mcpServers/mcp-run-python/prepareEnvCode.ts
LiuVaayne c468c3cfd5
Feat/mcp run python (#6151)
* feat: add MCP Run Python server and integrate Pyodide for executing Python code

* fix: comment out unused polyfill imports in MCP Run Python server
2025-05-20 20:47:30 +08:00

203 lines
6.0 KiB
TypeScript

// DO NOT EDIT THIS FILE DIRECTLY, INSTEAD RUN "deno run build"
/**
 * Python source code, embedded as a string, that is executed inside the
 * Pyodide runtime to prepare the sandbox before running user code.
 *
 * The embedded script (adapted from pydantic.run's `prepare_env.py`; see the
 * URL in its module docstring) exposes two entry points:
 *
 * - `prepare_env(files)`: writes each supplied file into the Pyodide working
 *   directory, then for the file marked `active` detects its dependencies —
 *   first from PEP 723 inline script metadata (`# /// script` blocks parsed
 *   with `tomllib`), falling back to scanning its imports via
 *   `pyodide.code.find_imports` — augments them with known required extras
 *   (pygments / ssl / typing_extensions), and installs them with
 *   `micropip.install`. Returns a `Success` dataclass with the resolved
 *   dependency list, or an `Error` dataclass whose message combines the
 *   captured micropip log with the traceback.
 * - `dump_json(value)`: serializes an arbitrary result to JSON text using
 *   `pydantic_core.to_json`, with fallbacks for numpy values, Pyodide FFI
 *   proxies (`to_py()`), and `repr()` for everything else; returns strings
 *   unchanged and `None` as-is.
 *
 * NOTE: this file is generated — do not hand-edit the Python here; change the
 * upstream source and re-run "deno run build" (see the comment above). Every
 * byte of this template literal is executed verbatim by Pyodide.
 */
export const preparePythonCode = `"""Logic for installing dependencies in Pyodide.
Mostly taken from https://github.com/pydantic/pydantic.run/blob/main/src/frontend/src/prepare_env.py
"""
from __future__ import annotations as _annotations
import importlib
import logging
import re
import sys
import traceback
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, TypedDict
import micropip
import pyodide_js
import tomllib
from pyodide.code import find_imports
__all__ = 'prepare_env', 'dump_json'
class File(TypedDict):
name: str
content: str
active: bool
@dataclass
class Success:
dependencies: list[str] | None
kind: Literal['success'] = 'success'
@dataclass
class Error:
message: str
kind: Literal['error'] = 'error'
async def prepare_env(files: list[File]) -> Success | Error:
sys.setrecursionlimit(400)
cwd = Path.cwd()
for file in files:
(cwd / file['name']).write_text(file['content'])
active: File | None = next((f for f in files if f['active']), None)
dependencies: list[str] | None = None
if active:
python_code = active['content']
dependencies = _find_pep723_dependencies(python_code)
if dependencies is None:
dependencies = await _find_import_dependencies(python_code)
if dependencies:
dependencies = _add_extra_dependencies(dependencies)
with _micropip_logging() as logs_filename:
try:
await micropip.install(dependencies, keep_going=True)
importlib.invalidate_caches()
except Exception:
with open(logs_filename) as f:
logs = f.read()
return Error(message=f'{logs} {traceback.format_exc()}')
return Success(dependencies=dependencies)
def dump_json(value: Any) -> str | None:
from pydantic_core import to_json
if value is None:
return None
if isinstance(value, str):
return value
else:
return to_json(value, indent=2, fallback=_json_fallback).decode()
def _json_fallback(value: Any) -> Any:
tp: Any = type(value)
module = tp.__module__
if module == 'numpy':
if tp.__name__ in {'ndarray', 'matrix'}:
return value.tolist()
else:
return value.item()
elif module == 'pyodide.ffi':
return value.to_py()
else:
return repr(value)
def _add_extra_dependencies(dependencies: list[str]) -> list[str]:
"""Add extra dependencies we know some packages need.
Workaround for micropip not installing some required transitive dependencies.
See https://github.com/pyodide/micropip/issues/204
pygments seems to be required to get rich to work properly, ssl is required for FastAPI and HTTPX,
pydantic_ai requires newest typing_extensions.
"""
extras: list[str] = []
for d in dependencies:
if d.startswith(('logfire', 'rich')):
extras.append('pygments')
elif d.startswith(('fastapi', 'httpx', 'pydantic_ai')):
extras.append('ssl')
if d.startswith('pydantic_ai'):
extras.append('typing_extensions>=4.12')
if len(extras) == 3:
break
return dependencies + extras
@contextmanager
def _micropip_logging() -> Iterator[str]:
from micropip import logging as micropip_logging
micropip_logging.setup_logging()
logger = logging.getLogger('micropip')
logger.handlers.clear()
logger.setLevel(logging.INFO)
file_name = 'micropip.log'
handler = logging.FileHandler(file_name)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
try:
yield file_name
finally:
logger.removeHandler(handler)
def _find_pep723_dependencies(code: str) -> list[str] | None:
"""Extract dependencies from a script with PEP 723 metadata."""
metadata = _read_pep723_metadata(code)
dependencies: list[str] | None = metadata.get('dependencies')
if dependencies is None:
return None
else:
assert isinstance(dependencies, list), 'dependencies must be a list'
assert all(isinstance(dep, str) for dep in dependencies), 'dependencies must be a list of strings'
return dependencies
def _read_pep723_metadata(code: str) -> dict[str, Any]:
"""Read PEP 723 script metadata.
Copied from https://packaging.python.org/en/latest/specifications/inline-script-metadata/#reference-implementation
"""
name = 'script'
magic_comment_regex = r'(?m)^# /// (?P<type>[a-zA-Z0-9-]+)$\\s(?P<content>(^#(| .*)$\\s)+)^# ///$'
matches = list(filter(lambda m: m.group('type') == name, re.finditer(magic_comment_regex, code)))
if len(matches) > 1:
raise ValueError(f'Multiple {name} blocks found')
elif len(matches) == 1:
content = ''.join(
line[2:] if line.startswith('# ') else line[1:]
for line in matches[0].group('content').splitlines(keepends=True)
)
return tomllib.loads(content)
else:
return {}
async def _find_import_dependencies(code: str) -> list[str] | None:
"""Find dependencies in imports."""
try:
imports: list[str] = find_imports(code)
except SyntaxError:
return None
else:
return list(_find_imports_to_install(imports))
TO_PACKAGE_NAME: dict[str, str] = pyodide_js._api._import_name_to_package_name.to_py() # pyright: ignore[reportPrivateUsage]
def _find_imports_to_install(imports: list[str]) -> Iterable[str]:
"""Given a list of module names being imported, return packages that are not installed."""
for module in imports:
try:
importlib.import_module(module)
except ModuleNotFoundError:
if package_name := TO_PACKAGE_NAME.get(module):
yield package_name
elif '.' not in module:
yield module
`