4
4
5
5
package io .modelcontextprotocol ;
6
6
7
- import java .util .concurrent .atomic .AtomicInteger ;
7
+ import java .util .ArrayList ;
8
+ import java .util .List ;
9
+ import java .util .function .BiConsumer ;
8
10
import java .util .function .Function ;
9
11
10
12
import com .fasterxml .jackson .core .type .TypeReference ;
11
13
import com .fasterxml .jackson .databind .ObjectMapper ;
12
14
import io .modelcontextprotocol .spec .ClientMcpTransport ;
13
15
import io .modelcontextprotocol .spec .McpSchema ;
16
+ import io .modelcontextprotocol .spec .ServerMcpTransport ;
14
17
import io .modelcontextprotocol .spec .McpSchema .JSONRPCNotification ;
15
18
import io .modelcontextprotocol .spec .McpSchema .JSONRPCRequest ;
16
- import io .modelcontextprotocol .spec .ServerMcpTransport ;
17
- import reactor .core .publisher .Flux ;
18
19
import reactor .core .publisher .Mono ;
19
20
import reactor .core .publisher .Sinks ;
20
- import reactor .core .scheduler .Schedulers ;
21
21
22
22
/**
23
23
* A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
24
24
* interfaces.
25
25
*/
26
26
public class MockMcpTransport implements ClientMcpTransport , ServerMcpTransport {
27
27
28
- private final AtomicInteger inboundMessageCount = new AtomicInteger (0 );
29
-
30
- private final Sinks .Many <McpSchema .JSONRPCMessage > outgoing = Sinks .many ().multicast ().onBackpressureBuffer ();
31
-
32
28
private final Sinks .Many <McpSchema .JSONRPCMessage > inbound = Sinks .many ().unicast ().onBackpressureBuffer ();
33
29
34
- private final Flux <McpSchema .JSONRPCMessage > outboundView = outgoing . asFlux (). cache ( 1 );
30
+ private final List <McpSchema .JSONRPCMessage > sent = new ArrayList <>( );
35
31
36
- // Latch to wait for the next message(s) to be sent in response of simulated incoming
37
- // message
38
- java .util .concurrent .CountDownLatch latch = new java .util .concurrent .CountDownLatch (1 );
32
+ private final BiConsumer <MockMcpTransport , McpSchema .JSONRPCMessage > interceptor ;
39
33
40
- public void simulateIncomingMessage (McpSchema .JSONRPCMessage message ) {
41
- simulateIncomingMessage (message , 1 );
34
+ public MockMcpTransport () {
35
+ this ((t , msg ) -> {
36
+ });
42
37
}
43
38
44
- public void simulateIncomingMessage (McpSchema .JSONRPCMessage message , int expectedResponseMessagesCount ) {
39
+ public MockMcpTransport (BiConsumer <MockMcpTransport , McpSchema .JSONRPCMessage > interceptor ) {
40
+ this .interceptor = interceptor ;
41
+ }
42
+
43
+ public void simulateIncomingMessage (McpSchema .JSONRPCMessage message ) {
45
44
if (inbound .tryEmitNext (message ).isFailure ()) {
46
- throw new RuntimeException ("Failed to emit message " + message );
45
+ throw new RuntimeException ("Failed to process incoming message " + message );
47
46
}
48
- inboundMessageCount .incrementAndGet ();
49
- latch = new java .util .concurrent .CountDownLatch (expectedResponseMessagesCount );
50
47
}
51
48
52
49
@ Override
53
50
public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
54
- if (outgoing .tryEmitNext (message ).isFailure ()) {
55
- return Mono .error (new RuntimeException ("Can't emit outgoing message " + message ));
56
- }
57
- latch .countDown ();
51
+ sent .add (message );
52
+ interceptor .accept (this , message );
58
53
return Mono .empty ();
59
54
}
60
55
61
56
public McpSchema .JSONRPCRequest getLastSentMessageAsRequest () {
62
57
return (JSONRPCRequest ) getLastSentMessage ();
63
58
}
64
59
65
- public McpSchema .JSONRPCNotification getLastSentMessageAsNotifiation () {
60
+ public McpSchema .JSONRPCNotification getLastSentMessageAsNotification () {
66
61
return (JSONRPCNotification ) getLastSentMessage ();
67
62
}
68
63
69
64
public McpSchema .JSONRPCMessage getLastSentMessage () {
70
- try {
71
- latch .await ();
72
- }
73
- catch (InterruptedException e ) {
74
- e .printStackTrace ();
75
- }
76
- return outboundView .blockFirst ();
65
+ return !sent .isEmpty () ? sent .get (sent .size () - 1 ) : null ;
77
66
}
78
67
79
68
private volatile boolean connected = false ;
@@ -85,7 +74,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
85
74
}
86
75
connected = true ;
87
76
return inbound .asFlux ()
88
- .publishOn (Schedulers .boundedElastic ())
89
77
.flatMap (message -> Mono .just (message ).transform (handler ))
90
78
.doFinally (signal -> connected = false )
91
79
.then ();
@@ -95,7 +83,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
95
83
public Mono <Void > closeGracefully () {
96
84
return Mono .defer (() -> {
97
85
connected = false ;
98
- outgoing .tryEmitComplete ();
99
86
inbound .tryEmitComplete ();
100
87
// Wait for all subscribers to complete
101
88
return Mono .empty ();
0 commit comments