@@ -109,30 +109,35 @@ def get_presence_topic(self) -> str:
109
109
return mqtt_topic .get_client_presence_topic (self .mcp_client_id )
110
110
111
111
async def start (self , timeout : timedelta | None = None ) -> bool | str :
112
- connect_result = self .connect ()
113
- def do_start ():
114
- self .client .loop_forever ()
115
112
try :
116
- asyncio .create_task (anyio_to_thread .run_sync (do_start ))
117
- if connect_result and connect_result != mqtt .MQTT_ERR_SUCCESS :
113
+ connect_result = self .connect ()
114
+ asyncio .create_task (anyio_to_thread .run_sync (self .client .loop_forever ))
115
+ if connect_result != mqtt .MQTT_ERR_SUCCESS :
118
116
logger .error (f"Failed to connect to MQTT broker, error code: { connect_result } " )
119
117
return mqtt .error_string (connect_result )
120
118
# test if the client is connected and wait until it is connected
121
119
if timeout :
122
120
while not self .is_connected ():
123
121
await asyncio .sleep (0.1 )
124
122
if timeout .total_seconds () <= 0 :
125
- logger .error (f"Timeout while waiting for MQTT client to connect, reason: { self .get_last_connect_fail_reason ()} " )
126
- return self .get_last_connect_fail_reason () or "timeout"
123
+ last_fail_reason = self .get_last_connect_fail_reason ()
124
+ if last_fail_reason :
125
+ return last_fail_reason .getName ()
126
+ return "timeout"
127
127
timeout -= timedelta (seconds = 0.1 )
128
128
return True
129
129
except asyncio .CancelledError :
130
130
logger .debug ("MQTT transport (MCP client) got cancelled" )
131
131
return "cancelled"
132
+ except ConnectionRefusedError as exc :
133
+ logger .error (f"MQTT transport (MCP client) failed to connect: { exc } " )
134
+ return "connection_refused"
135
+ except TimeoutError as exc :
136
+ logger .error (f"MQTT transport (MCP client) timed out: { exc } " )
137
+ return "timeout"
132
138
except Exception as exc :
133
139
logger .error (f"MQTT transport (MCP client) failed: { exc } " )
134
- traceback .print_exc ()
135
- return "error"
140
+ return f"connect mqtt error: { str (exc )} "
136
141
137
142
def get_session (self , server_name : ServerName ) -> MqttClientSession | None :
138
143
return self .client_sessions .get (server_name , None )
@@ -273,8 +278,8 @@ def _create_session(
273
278
read_stream_writer : SndStreamEX
274
279
write_stream : SndStream
275
280
write_stream_reader : RcvStream
276
- read_stream_writer , read_stream = anyio .create_memory_object_stream (0 )
277
- write_stream , write_stream_reader = anyio .create_memory_object_stream (0 )
281
+ read_stream_writer , read_stream = anyio .create_memory_object_stream (0 ) # type: ignore
282
+ write_stream , write_stream_reader = anyio .create_memory_object_stream (0 ) # type: ignore
278
283
self ._read_stream_writers [server_id ] = read_stream_writer
279
284
self ._task_group .start_soon (self ._receieved_from_session , server_id , server_name , write_stream_reader )
280
285
logger .debug (f"Created new session for server_id: { server_id } " )
0 commit comments