From 4816bedbdbddc398b138ad1a507a1579d1fbce58 Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Mon, 2 Dec 2024 23:32:48 +0800 Subject: [PATCH 1/6] fix: improve session cleanup and progress handling 1. Add proper session cleanup handling - Track session state with _closed flag - Handle cancellation and cleanup errors gracefully - Skip notification validation during cleanup 2. Improve progress context - Add final_progress method - Send completion notification in finally block - Handle progress cleanup properly This fix addresses issues with session cleanup causing validation errors and improves progress notification reliability. --- src/mcp/shared/progress.py | 24 +++++++++++++++++++----- src/mcp/shared/session.py | 35 +++++++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/mcp/shared/progress.py b/src/mcp/shared/progress.py index 19ea5ede2..7dc91034a 100644 --- a/src/mcp/shared/progress.py +++ b/src/mcp/shared/progress.py @@ -1,4 +1,4 @@ -from contextlib import contextmanager +from contextlib import asynccontextmanager from dataclasses import dataclass, field from pydantic import BaseModel @@ -21,15 +21,29 @@ class ProgressContext: current: float = field(default=0.0, init=False) async def progress(self, amount: float) -> None: + """Update progress by the given amount and send notification.""" self.current += amount - await self.session.send_progress_notification( self.progress_token, self.current, total=self.total ) + async def final_progress(self) -> None: + """Send the final progress notification.""" + if self.total is not None and self.current < self.total: + self.current = self.total + await self.session.send_progress_notification( + self.progress_token, self.current, total=self.total + ) + + +@asynccontextmanager +async def progress(ctx: RequestContext, total: float | None = None): + """Context manager for progress tracking and notification. -@contextmanager -def progress(ctx: RequestContext, total: float | None = None): + Args: + ctx: Request context containing the session and progress token + total: Optional total progress amount + """ if ctx.meta is None or ctx.meta.progressToken is None: raise ValueError("No progress token provided") @@ -37,4 +51,4 @@ def progress(ctx: RequestContext, total: float | None = None): try: yield progress_ctx finally: - pass + await progress_ctx.final_progress() \ No newline at end of file diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 3d3988ce1..a06213f0b 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -193,6 +193,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): # Using BaseSession as a context manager should not block on exit (this # would be very surprising behavior), so make sure to cancel the tasks # in the task group. + self._closed = True self._task_group.cancel_scope.cancel() return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) @@ -256,12 +257,21 @@ async def send_notification(self, notification: SendNotificationT) -> None: Emits a notification, which is a one-way message that does not expect a response. """ - jsonrpc_notification = JSONRPCNotification( - jsonrpc="2.0", - **notification.model_dump(by_alias=True, mode="json", exclude_none=True), - ) + # Skip sending notifications if the session is closed + if self._closed: + return + + try: + jsonrpc_notification = JSONRPCNotification( + jsonrpc="2.0", + **notification.model_dump(by_alias=True, mode="json", exclude_none=True), + ) - await self._write_stream.send(JSONRPCMessage(jsonrpc_notification)) + await self._write_stream.send(JSONRPCMessage(jsonrpc_notification)) + except Exception: + # Ignore notification send errors during session cleanup + if not self._closed: + raise async def _send_response( self, request_id: RequestId, response: SendResultT | ErrorData @@ -279,6 +289,19 @@ async def _send_response( ) await self._write_stream.send(JSONRPCMessage(jsonrpc_response)) + def _should_validate_notification(self, message_root: JSONRPCNotification) -> bool: + """ + Determines if a notification should be validated. + Internal notifications (like cancelled) should be ignored. + """ + try: + return ( + getattr(message_root, "method", None) != "cancelled" and + not self._closed + ) + except: + return False + async def _receive_loop(self) -> None: async with ( self._read_stream, @@ -378,4 +401,4 @@ def incoming_messages( | ReceiveNotificationT | Exception ]: - return self._incoming_message_stream_reader + return self._incoming_message_stream_reader \ No newline at end of file From e702f9eb7192320e808e29f14b5d0a348633c75a Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Tue, 17 Dec 2024 16:10:14 +0400 Subject: [PATCH 2/6] fix: correct cancellation notification method name to notifications/cancelled According to MCP spec, the cancellation notification method should be 'notifications/cancelled' instead of 'cancelled'. --- src/mcp/shared/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index a06213f0b..f1152804b 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -292,11 +292,11 @@ async def _send_response( def _should_validate_notification(self, message_root: JSONRPCNotification) -> bool: """ Determines if a notification should be validated. - Internal notifications (like cancelled) should be ignored. + Internal notifications (like notifications/cancelled) should be ignored. """ try: return ( - getattr(message_root, "method", None) != "cancelled" and + getattr(message_root, "method", None) != "notifications/cancelled" and not self._closed ) except: From 12692ffcc8eebc98a561d7bb9f5f567dc3dcfe7f Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Tue, 17 Dec 2024 16:13:53 +0400 Subject: [PATCH 3/6] refactor: remove unnecessary try-except in _should_validate_notification The try-except block was unnecessary since getattr already handles the case where the attribute doesn't exist by returning None. --- src/mcp/shared/session.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index f1152804b..8759574aa 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -294,13 +294,10 @@ def _should_validate_notification(self, message_root: JSONRPCNotification) -> bo Determines if a notification should be validated. Internal notifications (like notifications/cancelled) should be ignored. """ - try: - return ( - getattr(message_root, "method", None) != "notifications/cancelled" and - not self._closed - ) - except: - return False + return ( + getattr(message_root, "method", None) != "notifications/cancelled" and + not self._closed + ) async def _receive_loop(self) -> None: async with ( From 18e029d3f609f3cbbebaaffa6925e58858f3fbc9 Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Tue, 17 Dec 2024 16:18:07 +0400 Subject: [PATCH 4/6] refactor: improve notification handling - Always validate all notifications to ensure proper model structure - Handle cancellation notifications separately (no forwarding to stream) - Simplify notification type checking and error handling - Improve code structure and documentation --- src/mcp/shared/session.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 8759574aa..bbf00c455 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -289,15 +289,11 @@ async def _send_response( ) await self._write_stream.send(JSONRPCMessage(jsonrpc_response)) - def _should_validate_notification(self, message_root: JSONRPCNotification) -> bool: + def _is_cancellation_notification(self, message_root: JSONRPCNotification) -> bool: """ - Determines if a notification should be validated. - Internal notifications (like notifications/cancelled) should be ignored. + Determines if a notification is a cancellation notification. """ - return ( - getattr(message_root, "method", None) != "notifications/cancelled" and - not self._closed - ) + return getattr(message_root, "method", None) == "notifications/cancelled" async def _receive_loop(self) -> None: async with ( From 2fcc61fbc71bad474cc099df6877d685e447278b Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Wed, 29 Jan 2025 16:20:56 +0400 Subject: [PATCH 5/6] fix: add new line in the end of files --- src/mcp/shared/progress.py | 2 +- src/mcp/shared/session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mcp/shared/progress.py b/src/mcp/shared/progress.py index 7dc91034a..af064642d 100644 --- a/src/mcp/shared/progress.py +++ b/src/mcp/shared/progress.py @@ -51,4 +51,4 @@ async def progress(ctx: RequestContext, total: float | None = None): try: yield progress_ctx finally: - await progress_ctx.final_progress() \ No newline at end of file + await progress_ctx.final_progress() diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index bbf00c455..db5585ba6 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -394,4 +394,4 @@ def incoming_messages( | ReceiveNotificationT | Exception ]: - return self._incoming_message_stream_reader \ No newline at end of file + return self._incoming_message_stream_reader From 60d2bac061294fe098708775fd448e03aa353d35 Mon Sep 17 00:00:00 2001 From: Dong Hao Date: Wed, 5 Feb 2025 21:41:55 +0400 Subject: [PATCH 6/6] fix: remove _is_cancellation_notification as there are the correct types defined --- src/mcp/shared/session.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index db5585ba6..075a8e5de 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -289,12 +289,6 @@ async def _send_response( ) await self._write_stream.send(JSONRPCMessage(jsonrpc_response)) - def _is_cancellation_notification(self, message_root: JSONRPCNotification) -> bool: - """ - Determines if a notification is a cancellation notification. - """ - return getattr(message_root, "method", None) == "notifications/cancelled" - async def _receive_loop(self) -> None: async with ( self._read_stream,