8000 fix: handle connection refused exceptions · emqx/mcp-python-sdk@37e14bd · GitHub
[go: up one dir, main page]

Skip to content

Commit 37e14bd

Browse files
committed
fix: handle connection refused exceptions
1 parent 17cc30e commit 37e14bd

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

src/mcp/client/mqtt.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,35 @@ def get_presence_topic(self) -> str:
109109
return mqtt_topic.get_client_presence_topic(self.mcp_client_id)
110110

111111
async def start(self, timeout: timedelta | None = None) -> bool | str:
112-
connect_result = self.connect()
113-
def do_start():
114-
self.client.loop_forever()
115112
try:
116-
asyncio.create_task(anyio_to_thread.run_sync(do_start))
117-
if connect_result and connect_result != mqtt.MQTT_ERR_SUCCESS:
113+
connect_result = self.connect()
114+
asyncio.create_task(anyio_to_thread.run_sync(self.client.loop_forever))
115+
if connect_result != mqtt.MQTT_ERR_SUCCESS:
118116
logger.error(f"Failed to connect to MQTT broker, error code: {connect_result}")
119117
return mqtt.error_string(connect_result)
120118
# test if the client is connected and wait until it is connected
121119
if timeout:
122120
while not self.is_connected():
123121
await asyncio.sleep(0.1)
124122
if timeout.total_seconds() <= 0:
125-
logger.error(f"Timeout while waiting for MQTT client to connect, reason: {self.get_last_connect_fail_reason()}")
126-
return self.get_last_connect_fail_reason() or "timeout"
123+
last_fail_reason = self.get_last_connect_fail_reason()
124+
if last_fail_reason:
125+
return last_fail_reason.getName()
126+
return "timeout"
127127
timeout -= timedelta(seconds=0.1)
128128
return True
129129
except asyncio.CancelledError:
130130
logger.debug("MQTT transport (MCP client) got cancelled")
131131
return "cancelled"
132+
except ConnectionRefusedError as exc:
133+
logger.error(f"MQTT transport (MCP client) failed to connect: {exc}")
134+
return "connection_refused"
135+
except TimeoutError as exc:
136+
logger.error(f"MQTT transport (MCP client) timed out: {exc}")
137+
return "timeout"
132138
except Exception as exc:
133139
logger.error(f"MQTT transport (MCP client) failed: {exc}")
134-
traceback.print_exc()
135-
return "error"
140+
return f"connect mqtt error: {str(exc)}"
136141

137142
def get_session(self, server_name: ServerName) -> MqttClientSession | None:
138143
return self.client_sessions.get(server_name, None)
@@ -273,8 +278,8 @@ def _create_session(
273278
read_stream_writer: SndStreamEX
274279
write_stream: SndStream
275280
write_stream_reader: RcvStream
276-
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
277-
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
281+
read_stream_writer, read_stream = anyio.create_memory_object_stream(0) # type: ignore
282+
write_stream, write_stream_reader = anyio.create_memory_object_stream(0) # type: ignore
278283
self._read_stream_writers[server_id] = read_stream_writer
279284
self._task_group.start_soon(self._receieved_from_session, server_id, server_name, write_stream_reader)
280285
logger.debug(f"Created new session for server_id: {server_id}")

src/mcp/shared/mqtt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def connect(self):
170170
props.UserProperty = [
171171
(PROPERTY_K_MCP_COMPONENT, self.mcp_component_type)
172172
]
173-
self.client.connect(
173+
return self.client.connect(
174174
host = self.mqtt_options.host,
175175
port = self.mqtt_options.port,
176176
keepalive = self.mqtt_options.keepalive,

0 commit comments

Comments
 (0)
0