From b237e91a55a9ab820959ed126709b53d78267c5a Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 30 May 2026 23:50:45 +0200 Subject: [PATCH 01/13] fix(streamable-http): fail request when resumption can't complete --- src/mcp/client/streamable_http.py | 29 ++++++-- .../transports/test_hosting_resume.py | 72 +++++++++++++++++++ 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 09e5048cc..7fcd06ffa 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -13,6 +13,7 @@ from anyio.abc import TaskGroup from httpx_sse import EventSource, ServerSentEvent, aconnect_sse from mcp_types import ( + CONNECTION_CLOSED, INTERNAL_ERROR, INVALID_REQUEST, METHOD_NOT_FOUND, @@ -437,10 +438,16 @@ async def _handle_sse_response( except Exception: logger.debug("SSE stream ended", exc_info=True) # pragma: lax no cover - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch - logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + # Stream ended without a terminal response/error. If the server provided an event id, + # try resuming; otherwise fail the request instead of hanging forever. + if last_event_id is None: + error_data = ErrorData(code=CONNECTION_CLOSED, message="SSE stream disconnected before response completed") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) + return + + logger.info("SSE stream disconnected, reconnecting...") + await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) async def _handle_reconnection( self, @@ -451,7 +458,19 @@ async def _handle_reconnection( ) -> None: """Reconnect with Last-Event-ID to resume stream after server disconnect.""" # Bail if max retries exceeded - if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover + if attempt >= MAX_RECONNECTION_ATTEMPTS: + original_request_id = None + if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch + original_request_id = ctx.session_message.message.id + + if original_request_id is not None: + error_data = ErrorData( + code=CONNECTION_CLOSED, + message="SSE stream disconnected and could not be resumed", + data={"last_event_id": last_event_id}, + ) + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") return diff --git a/tests/interaction/transports/test_hosting_resume.py b/tests/interaction/transports/test_hosting_resume.py index 78492ccb7..d13bb7583 100644 --- a/tests/interaction/transports/test_hosting_resume.py +++ b/tests/interaction/transports/test_hosting_resume.py @@ -373,6 +373,78 @@ async def call() -> None: assert received == snapshot(["before close", "after close"]) +async def test_a_call_whose_stream_closes_and_cannot_be_resumed_fails_instead_of_hanging() -> None: + """If a resumable response stream disconnects and the server session is gone, the client fails + the request instead of hanging forever. + + The server closes the call's SSE stream after emitting one related notification. The test then + deletes the active server-side session to force the client's reconnect GET to return 404. + Without a terminal response/error on the read stream, ClientSession.send_request waits forever + (read timeout defaults to None). The transport must surface a request-scoped error when it + gives up reconnecting. + """ + reconnect_attempted = anyio.Event() + allow_exit = anyio.Event() + done = anyio.Event() + raised: list[BaseException] = [] + manager_ref = None + deleted_session = False + + mcp = MCPServer("resumable") + + @mcp.tool() + async def interrupt(ctx: Context) -> str: + await ctx.info("before close") + await ctx.close_sse_stream() + await allow_exit.wait() + return "unreachable" + + async def record_request(request: httpx.Request) -> None: + nonlocal deleted_session + if request.method != "GET": + return + if request.headers.get("last-event-id") is None: + return + reconnect_attempted.set() + if deleted_session or manager_ref is None: + return + session_ids = list(manager_ref._server_instances.keys()) + if session_ids: # pragma: no branch + del manager_ref._server_instances[session_ids[0]] + deleted_session = True + + async with mounted_app(mcp, event_store=SequencedEventStore(), retry_interval=0, on_request=record_request) as ( + http, + manager, + ): + manager_ref = manager + with anyio.fail_after(5): # pragma: no branch + async with ( + streamable_http_client(f"{BASE_URL}/mcp", http_client=http, terminate_on_close=False) as (r, w), + ClientSession(r, w) as session, + anyio.create_task_group() as tg, + ): + await session.initialize() + + async def call() -> None: + try: + await session.call_tool("interrupt", {}) + except BaseException as exc: + raised.append(exc) + finally: + done.set() + + tg.start_soon(call) + await reconnect_attempted.wait() + await done.wait() + allow_exit.set() + tg.cancel_scope.cancel() + + assert len(raised) == 1 + assert isinstance(raised[0], Exception) + assert "disconnected" in str(raised[0]).lower() + + @requirement("client-transport:http:resume-stream-api") async def test_a_captured_resumption_token_replays_missed_messages_on_a_new_connection() -> None: """A resumption token captured via on_resumption_token_update on one connection lets a fresh From 4ea297cbb562ca84c187bafbaf3ef96d555696f7 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 30 May 2026 23:56:49 +0200 Subject: [PATCH 02/13] test(streamable-http): ensure resumption failure does not hang --- tests/interaction/transports/test_hosting_resume.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/interaction/transports/test_hosting_resume.py b/tests/interaction/transports/test_hosting_resume.py index d13bb7583..92e58a199 100644 --- a/tests/interaction/transports/test_hosting_resume.py +++ b/tests/interaction/transports/test_hosting_resume.py @@ -373,6 +373,10 @@ async def call() -> None: assert received == snapshot(["before close", "after close"]) +@requirement("hosting:resume:close-stream") +@requirement("transport:streamable-http:resumability") +@requirement("client-transport:http:reconnect-post-priming") +@requirement("client-transport:http:reconnect-retry-value") async def test_a_call_whose_stream_closes_and_cannot_be_resumed_fails_instead_of_hanging() -> None: """If a resumable response stream disconnects and the server session is gone, the client fails the request instead of hanging forever. From 6537096b52f5bf0823eeea39a71e3fabdca9640d Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sun, 31 May 2026 00:06:41 +0200 Subject: [PATCH 03/13] test(streamable-http): cover disconnect without resumption anchor --- src/mcp/client/streamable_http.py | 21 +++++++++------------ tests/client/test_streamable_http.py | 28 +++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 7fcd06ffa..35dc99b19 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -459,18 +459,15 @@ async def _handle_reconnection( """Reconnect with Last-Event-ID to resume stream after server disconnect.""" # Bail if max retries exceeded if attempt >= MAX_RECONNECTION_ATTEMPTS: - original_request_id = None - if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch - original_request_id = ctx.session_message.message.id - - if original_request_id is not None: - error_data = ErrorData( - code=CONNECTION_CLOSED, - message="SSE stream disconnected and could not be resumed", - data={"last_event_id": last_event_id}, - ) - error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) - await ctx.read_stream_writer.send(error_msg) + assert isinstance(ctx.session_message.message, JSONRPCRequest) + original_request_id = ctx.session_message.message.id + error_data = ErrorData( + code=CONNECTION_CLOSED, + message="SSE stream disconnected and could not be resumed", + data={"last_event_id": last_event_id}, + ) + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") return diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index defda41f8..7e5917433 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -18,6 +18,7 @@ from mcp_types import ( CLIENT_CAPABILITIES_META_KEY, CLIENT_INFO_META_KEY, + CONNECTION_CLOSED, METHOD_NOT_FOUND, PROTOCOL_VERSION_META_KEY, JSONRPCError, @@ -28,10 +29,11 @@ from mcp_types.version import LATEST_MODERN_VERSION from starlette.types import Receive, Scope, Send -from mcp.client.streamable_http import streamable_http_client +from mcp.client.streamable_http import RequestContext, StreamableHTTPTransport, streamable_http_client from mcp.server import Server from mcp.server._streamable_http_modern import handle_modern_request from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ServerEvent +from mcp.shared._context_streams import create_context_streams from mcp.shared.dispatcher import CallOptions, DispatchContext from mcp.shared.inbound import MCP_METHOD_HEADER, MCP_PROTOCOL_VERSION_HEADER, encode_header_value from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher @@ -73,6 +75,30 @@ def test_mcp_name_header_values_are_base64_wrapped_when_unsafe_for_an_http_field assert encoded == raw +@pytest.mark.anyio +async def test_sse_response_disconnect_before_any_event_id_fails_request() -> None: + transport = StreamableHTTPTransport("http://example.com/mcp") + async with httpx.AsyncClient() as client: + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1) + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/call", params={"name": "noop", "arguments": {}}) + ctx = RequestContext( + client=client, + session_id=None, + session_message=SessionMessage(request), + metadata=None, + read_stream_writer=read_stream_writer, + ) + response = httpx.Response(200, headers={"content-type": "text/event-stream"}, content=b"") + + async with read_stream_writer, read_stream: + await transport._handle_sse_response(response, ctx) + message = await read_stream.receive() + + assert isinstance(message.message, JSONRPCError) + assert message.message.id == 1 + assert message.message.error.code == CONNECTION_CLOSED + + @pytest.mark.anyio async def test_post_request_merges_per_message_metadata_headers() -> None: """`ClientMessageMetadata.headers` on a `SessionMessage` are merged into the outgoing POST headers From 4bee40258002a5adf2da49ca5c38a814a2f7cef1 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sun, 31 May 2026 00:18:08 +0200 Subject: [PATCH 04/13] fix: make streamable-http tests pass CI --- src/mcp/client/streamable_http.py | 2 +- tests/client/test_streamable_http.py | 1 + tests/interaction/transports/test_hosting_resume.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 35dc99b19..507b33a31 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -511,7 +511,7 @@ async def _handle_reconnection( # Stream ended again without response - reconnect again (reset attempt counter) logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) - except Exception as e: # pragma: no cover + except Exception as e: logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index 7e5917433..d331c8d23 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -94,6 +94,7 @@ async def test_sse_response_disconnect_before_any_event_id_fails_request() -> No await transport._handle_sse_response(response, ctx) message = await read_stream.receive() + assert isinstance(message, SessionMessage) assert isinstance(message.message, JSONRPCError) assert message.message.id == 1 assert message.message.error.code == CONNECTION_CLOSED diff --git a/tests/interaction/transports/test_hosting_resume.py b/tests/interaction/transports/test_hosting_resume.py index 92e58a199..4a1b5703f 100644 --- a/tests/interaction/transports/test_hosting_resume.py +++ b/tests/interaction/transports/test_hosting_resume.py @@ -423,7 +423,7 @@ async def record_request(request: httpx.Request) -> None: ): manager_ref = manager with anyio.fail_after(5): # pragma: no branch - async with ( + async with ( # pragma: no branch streamable_http_client(f"{BASE_URL}/mcp", http_client=http, terminate_on_close=False) as (r, w), ClientSession(r, w) as session, anyio.create_task_group() as tg, From d1b9b4010baaee73d0498f9c9f9588ebc251de57 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 20 Jun 2026 21:36:16 +0200 Subject: [PATCH 05/13] test(streamable-http): suppress deprecated logging warning --- tests/interaction/transports/test_hosting_resume.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/interaction/transports/test_hosting_resume.py b/tests/interaction/transports/test_hosting_resume.py index 4a1b5703f..1fe5d713b 100644 --- a/tests/interaction/transports/test_hosting_resume.py +++ b/tests/interaction/transports/test_hosting_resume.py @@ -398,7 +398,7 @@ async def test_a_call_whose_stream_closes_and_cannot_be_resumed_fails_instead_of @mcp.tool() async def interrupt(ctx: Context) -> str: - await ctx.info("before close") + await ctx.info("before close") # pyright: ignore[reportDeprecated] await ctx.close_sse_stream() await allow_exit.wait() return "unreachable" From 8bf41bc8cbc5913c117b3cf9f3371a881802fa35 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sun, 28 Jun 2026 23:52:02 +0200 Subject: [PATCH 06/13] fix(streamable-http): tolerate closed read streams during SSE cleanup --- src/mcp/client/streamable_http.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 507b33a31..3acc53f2b 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -46,6 +46,16 @@ StreamWriter = ContextSendStream[SessionMessageOrError] StreamReader = ContextReceiveStream[SessionMessage] + +async def _send_or_ignore_closed(read_stream_writer: StreamWriter, message: SessionMessageOrError) -> bool: + try: + await read_stream_writer.send(message) + except (anyio.BrokenResourceError, anyio.ClosedResourceError): + logger.debug("Read stream closed before Streamable HTTP message could be delivered", exc_info=True) + return False + return True + + MCP_SESSION_ID = "mcp-session-id" LAST_EVENT_ID = "last-event-id" @@ -179,17 +189,17 @@ async def _handle_sse_event( # Otherwise, return False to continue listening return isinstance(message, JSONRPCResponse | JSONRPCError) - # Forwarding to a closed read stream lands here when the caller cancels mid-SSE - # (BrokenResourceError, not a parse failure); coverage is timing-dependent in the - # streaming story's modern HTTP cancellation leg. + except (anyio.BrokenResourceError, anyio.ClosedResourceError): + logger.debug("Read stream closed while forwarding SSE message", exc_info=True) + return True except Exception as exc: # pragma: lax no cover logger.exception("Error parsing SSE message") if original_request_id is not None: error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}") error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) - await read_stream_writer.send(error_msg) + await _send_or_ignore_closed(read_stream_writer, error_msg) return True - await read_stream_writer.send(exc) + await _send_or_ignore_closed(read_stream_writer, exc) return False else: # pragma: no cover logger.warning(f"Unknown SSE event: {sse.event}") @@ -443,7 +453,7 @@ async def _handle_sse_response( if last_event_id is None: error_data = ErrorData(code=CONNECTION_CLOSED, message="SSE stream disconnected before response completed") error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) - await ctx.read_stream_writer.send(error_msg) + await _send_or_ignore_closed(ctx.read_stream_writer, error_msg) return logger.info("SSE stream disconnected, reconnecting...") @@ -467,7 +477,7 @@ async def _handle_reconnection( data={"last_event_id": last_event_id}, ) error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) - await ctx.read_stream_writer.send(error_msg) + await _send_or_ignore_closed(ctx.read_stream_writer, error_msg) logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") return From 4b6464668b0d2cc764c056ec7fd6e2437f5a8891 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Mon, 29 Jun 2026 00:00:40 +0200 Subject: [PATCH 07/13] test(streamable-http): cover closed read stream cleanup --- tests/client/test_streamable_http.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index d331c8d23..f6e86d272 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -100,6 +100,26 @@ async def test_sse_response_disconnect_before_any_event_id_fails_request() -> No assert message.message.error.code == CONNECTION_CLOSED +@pytest.mark.anyio +async def test_sse_response_disconnect_ignores_closed_read_stream() -> None: + transport = StreamableHTTPTransport("http://example.com/mcp") + async with httpx.AsyncClient() as client: + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1) + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/call", params={"name": "noop", "arguments": {}}) + ctx = RequestContext( + client=client, + session_id=None, + session_message=SessionMessage(request), + metadata=None, + read_stream_writer=read_stream_writer, + ) + response = httpx.Response(200, headers={"content-type": "text/event-stream"}, content=b"") + + async with read_stream_writer, read_stream: + await read_stream.aclose() + await transport._handle_sse_response(response, ctx) + + @pytest.mark.anyio async def test_post_request_merges_per_message_metadata_headers() -> None: """`ClientMessageMetadata.headers` on a `SessionMessage` are merged into the outgoing POST headers From d7e05e77e733837014619b48d77fc46faf59a581 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Mon, 29 Jun 2026 00:52:32 +0200 Subject: [PATCH 08/13] fix(streamable-http): bound empty SSE reconnect loops --- src/mcp/client/streamable_http.py | 9 ++++- tests/client/test_streamable_http.py | 57 +++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 3acc53f2b..f21f4c886 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -501,12 +501,15 @@ async def _handle_reconnection( # Track for potential further reconnection reconnect_last_event_id: str = last_event_id reconnect_retry_ms = retry_interval_ms + made_progress = False async for sse in event_source.aiter_sse(): if sse.id: # pragma: no branch reconnect_last_event_id = sse.id if sse.retry is not None: reconnect_retry_ms = sse.retry + if sse.event == "message" and bool(sse.data): + made_progress = True is_complete = await self._handle_sse_event( sse, @@ -518,9 +521,11 @@ async def _handle_reconnection( await event_source.response.aclose() return - # Stream ended again without response - reconnect again (reset attempt counter) + # Stream ended again without response - reconnect again. Only reset + # the retry counter when the resumed stream delivered real data. logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) + next_attempt = 0 if made_progress else attempt + 1 + await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt) except Exception as e: logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index f6e86d272..f60e07739 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -92,7 +92,8 @@ async def test_sse_response_disconnect_before_any_event_id_fails_request() -> No async with read_stream_writer, read_stream: await transport._handle_sse_response(response, ctx) - message = await read_stream.receive() + with anyio.fail_after(5): + message = await read_stream.receive() assert isinstance(message, SessionMessage) assert isinstance(message.message, JSONRPCError) @@ -100,6 +101,60 @@ async def test_sse_response_disconnect_before_any_event_id_fails_request() -> No assert message.message.error.code == CONNECTION_CLOSED +@pytest.mark.anyio +async def test_reconnection_empty_streams_count_toward_max_attempts(monkeypatch: pytest.MonkeyPatch) -> None: + class PrimingOnlyEventSource: + def __init__(self) -> None: + self.response = httpx.Response(200) + + async def __aenter__(self) -> "PrimingOnlyEventSource": + nonlocal reconnect_attempts + reconnect_attempts += 1 + return self + + async def __aexit__(self, *args: object) -> None: + return None + + async def aiter_sse(self) -> object: + yield type( + "SSE", + (), + {"event": "message", "data": "", "id": f"event-{reconnect_attempts}", "retry": 0}, + )() + + def connect_sse(*args: object, **kwargs: object) -> PrimingOnlyEventSource: + return PrimingOnlyEventSource() + + reconnect_attempts = 0 + monkeypatch.setattr( + "mcp.client.streamable_http.aconnect_sse", + connect_sse, + ) + + transport = StreamableHTTPTransport("http://example.com/mcp") + async with httpx.AsyncClient() as client: + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1) + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/call", params={"name": "noop", "arguments": {}}) + ctx = RequestContext( + client=client, + session_id=None, + session_message=SessionMessage(request), + metadata=None, + read_stream_writer=read_stream_writer, + ) + + async with read_stream_writer, read_stream: + with anyio.fail_after(5): + await transport._handle_reconnection(ctx, "event-1", retry_interval_ms=0) + message = await read_stream.receive() + + assert reconnect_attempts == 2 + assert isinstance(message, SessionMessage) + assert isinstance(message.message, JSONRPCError) + assert message.message.id == 1 + assert message.message.error.code == CONNECTION_CLOSED + + @pytest.mark.anyio async def test_sse_response_disconnect_ignores_closed_read_stream() -> None: transport = StreamableHTTPTransport("http://example.com/mcp") From c41a5075bc79d4525ba45a3ded272e4a4502b35f Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Mon, 29 Jun 2026 00:57:35 +0200 Subject: [PATCH 09/13] test(streamable-http): keep reconnect fake covered --- tests/client/test_streamable_http.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index f60e07739..db4d9158b 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -10,6 +10,7 @@ import json from collections.abc import AsyncIterator, Callable, Mapping from typing import Any +from types import SimpleNamespace import anyio import httpx @@ -116,11 +117,7 @@ async def __aexit__(self, *args: object) -> None: return None async def aiter_sse(self) -> object: - yield type( - "SSE", - (), - {"event": "message", "data": "", "id": f"event-{reconnect_attempts}", "retry": 0}, - )() + yield SimpleNamespace(event="message", data="", id=f"event-{reconnect_attempts}", retry=0) def connect_sse(*args: object, **kwargs: object) -> PrimingOnlyEventSource: return PrimingOnlyEventSource() From fc802f6234bfd14f2217c3bfde2f0fd23cf3cc22 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Tue, 30 Jun 2026 19:46:25 +0200 Subject: [PATCH 10/13] test(streamable-http): exempt dropped async generator yield --- tests/client/test_streamable_http.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index db4d9158b..1610e2122 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -117,7 +117,12 @@ async def __aexit__(self, *args: object) -> None: return None async def aiter_sse(self) -> object: - yield SimpleNamespace(event="message", data="", id=f"event-{reconnect_attempts}", retry=0) + yield SimpleNamespace( # pragma: lax no cover - coverage.py drops this nested async-generator yield + event="message", + data="", + id=f"event-{reconnect_attempts}", + retry=0, + ) def connect_sse(*args: object, **kwargs: object) -> PrimingOnlyEventSource: return PrimingOnlyEventSource() From e0ec578c7166f9c917dd8990d66a2b06fc34b2e1 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Tue, 30 Jun 2026 19:52:50 +0200 Subject: [PATCH 11/13] test(streamable-http): mark traced timeout guards no-branch --- tests/client/test_streamable_http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index 1610e2122..6583e68c7 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -93,7 +93,7 @@ async def test_sse_response_disconnect_before_any_event_id_fails_request() -> No async with read_stream_writer, read_stream: await transport._handle_sse_response(response, ctx) - with anyio.fail_after(5): + with anyio.fail_after(5): # pragma: no branch message = await read_stream.receive() assert isinstance(message, SessionMessage) @@ -146,7 +146,7 @@ def connect_sse(*args: object, **kwargs: object) -> PrimingOnlyEventSource: ) async with read_stream_writer, read_stream: - with anyio.fail_after(5): + with anyio.fail_after(5): # pragma: no branch await transport._handle_reconnection(ctx, "event-1", retry_interval_ms=0) message = await read_stream.receive() From 94f08d37bf32352ba3f5bb82948673df125df537 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Thu, 2 Jul 2026 22:03:57 +0200 Subject: [PATCH 12/13] test(streamable-http): apply ruff import order --- tests/client/test_streamable_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index 6583e68c7..ad47dfd72 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -9,8 +9,8 @@ import base64 import json from collections.abc import AsyncIterator, Callable, Mapping -from typing import Any from types import SimpleNamespace +from typing import Any import anyio import httpx From d24762df927ce0fa7db9dd0f1e9ceb82875838fd Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Thu, 2 Jul 2026 22:12:50 +0200 Subject: [PATCH 13/13] test(streamable-http): cover closed SSE reader --- tests/client/test_streamable_http.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/client/test_streamable_http.py b/tests/client/test_streamable_http.py index ad47dfd72..3ec99cd28 100644 --- a/tests/client/test_streamable_http.py +++ b/tests/client/test_streamable_http.py @@ -15,6 +15,7 @@ import anyio import httpx import pytest +from httpx_sse import ServerSentEvent from inline_snapshot import snapshot from mcp_types import ( CLIENT_CAPABILITIES_META_KEY, @@ -177,6 +178,20 @@ async def test_sse_response_disconnect_ignores_closed_read_stream() -> None: await transport._handle_sse_response(response, ctx) +@pytest.mark.anyio +async def test_sse_message_ignores_closed_read_stream() -> None: + transport = StreamableHTTPTransport("http://example.com/mcp") + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1) + response = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + sse = ServerSentEvent(event="message", data=response.model_dump_json(by_alias=True)) + + async with read_stream_writer, read_stream: + await read_stream.aclose() + complete = await transport._handle_sse_event(sse, read_stream_writer, original_request_id=1) + + assert complete is True + + @pytest.mark.anyio async def test_post_request_merges_per_message_metadata_headers() -> None: """`ClientMessageMetadata.headers` on a `SessionMessage` are merged into the outgoing POST headers