diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index b9ed2711d..defaf7495 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -11,6 +11,7 @@ import java.net.http.HttpResponse; import java.time.Duration; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -399,9 +400,13 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { logger.warn("SSE stream observed an error", t); sink.error(t); } + this.messageEndpointSink.tryEmitError(t); return true; }) .doFinally(s -> { + this.messageEndpointSink + .tryEmitError(new CancellationException("SSE stream ended before receiving the endpoint event")); + Disposable ref = this.sseSubscription.getAndSet(null); if (ref != null && !ref.isDisposed()) { ref.dispose(); diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java index a24805a30..ac9e6709f 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java @@ -244,6 +244,27 @@ void testRetryBehavior() { failingTransport.closeGracefully().block(); } + @Test + void testSendMessageFailsWhenSseConnectionFails() { + // Create transport that connects to a non-existent host + HttpClientSseClientTransport failingTransport = HttpClientSseClientTransport.builder("http://non-existent-host") + .connectTimeout(Duration.ofSeconds(2)) + .build(); + + // Attempt to connect (will fail) + failingTransport.connect(Function.identity()).subscribe(); + + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify sendMessage fails with an error rather than hanging forever + StepVerifier.create(failingTransport.sendMessage(testMessage)).expectError().verify(Duration.ofSeconds(10)); + + // Clean up + failingTransport.closeGracefully().block(); + } + @Test void testMultipleMessageProcessing() { // Simulate receiving multiple messages in sequence