8000 Redesign MockMcpTransport internals and adapt tests · modelcontextprotocol/java-sdk@de6790a · GitHub
[go: up one dir, main page]

Skip to content

Commit de6790a

Browse files
committed
Redesign MockMcpTransport internals and adapt tests
1 parent 4aa2e77 commit de6790a

File tree

4 files changed

+107
-109
lines changed

4 files changed

+107
-109
lines changed

mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package io.modelcontextprotocol;
66

7-
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.BiConsumer;
810
import java.util.function.Function;
911

1012
import com.fasterxml.jackson.core.type.TypeReference;
@@ -14,47 +16,54 @@
1416
import io.modelcontextprotocol.spec.ServerMcpTransport;
1517
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
1618
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
17-
import reactor.core.publisher.Flux;
1819
import reactor.core.publisher.Mono;
1920
import reactor.core.publisher.Sinks;
20-
import reactor.core.scheduler.Schedulers;
2121

2222
@SuppressWarnings("unused")
23+
/**
24+
* A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
25+
* interfaces.
26+
*/
2327
public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport {
2428

25-
private final AtomicInteger inboundMessageCount = new AtomicInteger(0);
29+
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
30+
31+
private final List<McpSchema.JSONRPCMessage> sent = new ArrayList<>();
2632

27-
private final Sinks.Many<McpSchema.JSONRPCMessage> outgoing = Sinks.many().multicast().onBackpressureBuffer();
33+
private final BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor;
2834

29-
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
35+
public MockMcpTransport() {
36+
this((t, msg) -> {
37+
});
38+
}
3039

31-
private final Flux<McpSchema.JSONRPCMessage> outboundView = outgoing.asFlux().cache(1);
40+
public MockMcpTransport(BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor) {
41+
this.interceptor = interceptor;
42+
}
3243

3344
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
3445
if (inbound.tryEmitNext(message).isFailure()) {
35-
throw new RuntimeException("Failed to emit message " + message);
46+
throw new RuntimeException("Failed to process incoming message " + message);
3647
}
37-
inboundMessageCount.incrementAndGet();
3848
}
3949

4050
@Override
4151
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
42-
if (outgoing.tryEmitNext(message).isFailure()) {
43-
return Mono.error(new RuntimeException("Can't emit outgoing message " + message));
44-
}
52+
sent.add(message);
53+
interceptor.accept(this, message);
4554
return Mono.empty();
4655
}
4756

4857
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
49-
return (JSONRPCRequest) outboundView.blockFirst();
58+
return (JSONRPCRequest) getLastSentMessage();
5059
}
5160

52-
public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() {
53-
return (JSONRPCNotification) outboundView.blockFirst();
61+
public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() {
62+
return (JSONRPCNotification) getLastSentMessage();
5463
}
5564

5665
public McpSchema.JSONRPCMessage getLastSentMessage() {
57-
return outboundView.blockFirst();
66+
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
5867
}
5968

6069
private volatile boolean connected = false;
@@ -66,7 +75,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
6675
}
6776
connected = true;
6877
return inbound.asFlux()
69-
.publishOn(Schedulers.boundedElastic())
7078
.flatMap(message -> Mono.just(message).transform(handler))
7179
.doFinally(signal -> connected = false)
7280
.then();
@@ -76,8 +84,8 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
7684
public Mono<Void> closeGracefully() {
7785
return Mono.defer(() -> {
7886
connected = false;
79-
outgoing.tryEmitComplete();
8087
inbound.tryEmitComplete();
88+
// Wait for all subscribers to complete
8189
return Mono.empty();
8290
});
8391
}
@@ -87,4 +95,4 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
8795
return new ObjectMapper().convertValue(data, typeRef);
8896
}
8997

90-
}
98+
}

mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,76 +4,65 @@
44

55
package io.modelcontextprotocol;
66

7-
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.BiConsumer;
810
import java.util.function.Function;
911

1012
import com.fasterxml.jackson.core.type.TypeReference;
1113
import com.fasterxml.jackson.databind.ObjectMapper;
1214
import io.modelcontextprotocol.spec.ClientMcpTransport;
1315
import io.modelcontextprotocol.spec.McpSchema;
16+
import io.modelcontextprotocol.spec.ServerMcpTransport;
1417
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
1518
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
16-
import io.modelcontextprotocol.spec.ServerMcpTransport;
17-
import reactor.core.publisher.Flux;
1819
import reactor.core.publisher.Mono;
1920
import reactor.core.publisher.Sinks;
20-
import reactor.core.scheduler.Schedulers;
2121

2222
/**
2323
* A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
2424
* interfaces.
2525
*/
2626
public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport {
2727

28-
private final AtomicInteger inboundMessageCount = new AtomicInteger(0);
29-
30-
private final Sinks.Many<McpSchema.JSONRPCMessage> outgoing = Sinks.many().multicast().onBackpressureBuffer();
31-
3228
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
3329

34-
private final Flux<McpSchema.JSONRPCMessage> outboundView = outgoing.asFlux().cache(1);
30+
private final List<McpSchema.JSONRPCMessage> sent = new ArrayList<>();
3531

36-
// Latch to wait for the next message(s) to be sent in response of simulated incoming
37-
// message
38-
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
32+
private final BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor;
3933

40-
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
41-
simulateIncomingMessage(message, 1);
34+
public MockMcpTransport() {
35+
this((t, msg) -> {
36+
});
4237
}
4338

44-
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message, int expectedResponseMessagesCount) {
39+
public MockMcpTransport(BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor) {
40+
this.interceptor = interceptor;
41+
}
42+
43+
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
4544
if (inbound.tryEmitNext(message).isFailure()) {
46-
throw new RuntimeException("Failed to emit message " + message);
45+
throw new RuntimeException("Failed to process incoming message " + message);
4746
}
48-
inboundMessageCount.incrementAndGet();
49-
latch = new java.util.concurrent.CountDownLatch(expectedResponseMessagesCount);
5047
}
5148

5249
@Override
5350
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
54-
if (outgoing.tryEmitNext(message).isFailure()) {
55-
return Mono.error(new RuntimeException("Can't emit outgoing message " + message));
56-
}
57-
latch.countDown();
51+
sent.add(message);
52+
interceptor.accept(this, message);
5853
return Mono.empty();
5954
}
6055

6156
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
6257
return (JSONRPCRequest) getLastSentMessage();
6358
}
6459

65-
public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() {
60+
public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() {
6661
return (JSONRPCNotification) getLastSentMessage();
6762
}
6863

6964
public McpSchema.JSONRPCMessage getLastSentMessage() {
70-
try {
71-
latch.await();
72-
}
73-
catch (InterruptedException e) {
74-
e.printStackTrace();
75-
}
76-
return outboundView.blockFirst();
65+
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
7766
}
7867

7968
private volatile boolean connected = false;
@@ -85,7 +74,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
8574
}
8675
connected = true;
8776
return inbound.asFlux()
88-
.publishOn(Schedulers.boundedElastic())
8977
.flatMap(message -> Mono.just(message).transform(handler))
9078
.doFinally(signal -> connected = false)
9179
.then();
@@ -95,7 +83,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
9583
public Mono<Void> closeGracefully() {
9684
return Mono.defer(() -> {
9785
connected = false;
98-
outgoing.tryEmitComplete();
9986
inbound.tryEmitComplete();
10087
// Wait for all subscribers to complete
10188
return Mono.empty();

0 commit comments

Comments
 (0)
0