diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index fa443d2fcb..f35d622f96 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -14,6 +14,7 @@ WithJsonSchema, create_model, ) +from pydantic.errors import PydanticSchemaGenerationError from pydantic.fields import FieldInfo from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind from typing_extensions import is_typeddict @@ -349,74 +350,92 @@ def _try_create_model_and_schema( model = None wrap_output = False - # First handle special case: None - if type_expr is None: - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True - - # Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.) - elif isinstance(type_expr, GenericAlias): - origin = get_origin(type_expr) - - # Special case: dict with string keys can use RootModel - if origin is dict: - args = get_args(type_expr) - if len(args) == 2 and args[0] is str: - # TODO: should we use the original annotation? We are loosing any potential `Annotated` - # metadata for Pydantic here: - model = _create_dict_model(func_name, type_expr) + try: + # First handle special case: None + if type_expr is None: + model = _create_wrapped_model(func_name, original_annotation) + wrap_output = True + + # Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.) + elif isinstance(type_expr, GenericAlias): + origin = get_origin(type_expr) + + # Special case: dict with string keys can use RootModel + if origin is dict: + args = get_args(type_expr) + if len(args) == 2 and args[0] is str: + # TODO: should we use the original annotation? We are loosing any potential `Annotated` + # metadata for Pydantic here: + model = _create_dict_model(func_name, original_annotation) + else: + # dict with non-str keys needs wrapping + model = _create_wrapped_model(func_name, original_annotation) + wrap_output = True else: - # dict with non-str keys needs wrapping + # All other generic types need wrapping (list, tuple, Union, Optional, etc.) model = _create_wrapped_model(func_name, original_annotation) wrap_output = True - else: - # All other generic types need wrapping (list, tuple, Union, Optional, etc.) - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True - # Handle regular type objects - elif isinstance(type_expr, type): - type_annotation = cast(type[Any], type_expr) + # Handle regular type objects + elif isinstance(type_expr, type): + type_annotation = cast(type[Any], type_expr) - # Case 1: BaseModel subclasses (can be used directly) - if issubclass(type_annotation, BaseModel): - model = type_annotation + # Case 1: BaseModel subclasses (can be used directly) + if issubclass(type_annotation, BaseModel): + model = type_annotation - # Case 2: TypedDicts: - elif is_typeddict(type_annotation): - model = _create_model_from_typeddict(type_annotation) + # Case 2: TypedDicts: + elif is_typeddict(type_annotation): + model = _create_model_from_typeddict(type_annotation) - # Case 3: Primitive types that need wrapping - elif type_annotation in (str, int, float, bool, bytes, type(None)): - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + # Case 3: Primitive types that need wrapping + elif type_annotation in (str, int, float, bool, bytes, type(None)): + model = _create_wrapped_model(func_name, original_annotation) + wrap_output = True + + # Case 4: Other class types (dataclasses, regular classes with annotations) + else: + type_hints = get_type_hints(type_annotation) + if type_hints: + # Classes with type hints can be converted to Pydantic models + model = _create_model_from_class(type_annotation, type_hints) + # Classes without type hints are not serializable - model remains None - # Case 4: Other class types (dataclasses, regular classes with annotations) + # Handle any other types not covered above else: - type_hints = get_type_hints(type_annotation) - if type_hints: - # Classes with type hints can be converted to Pydantic models - model = _create_model_from_class(type_annotation, type_hints) - # Classes without type hints are not serializable - model remains None + # This includes typing constructs that aren't GenericAlias in Python 3.10 + # (e.g., Union, Optional in some Python versions) + model = _create_wrapped_model(func_name, original_annotation) + wrap_output = True - # Handle any other types not covered above - else: - # This includes typing constructs that aren't GenericAlias in Python 3.10 - # (e.g., Union, Optional in some Python versions) - model = _create_wrapped_model(func_name, original_annotation) - wrap_output = True + except ( + TypeError, + ValueError, + pydantic_core.SchemaError, + pydantic_core.ValidationError, + PydanticSchemaGenerationError, + ) as e: + logger.info(f"Cannot create model for type {type_expr} in {func_name}: {type(e).__name__}: {e}") + return None, None, False if model: # If we successfully created a model, try to get its schema # Use StrictJsonSchema to raise exceptions instead of warnings try: schema = model.model_json_schema(schema_generator=StrictJsonSchema) - except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e: + except ( + TypeError, + ValueError, + pydantic_core.SchemaError, + pydantic_core.ValidationError, + PydanticSchemaGenerationError, + ) as e: # These are expected errors when a type can't be converted to a Pydantic schema # TypeError: When Pydantic can't handle the type # ValueError: When there are issues with the type definition (including our custom warnings) # SchemaError: When Pydantic can't build a schema # ValidationError: When validation fails + # PydanticSchemaGenerationError: When pydantic-core cannot generate a schema for a type logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}") return None, None, False diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ba58da7321..c0d7c16015 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -305,8 +305,12 @@ async def test_basic_child_process_cleanup(self): # Verify child is writing if os.path.exists(marker_file): # pragma: no branch initial_size = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size_after_wait = os.path.getsize(marker_file) + size_after_wait = initial_size + for _ in range(10): # pragma: no branch + await anyio.sleep(0.2) + size_after_wait = os.path.getsize(marker_file) + if size_after_wait > initial_size: # pragma: no branch + break assert size_after_wait > initial_size, "Child process should be writing" print(f"Child is writing (file grew from {initial_size} to {size_after_wait} bytes)") diff --git a/tests/issues/test_1338_icons_and_metadata.py b/tests/issues/test_1338_icons_and_metadata.py index adc37f1c6e..32142f3ab7 100644 --- a/tests/issues/test_1338_icons_and_metadata.py +++ b/tests/issues/test_1338_icons_and_metadata.py @@ -1,11 +1,18 @@ """Test icon and metadata support (SEP-973).""" +import sys + import pytest from mcp.server.fastmcp import FastMCP from mcp.types import Icon -pytestmark = pytest.mark.anyio +pytestmark = [ + pytest.mark.anyio, + pytest.mark.filterwarnings( + "ignore::pytest.PytestUnraisableExceptionWarning" if sys.platform == "win32" else "default" + ), +] async def test_icons_and_website_url(): diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index 61e524290e..ddffefcfc1 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -243,6 +243,16 @@ def func_dict_int_key() -> dict[int, str]: # pragma: no cover assert meta.output_schema is not None assert "result" in meta.output_schema["properties"] + # Test Annotated dict[str, int] with Field metadata on the root type + def func_dict_annotated() -> Annotated[dict[str, int], Field(description="User scores")]: # pragma: no cover + return {"alice": 10, "bob": 20} + + meta = func_metadata(func_dict_annotated) + assert meta.output_schema is not None + assert meta.output_schema["type"] == "object" + assert meta.output_schema["title"] == "func_dict_annotatedDictOutput" + assert meta.output_schema.get("description") == "User scores" + @pytest.mark.anyio async def test_lambda_function(): diff --git a/tests/server/fastmcp/test_server.py b/tests/server/fastmcp/test_server.py index 3935f3bd13..6a1863b90c 100644 --- a/tests/server/fastmcp/test_server.py +++ b/tests/server/fastmcp/test_server.py @@ -450,7 +450,7 @@ async def test_tool_mixed_list_with_audio_and_image(self, tmp_path: Path): # TODO(Marcelo): It seems if we add the proper type hint, it generates an invalid JSON schema. # We need to fix this. - def mixed_list_fn() -> list: # type: ignore + def mixed_list_fn() -> list[str | Image | Audio | dict[str, str] | TextContent]: # type: ignore return [ # type: ignore "text message", Image(image_path),