kafka #2

base: main

Conversation
Walkthrough

Adds Kafka transport support across client, server, and handler layers. Introduces Kafka-specific client transport, server app, correlation management, and request handler adapter. Updates exports and types to include Kafka. Adds Docker Compose for local Kafka, an optional dependency group, README install instructions, example usage, and extensive tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client as Client App
    participant KClient as KafkaClientTransport
    participant KBroker as Kafka (topics)
    participant KServer as KafkaServerApp
    participant Handler as RequestHandler
    rect rgb(240,245,255)
        note over KClient,KServer: Setup
        Client->>KClient: start()/first call
        KClient->>KBroker: Subscribe reply_topic, create producer
        KServer->>KBroker: Subscribe request_topic, create producer
    end
    rect rgb(245,255,245)
        note over Client,KServer: Request/Response (single)
        Client->>KClient: send_message(request)
        KClient->>KBroker: Publish to request_topic<br/>headers: correlation_id, reply_topic
        KBroker-->>KServer: Consume request
        KServer->>Handler: on_message_send(params, ctx)
        Handler-->>KServer: result
        KServer->>KBroker: Publish response to reply_topic<br/>headers: correlation_id, response_type
        KBroker-->>KClient: Consume response
        KClient-->>Client: Result
    end
    rect rgb(255,250,240)
        note over Client,KServer: Streaming flow
        Client->>KClient: send_message_streaming(request)
        KClient->>KBroker: Publish request (stream)
        KBroker-->>KServer: Consume request
        KServer->>Handler: on_message_send_stream(...)
        loop events
            Handler-->>KServer: event
            KServer->>KBroker: Publish event (response_type)
            KBroker-->>KClient: Deliver event
            KClient-->>Client: Yield event
        end
        KServer->>KBroker: Publish stream_complete
        KBroker-->>KClient: Deliver stream_complete
        KClient-->>Client: Close stream
    end
    rect rgb(255,240,240)
        note over KClient,KServer: Errors/Timeouts
        KServer->>KBroker: Publish error response (headers include correlation_id)
        KBroker-->>KClient: Deliver error
        KClient-->>Client: Raise A2AClientError
    end
```
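The request/response leg above hinges on correlating replies to requests. A minimal broker-free sketch of that bookkeeping (names here are illustrative, not the SDK's actual `CorrelationManager` API):

```python
import asyncio
import uuid


class PendingRequests:
    """Track in-flight requests by correlation_id and resolve them on reply."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self) -> tuple[str, asyncio.Future]:
        # The correlation_id travels in the Kafka message headers.
        correlation_id = str(uuid.uuid4())
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id: str, payload: object) -> bool:
        # Called by the reply-topic consumer loop; unknown ids are dropped.
        future = self._pending.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True


async def demo() -> object:
    pending = PendingRequests()
    cid, fut = pending.register()
    pending.resolve(cid, {"status": "ok"})  # simulate the consumed reply
    return await fut
```

In the real transport, `register()` would run in `send_message()` and `resolve()` in the background consumer loop.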
```mermaid
sequenceDiagram
    autonumber
    participant Broker as Kafka
    participant Server as KafkaServerApp
    participant KH as KafkaHandler
    participant RH as RequestHandler
    note over Server: Consumer loop
    Server->>Broker: poll()
    Broker-->>Server: KafkaMessage (headers+payload)
    Server->>KH: handle_request(message)
    alt method == message_send (single)
        KH->>RH: on_message_send(params, ctx)
        RH-->>KH: result
        KH->>Server: send_response(reply_topic, correlation_id, result, type)
    else method == message_send (stream)
        KH->>RH: on_message_send_stream(params, ctx)
        loop for each event
            RH-->>KH: event
            KH->>Server: send_response(..., event, type)
        end
        KH->>Server: send_stream_complete(...)
    else unsupported
        KH->>Server: send_error_response(...)
    end
```
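The alt-branching in the handler diagram can be sketched as a small dispatch function; the method strings and the `send` callback are assumptions for illustration, not the actual `KafkaHandler` interface:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def dispatch(method: str, params: dict, handler: Any,
                   send: Callable[[str, Any], Awaitable[None]]) -> None:
    """Route one consumed request: single response, streamed events, or error."""
    if method == "message/send":
        result = await handler.on_message_send(params)
        await send("response", result)
    elif method == "message/stream":
        async for event in handler.on_message_send_stream(params):
            await send("event", event)
        await send("stream_complete", None)
    else:
        await send("error", f"unsupported method: {method}")
```

Each `send(kind, payload)` call corresponds to one publish to the client's `reply_topic` with the matching `response_type` header.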
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Actionable comments posted: 18
🧹 Nitpick comments (25)
src/a2a/client/transports/kafka_correlation.py (1)
5-5: Remove unused import
`Set` is imported from `typing` but not used anywhere in the module.

```diff
-from typing import Any, Dict, Optional, Set
+from typing import Any, Dict, Optional
```

src/a2a/server/apps/__init__.py (1)
11-15: Exporting `KafkaServerApp` as `None` may confuse consumers

`__all__` always exposes `KafkaServerApp`, even when the import fails, so `from a2a.server.apps import KafkaServerApp` returns `None` without an explicit error. Two safer options:

```diff
-except ImportError:
-    KafkaServerApp = None  # type: ignore
+except ImportError:
+    KafkaServerApp = None  # type: ignore  # keep backward-compat
+
+if KafkaServerApp is None:
+    __all__.remove("KafkaServerApp")
```

or raise a descriptive runtime error when used.
test_simple_kafka.py (1)

3-6: Path handling for imports

`sys.path.append('src')` works for `a2a` but not for modules outside `src`. Keep it, but consider adding the repo root for consistency, or convert this script into a proper pytest test under `tests/` to avoid `sys.path` hacks.

test_handler.py (1)
58-60: Consider pytest + assertions over print statements

Converting this to a pytest test with assertions improves CI signal and integrates with the test suite. You can mark it with `@pytest.mark.asyncio` and skip if the example module isn't present.

src/a2a/client/client_factory.py (2)
28-32: Guard import is fine, but widen the exception scope

Only `ImportError` is caught; if `kafka.py` imports another optional dep that raises a `RuntimeError` at import time, the variable ends up set while Kafka still cannot work. Consider catching `Exception` and logging the exact reason to avoid partial mis-configuration.

105-114: Duplicated boilerplate for optional transports – extract helper

The three nearly identical blocks for gRPC, Kafka, etc. are repetitive. A small util such as `_register_optional(protocol, transport_cls, extras_tag)` would reduce code size and keep future additions trivial.

src/a2a/server/request_handlers/__init__.py (1)
45-49: Placeholder class leaks into type checking

The stub `KafkaHandler` does not inherit from `RequestHandler`, so static-type users importing it unconditionally will lose type information. Derive from `RequestHandler | object` to preserve an approximate shape:

```diff
-class KafkaHandler:  # type: ignore
+class KafkaHandler(RequestHandler):  # type: ignore
```

examples/kafka_example.py (1)
91-96: Hard-coded bootstrap & topic names – surface them as CLI flags

Baking these strings into samples forces edits for every environment. Hook them to argparse so users can quickly point the sample at their cluster.
scripts/setup_kafka_dev.py (1)
84-88: Invoke pip via the interpreter for venv safety

Running bare `pip` can hit a different Python than this script. Prefer:

```diff
-if run_command("pip install aiokafka", cwd=project_root) != 0:
+if run_command(f"{sys.executable} -m pip install aiokafka", cwd=project_root) != 0:
```

KAFKA_FIX_SUMMARY.md (2)
6-8: Specify a language for fenced code blocks

markdownlint flags these code blocks for missing language identifiers (MD040). Add `bash`, `python`, or `text` after the opening back-ticks to silence the linter and enable syntax highlighting.

Also applies to: 99-105
122-133: Surround tables with blank lines

The table is immediately adjacent to surrounding text (MD058). Insert a blank line before and after the table for clearer rendering and to satisfy markdownlint.

docker-compose.kafka.yml (1)
48-61: Pin the kafka-ui image version

Using `provectuslabs/kafka-ui:latest` hinders reproducibility and may break unexpectedly. Pin to a specific tag (e.g., `provectuslabs/kafka-ui:0.7.1`) and update periodically.

KAFKA_IMPLEMENTATION_SUMMARY.md (1)
88-99: Add language identifiers to ASCII diagrams

The fenced blocks that depict the client/server architecture trigger MD040. Append `text` after the back-ticks to keep the diagram formatting and satisfy the linter.

examples/kafka_comprehensive_example.py (2)
149-164: Duplicate shutdown invocation

`KafkaServerApp.run()` already calls `await self.stop()` in its `finally` block. The extra `finally: await server.stop()` here is redundant and harmless now, but may hide future refactor regressions.

```diff
-    finally:
-        await server.stop()
```

292-303: Gather without error handling can mask failures

`asyncio.gather(*tasks)` will raise on the first exception and drop remaining results. Capture exceptions explicitly to get full error visibility during load testing.

```python
responses = await asyncio.gather(*tasks, return_exceptions=True)
errors = [e for e in responses if isinstance(e, Exception)]
if errors:
    logger.error("Load-test encountered %d errors:\n%s", len(errors), errors)
```
69-72: Hard-coded `auto_offset_reset='latest'` may drop pending requests

Starting a brand-new consumer at `latest` skips any messages produced before the server starts. If at-least-once processing is required, default to `earliest` or make it configurable.

docs/kafka_transport.md (2)
35-40: Pin dependency versions in the install snippet

Consider showing an explicit compatible `aiokafka` version (e.g. `a2a-sdk[kafka]>=...`) to avoid accidental breaks when future major versions introduce incompatibilities.

191-196: Mention exactly-once & DLQ mitigation options

Since at-least-once delivery can cause duplicate processing, briefly add recommended patterns (idempotent producer, transaction API, DLQ) so readers immediately see possible mitigations.
src/kafka_example.py (1)

146-152: Hard-coded broker address

Using a private IP (`100.95.155.4:9094`) in an example will fail for most users. Default to `localhost:9092` with an env override to improve portability.

src/kafka_currency_demo.py (2)
68-75: No retry / timeout handling for the external API

A single transient network hiccup will surface as a user-visible error. Consider adding exponential back-off retries or caching the latest rates.

140-150: Hard-coded status strings leak into the protocol

Prefixing messages with "INPUT_REQUIRED" is an ad-hoc convention. Formalise it in the protocol (e.g., a dedicated message type) or at least document it to avoid brittle client parsing.
A2A on Kafka.md (2)
7-8: External-only image URLs will break offline / public docs

The `<img>` tags reference `clouddocs.huawei.com`, which is not publicly accessible. When this repo is viewed outside the Huawei network (e.g. GitHub Pages, local Markdown preview, pkg-docs), the diagrams will show as broken links. Host the images in the repository (e.g. `docs/assets/*.png`) or an open CDN and reference them with relative URLs.
31-35: Heading level jumps violate Markdown tooling & TOC generation

`#### <br />` (line 33) skips from H2 to H4, triggering MD001 and breaking automatic TOCs. Increment heading levels one at a time (H2 → H3) and avoid empty headings.
283-290: Hard-coded bootstrap servers bind the demo to a private network

`"100.95.155.4:9094"` is an internal IP; the sample will fail for most users. Accept the address via CLI / env var (e.g. a `--bootstrap` flag or `KAFKA_BOOTSTRAP_SERVERS`) and default to `localhost:9092`.

Also applies to: 320-324
src/a2a/client/transports/kafka.py (1)

259-263: Possible empty `reply_topic` header

`self.reply_topic` is encoded without a fallback (line 261). During unit tests where `start()` isn't called, this yields an empty header string, violating server routing assumptions. Guard with an explicit check, or raise if `reply_topic` is unset.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (28)

- .gitignore (1 hunks)
- A2A on Kafka.md (1 hunks)
- KAFKA_FIX_SUMMARY.md (1 hunks)
- KAFKA_IMPLEMENTATION_SUMMARY.md (1 hunks)
- README.md (2 hunks)
- docker-compose.kafka.yml (1 hunks)
- docs/kafka_transport.md (1 hunks)
- examples/kafka_comprehensive_example.py (1 hunks)
- examples/kafka_example.py (1 hunks)
- examples/kafka_handler_example.py (1 hunks)
- pyproject.toml (1 hunks)
- scripts/setup_kafka_dev.py (1 hunks)
- src/a2a/client/client_factory.py (2 hunks)
- src/a2a/client/transports/__init__.py (1 hunks)
- src/a2a/client/transports/kafka.py (1 hunks)
- src/a2a/client/transports/kafka_correlation.py (1 hunks)
- src/a2a/server/apps/__init__.py (1 hunks)
- src/a2a/server/apps/kafka/__init__.py (1 hunks)
- src/a2a/server/apps/kafka/app.py (1 hunks)
- src/a2a/server/request_handlers/__init__.py (1 hunks)
- src/a2a/server/request_handlers/kafka_handler.py (1 hunks)
- src/a2a/types.py (2 hunks)
- src/kafka_chatopenai_demo.py (1 hunks)
- src/kafka_currency_demo.py (1 hunks)
- src/kafka_example.py (1 hunks)
- test_handler.py (1 hunks)
- test_simple_kafka.py (1 hunks)
- tests/client/transports/test_kafka.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (8)

- src/a2a/server/apps/kafka/__init__.py → src/a2a/server/apps/kafka/app.py: KafkaServerApp (19-205)
- src/a2a/server/apps/__init__.py → src/a2a/server/apps/kafka/app.py: KafkaServerApp (19-205)
- src/a2a/client/transports/__init__.py → src/a2a/client/transports/kafka.py: KafkaClientTransport (36-580)
- src/a2a/server/request_handlers/__init__.py → src/a2a/server/request_handlers/kafka_handler.py: KafkaHandler (47-401)
- examples/kafka_example.py → src/a2a/client/transports/kafka.py (KafkaClientTransport, stop, send_message, send_message_streaming), src/a2a/server/apps/kafka/app.py (KafkaServerApp, run, stop), src/a2a/server/request_handlers/default_request_handler.py (DefaultRequestHandler), src/a2a/types.py (AgentCard, Message, MessageSendParams, Task), examples/kafka_handler_example.py, examples/kafka_comprehensive_example.py
- src/a2a/client/transports/kafka_correlation.py → src/a2a/types.py (Message, Task, TaskArtifactUpdateEvent, TaskStatusUpdateEvent)
- examples/kafka_handler_example.py → src/a2a/server/request_handlers/request_handler.py (RequestHandler), src/a2a/server/apps/kafka/app.py (KafkaServerApp, get_handler, send_push_notification, run), src/a2a/client/transports/kafka.py (KafkaClientTransport, send_message, send_message_streaming), src/a2a/server/context.py (ServerCallContext), src/a2a/types.py (15 symbols, incl. AgentCard, AgentCapabilities, AgentSkill, Message, MessageSendParams, Part, Role, Task, TextPart), examples/kafka_example.py (on_message_send), src/a2a/server/request_handlers/kafka_handler.py (send_push_notification)
- src/kafka_example.py → src/a2a/server/context.py (ServerCallContext), src/a2a/server/request_handlers/request_handler.py (RequestHandler), src/a2a/server/apps/kafka/app.py (KafkaServerApp, run, stop), src/a2a/client/transports/kafka.py (KafkaClientTransport, stop, send_message, send_message_streaming), src/a2a/types.py (15 symbols), examples/kafka_example.py, examples/kafka_handler_example.py
🪛 markdownlint-cli2 (0.17.2)

KAFKA_IMPLEMENTATION_SUMMARY.md
- 88, 97: MD040 (fenced-code-language): fenced code blocks should have a language specified

KAFKA_FIX_SUMMARY.md
- 6, 99: MD040 (fenced-code-language): fenced code blocks should have a language specified
- 122: MD058 (blanks-around-tables): tables should be surrounded by blank lines

A2A on Kafka.md
- 33: MD001 (heading-increment): heading levels should only increment by one level at a time (expected h3, actual h4)
- 176: MD046 (code-block-style): expected fenced, actual indented
- MD007 (ul-indent): unordered list indentation uses multiples of 4 (4/8/12) where 2/4/6 is expected; reported on roughly ninety lines between lines 15 and 506
🪛 LanguageTool

A2A on Kafka.md
- ~17 (wb4): Chinese grammar note on the sentence "Server: listens on the fixed request topic (requestTopic) and replies after processing"
- ~408 (wa3): Chinese grammar note on the description "establishes the network connection to Kafka; this method may only be called after the session has been started or resumed", which also mentions checking `self.reply_topic`
🔇 Additional comments (14)
pyproject.toml (1)

39-40: Confirm `aiokafka` version compatibility & security

`aiokafka>=0.11.0` is more than four years old and the last release predates Python 3.12/3.13. Please verify it still receives security fixes and runs on the full interpreter range advertised in this project (3.10-3.13). If not, pin to the latest maintained fork or relax the lower bound.

src/a2a/client/transports/kafka_correlation.py (1)
68-104: Potential memory leak for long-lived streaming requests

`_streaming_requests` entries are removed only via `complete_streaming()`. If the caller forgets to invoke it after the final chunk, finished `StreamingFuture`s remain in the dict indefinitely. Consider:

- Auto-removing the entry inside `StreamingFuture.set_done()`, or
- Adding a time-to-live / background cleanup task.
.gitignore (1)
11-12: LGTM

Adding `.idea` to `.gitignore` is standard and harmless.

README.md (1)
48-53: Documentation addition looks good

The Kafka extra is documented consistently for both `uv` and `pip`.

Also applies to: 96-101
src/a2a/server/apps/kafka/__init__.py (1)

3-7: Public API surface is clear and minimal

Import path and `__all__` export look correct and consistent with the `KafkaServerApp` implementation.

src/a2a/client/transports/__init__.py (1)

13-17: Graceful optional dependency import – good

Importing `KafkaClientTransport` behind a try/except matches the pattern used for gRPC and avoids a hard dependency on Kafka at import time.

src/a2a/types.py (1)
1024-1033: Kafka protocol addition looks correct

Adding `kafka = 'KAFKA'` to `TransportProtocol` aligns with the new transport support.

test_handler.py (1)

9-12: Import path is correct – no changes needed

The test's `sys.path.insert(0, …/src)` correctly places `src/kafka_example.py` at the front of `sys.path`, so `from kafka_example import ExampleRequestHandler` will succeed under CI. We confirmed:

- `src/kafka_example.py` exists and will be imported before `examples/kafka_example.py`.
- No adjustments to import paths or additional `sys.path` entries are required.

Please disregard the suggested change to `examples.kafka_example`. Likely an incorrect or invalid review comment.
examples/kafka_example.py (1)

78-88: AgentCard creation may fail validation

`capabilities` must be an `AgentCapabilities` model, not a plain dict; some fields like `id` can also be mandatory depending on the schema. Use the helper `client_factory.minimal_agent_card`, or construct a proper model, to avoid a runtime `ValidationError`.

docker-compose.kafka.yml (1)
33-40: Health check may hit the wrong listener

Inside the Kafka container the broker listens on 29092 (internal listener), but the health check probes `localhost:9092`. If `KAFKA_ADVERTISED_LISTENERS` keeps port 9092 bound only to the host network, the command can fail in the container and mark the service unhealthy. Consider:

```diff
-      test: ["CMD", "bash", "-c", "kafka-broker-api-versions --bootstrap-server localhost:9092"]
+      test: ["CMD", "bash", "-c", "kafka-broker-api-versions --bootstrap-server localhost:29092"]
```

Also applies to: 42-46
examples/kafka_handler_example.py (1)
124-130: Placeholder `reply_topic` risks silent drops

`"a2a-reply-demo_client"` is hard-coded; if the actual client reply topic differs, the push notification is lost with no error. Either derive it from the incoming request headers or make it a parameter.

src/a2a/server/request_handlers/kafka_handler.py (1)
268-279: `json.dumps` may fail on non-primitive types

`result.model_dump()` can contain `Enum`, `datetime`, etc., which the `json.dumps` in the producer serializer can't encode. Provide a custom encoder, or pre-serialize with `pydantic.TypeAdapter(json=True)`.

src/kafka_chatopenai_demo.py (1)
372-374: Streaming demo assumes every event is `Message`

`stream_resp.parts[0]` will raise if the server streams `TaskStatusUpdateEvent`, etc. Either narrow the server's contract or add `isinstance` checks before accessing `.parts`.

src/a2a/client/transports/kafka.py (1)
325-335: `streaming_future` API is unclear

The loop relies on `streaming_future.is_done()` and `.get()`. If `CorrelationManager.register_streaming()` returns an `asyncio.Queue`, those attributes don't exist and an `AttributeError` will surface on the first iteration. Confirm the object's interface, or wrap the queue in a lightweight adapter exposing `is_done()`/`get()`.
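Such an adapter could be sketched like this (a hypothetical wrapper, assuming a sentinel object marks end-of-stream):

```python
import asyncio


class QueueStreamAdapter:
    """Wrap an asyncio.Queue behind the is_done()/get() interface the loop expects."""

    _SENTINEL = object()

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._done = False

    def put_nowait(self, item: object) -> None:
        self._queue.put_nowait(item)

    def finish(self) -> None:
        # Enqueue the sentinel so a blocked get() wakes up and sees completion.
        self._queue.put_nowait(self._SENTINEL)

    def is_done(self) -> bool:
        return self._done

    async def get(self) -> object:
        item = await self._queue.get()
        if item is self._SENTINEL:
            self._done = True
            raise StopAsyncIteration
        return item
```

The consumer loop then drains events until `is_done()` flips or `StopAsyncIteration` is raised.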
examples/kafka_example.py
Outdated
```python
    async def on_message_send_streaming(
        self,
        params: MessageSendParams,
        context=None
    ) -> AsyncGenerator[Message | Task, None]:
        """Handle streaming message-send requests."""
```
Method name mismatch – streaming handler will never be invoked.
KafkaHandler expects the server-side handler to expose on_message_send_stream, but this class defines on_message_send_streaming.
Rename to match the contract:
```diff
-async def on_message_send_streaming(
+async def on_message_send_stream(
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def on_message_send_stream(
    self,
    params: MessageSendParams,
    context=None
) -> AsyncGenerator[Message | Task, None]:
    """Handle streaming message-send requests."""
```
🤖 Prompt for AI Agents
In examples/kafka_example.py around lines 31 to 36, the method name
on_message_send_streaming does not match the expected handler name
on_message_send_stream required by KafkaHandler. Rename the method from
on_message_send_streaming to on_message_send_stream to ensure the streaming
handler is correctly invoked.
```python
__all__ = [
    'ClientTransport',
    'GrpcTransport',
    'JsonRpcTransport',
    'KafkaClientTransport',
    'RestTransport',
]
```
🛠️ Refactor suggestion
Avoid exporting KafkaClientTransport when dependency is missing.
Currently KafkaClientTransport is always listed in __all__, but may be None when Kafka deps are absent. Conditionally exporting improves DX and avoids surprising None symbols.
Apply this minimal change to avoid exporting when unavailable:
```diff
 __all__ = [
     'ClientTransport',
     'GrpcTransport',
     'JsonRpcTransport',
-    'KafkaClientTransport',
     'RestTransport',
 ]
```

Then append this (outside the above block) to export conditionally:
```python
# Append conditionally to avoid exposing None when import fails
if KafkaClientTransport is not None:
    __all__.append('KafkaClientTransport')
```

🤖 Prompt for AI Agents
In src/a2a/client/transports/__init__.py around lines 19 to 25,
KafkaClientTransport is always included in the __all__ list even when its
dependencies are missing and it is None. To fix this, remove
'KafkaClientTransport' from the initial __all__ list and then add a conditional
statement after that block to append 'KafkaClientTransport' to __all__ only if
KafkaClientTransport is not None. This prevents exporting a None symbol and
improves developer experience.
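The suggested guard can be exercised without the real package layout. The sketch below simulates the pattern with `importlib` so it is self-contained; the module and symbol names are placeholders, not the actual a2a import structure:

```python
import importlib


def build_exports(optional_module: str) -> list:
    """Return the public export list, adding the Kafka symbol only when
    the optional dependency imports cleanly."""
    exports = [
        'ClientTransport',
        'GrpcTransport',
        'JsonRpcTransport',
        'RestTransport',
    ]
    try:
        importlib.import_module(optional_module)
    except ImportError:
        return exports  # dependency missing: the symbol would be None, so omit it
    exports.append('KafkaClientTransport')
    return exports
```

With a stdlib module standing in for aiokafka, `build_exports('json')` includes the Kafka symbol, while a nonexistent module leaves it out.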
```python
async def get(self) -> Any:
    """Get the next item from the stream."""
    if self._exception:
        raise self._exception
    return await self.queue.get()

def set_exception(self, exception: Exception) -> None:
    """Set an exception for the stream."""
    self._exception = exception
    self._done = True

def set_done(self) -> None:
    """Mark the stream as complete."""
    self._done = True
```
🛠️ Refactor suggestion
StreamingFuture.get() can deadlock after set_done()
set_done() only flips _done; it doesn’t unblock awaiters.
If the queue is empty when _done is set, subsequent await get() hangs forever.
```diff
 async def get(self) -> Any:
     """Get the next item from the stream."""
-    if self._exception:
-        raise self._exception
-    return await self.queue.get()
+    if self._exception:
+        raise self._exception
+
+    if self._done and self.queue.empty():
+        # Signal end-of-stream
+        raise StopAsyncIteration
+
+    return await self.queue.get()
@@
 def set_done(self) -> None:
     """Mark the stream as complete."""
-    self._done = True
+    self._done = True
+    # Unblock any waiters when no more data will arrive.
+    if self.queue.empty():
+        self.queue.put_nowait(None)  # sentinel
```

Callers would stop consuming on StopAsyncIteration or the sentinel.
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka_correlation.py around lines 23 to 37, the
set_done() method only sets the _done flag but does not unblock any coroutines
awaiting get(), causing a deadlock if the queue is empty. To fix this, modify
set_done() to also put a sentinel value or raise an exception in the queue to
unblock awaiters, allowing them to detect completion and stop consuming.
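The end-to-end behavior of the fix can be sketched in isolation. This is an illustrative stand-alone variant, not the library's actual class: it always enqueues a sentinel in `set_done()` (slightly simpler than the empty-queue check in the diff) so a blocked `get()` always wakes up:

```python
import asyncio
from typing import Any, Optional

_SENTINEL = object()  # end-of-stream marker


class StreamingFuture:
    """Sketch of a deadlock-free streaming future (names are illustrative)."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self._exception: Optional[Exception] = None
        self._done = False

    def put(self, item: Any) -> None:
        self.queue.put_nowait(item)

    async def get(self) -> Any:
        if self._exception:
            raise self._exception
        if self._done and self.queue.empty():
            raise StopAsyncIteration  # stream already drained
        item = await self.queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration  # woke up on the sentinel
        return item

    def set_done(self) -> None:
        self._done = True
        self.queue.put_nowait(_SENTINEL)  # unblock any waiter


async def drain(fut: StreamingFuture) -> list:
    items = []
    while True:
        try:
            items.append(await fut.get())
        except StopAsyncIteration:
            break
    return items


def demo() -> list:
    async def run() -> list:
        fut = StreamingFuture()
        fut.put("a")
        fut.put("b")
        fut.set_done()  # without the sentinel, a later get() would hang
        return await drain(fut)

    return asyncio.run(run())
```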
```python
correlation_id = self.correlation_manager.generate_correlation_id()

# Prepare request message
request_data = {
    'method': method,
    'params': params.model_dump() if hasattr(params, 'model_dump') else params,
    'streaming': streaming,
    'agent_card': self.agent_card.model_dump(),
}

# Prepare headers
headers = [
    ('correlation_id', correlation_id.encode('utf-8')),
    ('reply_topic', (self.reply_topic or '').encode('utf-8')),
    ('agent_id', self.agent_card.name.encode('utf-8')),
]

if context:
    # Add context headers if needed
    if context.trace_id:
        headers.append(('trace_id', context.trace_id.encode('utf-8')))

try:
    await self.producer.send_and_wait(
        self.request_topic,
        value=request_data,
        headers=headers
    )
    return correlation_id
except KafkaError as e:
    raise A2AClientError(f"Failed to send Kafka message: {e}") from e


async def send_message(
    self,
    request: MessageSendParams,
    *,
    context: ClientCallContext | None = None,
) -> Task | Message:
    """Send a non-streaming message request to the agent."""
    correlation_id = await self._send_request('message_send', request, context, streaming=False)

    # Register and wait for response
    future = await self.correlation_manager.register(correlation_id)

    try:
        # Wait for response with timeout
        timeout = 30.0  # Default timeout
        if context and context.timeout:
            timeout = context.timeout

        result = await asyncio.wait_for(future, timeout=timeout)
        return result
    except asyncio.TimeoutError:
        await self.correlation_manager.complete_with_exception(
            correlation_id,
            A2AClientError(f"Request timed out after {timeout} seconds")
        )
        raise A2AClientError(f"Request timed out after {timeout} seconds")
```
Race condition: reply may arrive before register()
_send_request() sends the message first; register() is called only afterwards (Line 290).
If the response lands quickly, CorrelationManager.complete() sees an unknown correlation ID and drops it.
Register the future/queue before publishing:
```diff
-correlation_id = await self._send_request(...)
-future = await self.correlation_manager.register(correlation_id)
+correlation_id = self.correlation_manager.generate_correlation_id()
+future = await self.correlation_manager.register(correlation_id)
+await self._send_request(..., correlation_id=correlation_id)
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py between lines 248 and 306, the current
code sends the Kafka message before registering the correlation ID future in the
correlation manager, causing a race condition where a quick reply may be
dropped. To fix this, move the call to register the correlation ID future before
calling _send_request, so the future is ready to receive the response when it
arrives. Adjust the send_message method to first register the correlation ID,
then call _send_request without sending the message inside it, or refactor to
ensure registration happens before sending.
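The register-before-publish ordering can be demonstrated without Kafka. The sketch below uses a simplified stand-in CorrelationManager (not the real one) and a "broker" that replies before publish even returns; with the corrected ordering, the reply is never dropped:

```python
import asyncio
import uuid


class CorrelationManager:
    """Simplified stand-in for the real manager; illustrative only."""

    def __init__(self) -> None:
        self._pending: dict = {}

    def register(self, correlation_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return future

    def complete(self, correlation_id: str, result) -> bool:
        future = self._pending.pop(correlation_id, None)
        if future is None:
            return False  # unknown id: this is the dropped-reply case
        future.set_result(result)
        return True


async def send_request(manager: CorrelationManager, publish):
    correlation_id = str(uuid.uuid4())
    future = manager.register(correlation_id)  # 1. register first
    await publish(correlation_id)              # 2. then publish
    return await future                        # 3. even an instant reply is kept


def demo() -> str:
    async def run() -> str:
        manager = CorrelationManager()

        async def instant_reply(correlation_id: str) -> None:
            # Simulates a reply landing before publish() returns.
            assert manager.complete(correlation_id, "pong")

        return await send_request(manager, instant_reply)

    return asyncio.run(run())
```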
```python
params = MessageSendParams(
    content="test streaming message",
    role="user"
)
events = []
async for event in handler.on_message_send_stream(params):
    events.append(event)
    print(f"✓ Received streaming event: {event.content}")
print(f"✓ on_message_send_stream works; received {len(events)} events")
```
💡 Verification agent
🧩 Analysis chain
Align streaming test with event types.
Streaming may yield TaskStatusUpdateEvent, TaskArtifactUpdateEvent, Message, or Task. Avoid assuming a .content attribute (not present in the spec types).
```diff
 async for event in handler.on_message_send_stream(params):
     events.append(event)
-    print(f"✓ Received streaming event: {event.content}")
+    print(f"✓ Received streaming event: {type(event).__name__}")
```

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Search for the implementation of on_message_send_stream to inspect yielded event types
rg -n "async def on_message_send_stream" -A10
```

Length of output: 8845

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Locate test_handler.py in the repository
fd test_handler.py
```

Length of output: 56

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Display the first 200 lines of test_handler.py for context
sed -n '1,200p' test_handler.py
```

Length of output: 1584

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Display the first 200 lines of the A2A types module to inspect Message and related classes
sed -n '1,200p' src/a2a/types.py
```

Length of output: 5278

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Inspect definitions of Event-related classes to check for a .content attribute
rg -n "class Message" -A20 src/a2a/types.py
rg -n "class Task" -A20 src/a2a/types.py
rg -n "class TaskStatusUpdateEvent" -A20 src/a2a/types.py
rg -n "class TaskArtifactUpdateEvent" -A20 src/a2a/types.py
rg -n "class Event" -A20 src/a2a/types.py
```

Length of output: 10691
Align streaming test with spec event types
Streaming handlers yield TaskStatusUpdateEvent, TaskArtifactUpdateEvent, Message, or Task—none of which define a .content attribute. Update your test to avoid assuming event.content and instead log the actual event type:
File: test_handler.py (lines 41–49)
```diff
 async for event in handler.on_message_send_stream(params):
     events.append(event)
-    print(f"✓ Received streaming event: {event.content}")
+    print(f"✓ Received streaming event: {type(event).__name__}")
```
+ print(f"✓ 收到流式事件: {type(event).__name__}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| params = MessageSendParams( | |
| content="测试流式消息", | |
| role="user" | |
| ) | |
| events = [] | |
| async for event in handler.on_message_send_stream(params): | |
| events.append(event) | |
| print(f"✓ 收到流式事件: {event.content}") | |
| print(f"✓ on_message_send_stream 正常工作,收到 {len(events)} 个事件") | |
| params = MessageSendParams( | |
| content="测试流式消息", | |
| role="user" | |
| ) | |
| events = [] | |
| async for event in handler.on_message_send_stream(params): | |
| events.append(event) | |
| print(f"✓ 收到流式事件: {type(event).__name__}") | |
| print(f"✓ on_message_send_stream 正常工作,收到 {len(events)} 个事件") |
🤖 Prompt for AI Agents
In test_handler.py around lines 41 to 49, the test incorrectly assumes that the
streamed events have a .content attribute, but the events yielded are of types
TaskStatusUpdateEvent, TaskArtifactUpdateEvent, Message, or Task, which do not
have .content. Update the test to avoid accessing event.content and instead log
or print the type of each event received, for example by using type(event) or
event.__class__.__name__, to align with the expected event types in the
streaming handler.
```python
sys.path.append('src')

from a2a.client.transports.kafka import KafkaClientTransport
```
🛠️ Refactor suggestion
Use package entrypoint to handle optional Kafka deps.
Directly importing a2a.client.transports.kafka will fail if Kafka deps aren’t installed. Import via the transports package and handle the None case to keep the test resilient.
```diff
-from a2a.client.transports.kafka import KafkaClientTransport
+from a2a.client.transports import KafkaClientTransport
+
+if KafkaClientTransport is None:
+    print("Kafka dependencies not installed; skipping Kafka client instantiation test.")
+    sys.exit(0)
```
🤖 Prompt for AI Agents
In test_simple_kafka.py around lines 5 to 7, avoid directly importing
a2a.client.transports.kafka because it fails if Kafka dependencies are missing.
Instead, import KafkaClientTransport through the a2a.client.transports package
entrypoint and add logic to handle the case when KafkaClientTransport is None,
ensuring the test remains resilient without Kafka dependencies installed.
```python
# Create the Kafka client transport
transport = KafkaClientTransport(
    agent_card=agent_card,
    bootstrap_servers="localhost:9092",
    request_topic="a2a-requests"
)
```
💡 Verification agent
🧩 Analysis chain
Optionally validate the transport’s lifecycle.
If you want a stronger smoke test, call await transport.start() then await transport.stop() in a try/finally and guard it behind an env flag to avoid requiring a running Kafka broker during CI.
If you want to verify Kafka presence locally before starting, run:
🏁 Script executed:
#!/bin/bash
# basic broker reachability check; adjust host/port as needed
timeout 2 bash -c 'echo > /dev/tcp/localhost/9092' && echo "Kafka reachable" || echo "Kafka not reachable"Length of output: 199
Add an optional Kafka lifecycle smoke test with a portable reachability check
To strengthen the smoke test without mandating a live broker in CI, wrap the transport’s start/stop in a try/finally behind an env flag, and use a Python socket check (more portable than timeout) to skip when Kafka isn’t reachable:
• File: test_simple_kafka.py (around lines 35–41)
• Introduce an env guard (e.g. TEST_KAFKA)
• Preflight broker reachability via socket.create_connection and pytest.skip() if unreachable
• Invoke await transport.start() / await transport.stop() in try/finally
Proposed diff:
```diff
 transport = KafkaClientTransport(
     agent_card=agent_card,
     bootstrap_servers="localhost:9092",
     request_topic="a2a-requests"
 )
+
+if os.getenv("TEST_KAFKA"):
+    try:
+        import socket
+        conn = socket.create_connection(("localhost", 9092), timeout=2)
+        conn.close()
+    except Exception:
+        pytest.skip("Kafka broker not reachable")
+
+    try:
+        await transport.start()
+    finally:
+        await transport.stop()
```
🤖 Prompt for AI Agents
In test_simple_kafka.py around lines 35 to 41, enhance the Kafka client smoke
test by adding an environment variable guard (e.g., TEST_KAFKA) to conditionally
run the test. Before starting the transport, perform a preflight check using
socket.create_connection to verify Kafka broker reachability; if unreachable,
call pytest.skip() to skip the test. Wrap the transport start and stop calls in
a try/finally block to ensure proper cleanup regardless of test outcome.
```python
message_params = MessageSendParams(
    content="test message",
    role="user"
)
print(f"Message params created successfully: {message_params.content}")
```
🛠️ Refactor suggestion
Fix MessageSendParams construction — must pass a Message, not raw content/role.
Per a2a.types, MessageSendParams requires a message: Message containing parts (e.g., a TextPart). Construct a valid message:
```diff
-message_params = MessageSendParams(
-    content="test message",
-    role="user"
-)
-print(f"Message params created successfully: {message_params.content}")
+import uuid
+from a2a.types import Message, TextPart, Part, Role
+message_params = MessageSendParams(
+    message=Message(
+        message_id=str(uuid.uuid4()),
+        role=Role.user,
+        parts=[Part(TextPart(text="test message"))],
+    )
+)
+print(f"Message params created successfully: {message_params.message.parts[0].root.text}")
```
🤖 Prompt for AI Agents
In test_simple_kafka.py around lines 47 to 51, the MessageSendParams is
incorrectly constructed by passing raw content and role directly. Instead,
create a Message object containing the appropriate parts such as a TextPart with
the content, then pass this Message instance to MessageSendParams. This ensures
the parameters conform to the expected structure defined in a2a.types.
```python
with patch.object(kafka_transport.correlation_manager, 'generate_correlation_id') as mock_gen_id, \
     patch.object(kafka_transport.correlation_manager, 'register') as mock_register:

    mock_gen_id.return_value = "test-correlation-id"

    # Create a future that resolves to a response
    response = Message(content="test response", role="assistant")
    future = asyncio.Future()
    future.set_result(response)
    mock_register.return_value = future
```
Patched register is not awaitable – test will error.
send_message awaits correlation_manager.register, but here register is replaced with a normal MagicMock, so await raises TypeError.
Patch with an AsyncMock:
```diff
-    patch.object(kafka_transport.correlation_manager, 'register') as mock_register:
+    patch.object(kafka_transport.correlation_manager, 'register', new_callable=AsyncMock) as mock_register:
```
🤖 Prompt for AI Agents
In tests/client/transports/test_kafka.py around lines 166 to 176, the patch for
correlation_manager.register uses a normal MagicMock, but since send_message
awaits this method, it must be an AsyncMock to avoid a TypeError. Replace the
patch for correlation_manager.register with an AsyncMock to ensure it is
awaitable and returns the expected future result.
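The MagicMock-vs-AsyncMock distinction can be shown without any Kafka machinery; the `Manager.register` below is a trivial stand-in for the real method:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class Manager:
    """Stand-in for the correlation manager; illustrative only."""

    async def register(self, correlation_id: str):
        raise NotImplementedError


async def call_register(manager: Manager):
    # Mirrors send_message, which awaits correlation_manager.register(...)
    return await manager.register("test-correlation-id")


def demo() -> tuple:
    good = Manager()
    good.register = AsyncMock(return_value="future")  # awaitable mock
    ok = asyncio.run(call_register(good))

    bad = Manager()
    bad.register = MagicMock(return_value="future")   # NOT awaitable
    try:
        asyncio.run(call_register(bad))
        raised = False
    except TypeError:  # awaiting the plain return value fails
        raised = True
    return ok, raised
```

Running `demo()` shows the AsyncMock call succeeding while the MagicMock version raises TypeError, which is exactly the failure the review predicts for the patched test.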
Actionable comments posted: 6
♻️ Duplicate comments (4)
src/a2a/client/transports/kafka.py (2)
339-340: Repeated context timeout access issue
The same `context.timeout` attribute access issue appears in multiple methods throughout the file.
Also applies to: 374-375, 400-401, 426-427, 452-453, 478-479

304-307: Race condition warning from past review still valid
The past review comment about the race condition where a reply may arrive before `register()` is still applicable. The current implementation sends the request first, then registers the correlation ID, which could cause responses to be dropped if they arrive quickly.

src/a2a/server/request_handlers/kafka_handler.py (2)

120-125: ServerCallContext doesn't support agent_id and trace_id fields
The `ServerCallContext` model only has `state`, `user`, `requested_extensions`, and `activated_extensions` fields. Passing `agent_id` and `trace_id` directly to the constructor will cause a `ValidationError`.

166-196: Missing handler for the task_push_notification_config_set method
The client transport has a `set_task_callback` method that would send a `task_push_notification_config_set` request, but this method is not handled in the server.
🧹 Nitpick comments (1)
src/a2a/server/apps/kafka/kafka_app.py (1)
264-271: Async method incorrectly defined
The `get_handler` method is defined as `async` but doesn't perform any asynchronous operations and simply returns a value or raises an exception. Remove the `async` keyword since this method doesn't need to be asynchronous:

```diff
-    async def get_handler(self) -> KafkaHandler:
+    def get_handler(self) -> KafkaHandler:
         """Get the Kafka handler instance.

         This can be used to send push notifications.
         """
         if not self.handler:
             raise ServerError("Kafka handler not initialized")
         return self.handler
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- src/a2a/client/client_factory.py (2 hunks)
- src/a2a/client/transports/kafka.py (1 hunks)
- src/a2a/server/apps/kafka/__init__.py (1 hunks)
- src/a2a/server/apps/kafka/kafka_app.py (1 hunks)
- src/a2a/server/request_handlers/kafka_handler.py (1 hunks)
- tests/client/test_kafka_client.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/a2a/server/apps/kafka/__init__.py
- src/a2a/client/client_factory.py
🧰 Additional context used
🧬 Code Graph Analysis (4)
tests/client/test_kafka_client.py (4)
- src/a2a/client/transports/kafka.py (8): KafkaClientTransport (36-602), start (106-151), stop (153-181), send_message (296-322), _parse_response (238-249), send_message_streaming (324-360), _sanitize_topic_name (79-104), create (563-602)
- src/a2a/client/transports/kafka_correlation.py (8): CorrelationManager (47-136), generate_correlation_id (55-57), get_pending_count (134-136), complete_with_exception (105-120), cancel_all (122-132), StreamingFuture (10-44), put (18-21), set_done (34-36)
- src/a2a/client/errors.py (1): A2AClientError (6-7)
- src/a2a/types.py (3): MessageSendParams (1481-1498), TextPart (1005-1021), TransportProtocol (1024-1032)

src/a2a/server/request_handlers/kafka_handler.py (6)
- src/a2a/server/context.py (1): ServerCallContext (14-25)
- src/a2a/server/request_handlers/request_handler.py (1): RequestHandler (21-200)
- src/a2a/types.py (12): AgentCard (1724-1834), DeleteTaskPushNotificationConfigParams (291-307), GetTaskPushNotificationConfigParams (428-444), ListTaskPushNotificationConfigParams (673-685), Message (1437-1478), MessageSendParams (1481-1498), Task (1856-1888), TaskArtifactUpdateEvent (1604-1637), TaskIdParams (877-889), TaskPushNotificationConfig (932-944), TaskQueryParams (947-963), TaskStatusUpdateEvent (1661-1690)
- src/a2a/utils/errors.py (1): ServerError (39-71)
- src/a2a/server/apps/kafka/kafka_app.py (3): send_response (189-213), send_error_response (239-262), send_stream_complete (215-237)
- src/a2a/server/request_handlers/__init__.py (1): KafkaHandler (45-49)

src/a2a/client/transports/kafka.py (5)
- src/a2a/client/middleware.py (1): ClientCallContext (14-21)
- src/a2a/client/transports/base.py (1): ClientTransport (19-103)
- src/a2a/client/transports/kafka_correlation.py (10): CorrelationManager (47-136), cancel_all (122-132), get (23-27), complete_streaming (96-103), complete_with_exception (105-120), complete (75-94), generate_correlation_id (55-57), register (59-66), register_streaming (68-73), is_done (38-40)
- src/a2a/client/errors.py (1): A2AClientError (6-7)
- src/a2a/types.py (12): AgentCard (1724-1834), DeleteTaskPushNotificationConfigParams (291-307), GetTaskPushNotificationConfigParams (428-444), ListTaskPushNotificationConfigParams (673-685), Message (1437-1478), MessageSendParams (1481-1498), Task (1856-1888), TaskArtifactUpdateEvent (1604-1637), TaskIdParams (877-889), TaskPushNotificationConfig (932-944), TaskQueryParams (947-963), TaskStatusUpdateEvent (1661-1690)

src/a2a/server/apps/kafka/kafka_app.py (4)
- src/a2a/server/request_handlers/kafka_handler.py (6): KafkaHandler (67-241), KafkaMessage (27-39), handle_request (88-151), send_response (45-51), send_stream_complete (60-64), send_error_response (53-58)
- src/a2a/server/request_handlers/request_handler.py (1): RequestHandler (21-200)
- src/a2a/utils/errors.py (1): ServerError (39-71)
- src/a2a/client/transports/kafka.py (2): start (106-151), stop (153-181)
```python
if not self._running:
    return
```
💡 Verification agent
🧩 Analysis chain
Method name inconsistency with naming convention
The methods _start() and _stop() are referenced in tests but don't exist - the actual methods are start() and stop(). The test file incorrectly references these as internal methods with underscore prefixes.
🏁 Script executed:
#!/bin/bash
# Verify the method names being called in the test file
# Check if _start or _stop methods are defined
ast-grep --pattern 'async def _start'
ast-grep --pattern 'async def _stop'
# Check test references
rg -n "kafka_transport\._start\(\)" tests/
rg -n "kafka_transport\._stop\(\)" tests/
rg -n "kafka_transport\.start\(\)" tests/
rg -n "kafka_transport\.stop\(\)" tests/Length of output: 559
Fix incorrect method references in Kafka transport tests
The tests are calling _start() and _stop(), but those methods don’t exist on the KafkaTransport class—the correct public methods are start() and stop(). This will cause test failures and needs to be corrected immediately.
• In tests/client/test_kafka_client.py at line 159, replace

```diff
- await kafka_transport._start()
+ await kafka_transport.start()
```

• At line 171, replace

```diff
- await kafka_transport._stop()
+ await kafka_transport.stop()
```
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py around lines 159-160 (and also update the
call at line 171), tests are calling non-existent private methods `_start()` and
`_stop()` on KafkaTransport; change those test invocations to call the public
methods `start()` and `stop()` instead so the tests use the correct API and no
longer fail due to missing method references.
```python
if context.trace_id:
    headers.append(('trace_id', context.trace_id.encode('utf-8')))
```
Missing context attribute access in ClientCallContext
The code accesses context.trace_id which doesn't exist in the ClientCallContext model. The ClientCallContext only has a state field for storing arbitrary data.
Apply this diff to fix the attribute access:
```diff
-if context:
-    # Add context headers if needed
-    if context.trace_id:
-        headers.append(('trace_id', context.trace_id.encode('utf-8')))
+if context:
+    # Add context headers if needed
+    if context.state.get('trace_id'):
+        headers.append(('trace_id', context.state['trace_id'].encode('utf-8')))
```
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py around lines 283-284, the code incorrectly
accesses context.trace_id (which doesn't exist on ClientCallContext); instead
read the trace id from context.state (e.g., trace_id =
context.state.get('trace_id')), check it's truthy, and if so append it to
headers as before (headers.append(('trace_id', trace_id.encode('utf-8')))).
Ensure you handle None safely and avoid raising if state is missing or not a
dict.
```python
    # Wait for response with timeout
    timeout = 30.0  # Default timeout
    if context and context.timeout:
        timeout = context.timeout

    result = await asyncio.wait_for(future, timeout=timeout)
    return result
except asyncio.TimeoutError:
    await self.correlation_manager.complete_with_exception(
        correlation_id,
        A2AClientError(f"Request timed out after {timeout} seconds")
    )
    raise A2AClientError(f"Request timed out after {timeout} seconds")
```
🛠️ Refactor suggestion
Potential race condition with timeout handling
The code calls complete_with_exception after a timeout even though the future has already timed out, which could interfere with other operations or lead to exceptions if the correlation_id was already removed.
Consider checking if the correlation is still pending before calling complete_with_exception:
```diff
 try:
     # Wait for response with timeout
     timeout = 30.0  # Default timeout
     if context and context.timeout:
         timeout = context.timeout

     result = await asyncio.wait_for(future, timeout=timeout)
     return result
 except asyncio.TimeoutError:
-    await self.correlation_manager.complete_with_exception(
-        correlation_id,
-        A2AClientError(f"Request timed out after {timeout} seconds")
-    )
     raise A2AClientError(f"Request timed out after {timeout} seconds")
```

The timeout already causes the future to fail, so attempting to complete it again is redundant.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py around lines 310 to 322, the timeout
exception handler unconditionally calls
correlation_manager.complete_with_exception after asyncio.wait_for times out,
which can race with the future's own completion and with removal of the
correlation entry; change the handler to first check whether the correlation is
still present/pending (e.g., use a correlation_manager.has(correlation_id) or
make complete_with_exception return a boolean) and only call
complete_with_exception if the correlation is still registered, otherwise skip
calling it; then raise the A2AClientError as before — this avoids
redundant/unsafe completion attempts and the race condition.
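The guard described above can be sketched with a minimal correlation map; the class and method names here are assumptions, not the PR's exact API:

```python
import asyncio


class CorrelationManager:
    """Minimal sketch of a correlation map that guards against double completion."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self, correlation_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return future

    async def complete_with_exception(self, correlation_id: str, exc: Exception) -> bool:
        # Pop first, and only set the exception if the future is still pending;
        # this turns a post-timeout completion into a harmless no-op.
        future = self._pending.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_exception(exc)
        return True


async def main() -> bool:
    mgr = CorrelationManager()
    future = mgr.register('req-1')
    try:
        await asyncio.wait_for(future, timeout=0.01)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the future, so the guarded call
        # reports False instead of raising InvalidStateError.
        return await mgr.complete_with_exception('req-1', RuntimeError('timed out'))
    return True


print(asyncio.run(main()))
```

Having `complete_with_exception` return a boolean, as suggested above, lets callers detect (and ignore) completions that lost the race.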
```python
if context and context.timeout:
    timeout = context.timeout
```
Context timeout attribute doesn't exist
Similar to the trace_id issue, context.timeout is not a defined attribute of ClientCallContext. The model only has a state field.
Apply this diff to properly access timeout from the context state:

```diff
-        if context and context.timeout:
-            timeout = context.timeout
+        if context and context.state.get('timeout'):
+            timeout = context.state['timeout']
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py around lines 312-313, replace the
incorrect access of context.timeout (which doesn't exist) by retrieving timeout
from context.state; for example, read timeout = getattr(context.state,
"timeout", None) (or context.state.get("timeout") if state is a dict) and use
that value only if truthy.
```python
task_request = TaskQueryParams(task_id=request.task_id)
task = await self.get_task(task_request, context=context)
```
Incorrect attribute access in TaskQueryParams
The TaskQueryParams model expects an id field, not task_id.
Apply this diff to fix the field name:

```diff
-    task_request = TaskQueryParams(task_id=request.task_id)
+    task_request = TaskQueryParams(id=request.id)
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/a2a/client/transports/kafka.py around lines 519 to 520, the code
constructs TaskQueryParams with a non-existent task_id attribute; change the
construction to use the expected id field (i.e., instantiate
TaskQueryParams(id=request.task_id)) so the correct model field is used when
querying the task.
```python
mock_consumer_class.return_value = mock_consumer

# Start transport using internal method
await kafka_transport._start()
```
Test references non-existent internal methods
Tests are calling kafka_transport._start() and kafka_transport._stop() but these methods don't exist. The actual public methods are start() and stop().
Apply this diff to fix the method names:

```diff
-    await kafka_transport._start()
+    await kafka_transport.start()
```

```diff
-    await kafka_transport._stop()
+    await kafka_transport.stop()
```

```diff
-    with patch.object(kafka_transport, '_start') as mock_start, \
-         patch.object(kafka_transport, '_stop') as mock_stop:
+    with patch.object(kafka_transport, 'start') as mock_start, \
+         patch.object(kafka_transport, 'stop') as mock_stop:
```

Also applies to: 171-171, 282-283, 444-445, 447-448
🤖 Prompt for AI Agents
In tests/client/test_kafka_client.py around lines 159, 171, 282-283, 444-445,
and 447-448 the test calls non-existent internal methods
kafka_transport._start() and kafka_transport._stop(); update each call to use
the public API kafka_transport.start() and kafka_transport.stop() respectively
so tests call the correct public methods, keeping any await calls in place and
ensuring no other references to the underscore-prefixed methods remain.
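Patching the public lifecycle methods might look like this in a test; `KafkaClientTransport` here is a bare stand-in rather than the class under review:

```python
import asyncio
from unittest.mock import AsyncMock, patch


class KafkaClientTransport:
    """Stand-in for the transport under test; only the public lifecycle matters."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...


async def main() -> tuple[int, int]:
    transport = KafkaClientTransport()
    # Patch the public methods by name, as the review suggests.
    with patch.object(transport, 'start', new=AsyncMock()) as mock_start, \
         patch.object(transport, 'stop', new=AsyncMock()) as mock_stop:
        await transport.start()
        await transport.stop()
    return mock_start.await_count, mock_stop.await_count


print(asyncio.run(main()))
```

Using `new=AsyncMock()` makes the patched methods awaitable, which plain `patch.object` replacement with a `MagicMock` would not guarantee.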
finish review
**A2A on Kafka.md** (Outdated)

```
@@ -0,0 +1,526 @@
# A2A on Kafka

## 1. High-Level Design
```
The design document doesn't need to be committed.
**docker-compose.kafka.yml** (Outdated)

```
@@ -0,0 +1,85 @@
version: '3.8'
```
This file doesn't need to be committed.
**examples/kafka_example.py** (Outdated)

```
@@ -0,0 +1,142 @@
"""Example demonstrating how to use the A2A Kafka transport."""
```
Examples don't need to be committed here; there is a dedicated a2a-samples repository.
```python
    context: ClientCallContext | None = None,
) -> TaskPushNotificationConfig:
    """Set task push notification configuration."""
    # For Kafka, we can store the callback configuration locally
```
Not implemented.
```python
    context: ClientCallContext | None = None,
) -> AgentCard:
    """Retrieve the agent card."""
    # For Kafka transport, we return the local agent card
```
Not implemented.
```
@@ -0,0 +1,233 @@
"""Kafka server application for A2A protocol."""
```
Consider creating a new directory, message_queue, and adding a kafka_app module under it.
```python
            await self.stop()
            raise A2AClientError(f"Failed to start Kafka client transport: {e}") from e

    async def stop(self) -> None:
```
Move the stop logic into the close method; close is a method already defined on the base transport.
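Folding the teardown into `close()` could look roughly like this; `ClientTransport` and its `close()` signature are assumed stand-ins for the SDK's base transport:

```python
import asyncio


class ClientTransport:
    """Stand-in for the base transport, which already defines close()."""

    async def close(self) -> None:
        raise NotImplementedError


class KafkaClientTransport(ClientTransport):
    def __init__(self) -> None:
        self._running = False

    async def start(self) -> None:
        self._running = True

    async def close(self) -> None:
        # Teardown that previously lived in stop(): stop consumers/producers here.
        self._running = False


async def main() -> bool:
    transport = KafkaClientTransport()
    await transport.start()
    await transport.close()
    return transport._running


print(asyncio.run(main()))
```

Callers that already iterate over transports and call `close()` then work unchanged for Kafka, with no extra `stop()` in the public surface.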
```python
logger = logging.getLogger(__name__)


class KafkaMessage:
```
The Kafka handler should only be responsible for translating and forwarding protocol methods, with no knowledge of Kafka internals.
That is, it converts Kafka messages into the types DefaultRequestHandler accepts, then converts DefaultRequestHandler's return values back into Kafka messages.
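That separation can be sketched as a thin adapter that only translates payloads; all names here (`KafkaHandlerAdapter`, `EchoRequestHandler`) are illustrative, not the PR's API:

```python
import asyncio
import json


class EchoRequestHandler:
    """Stand-in for DefaultRequestHandler."""

    async def on_message_send(self, params: dict) -> dict:
        return {'echo': params['text']}


class KafkaHandlerAdapter:
    """Transport-agnostic: bytes in, bytes out; no broker, topic, or producer here."""

    def __init__(self, handler: EchoRequestHandler) -> None:
        self.handler = handler

    async def handle(self, raw: bytes) -> bytes:
        params = json.loads(raw)                   # Kafka payload -> handler types
        result = await self.handler.on_message_send(params)
        return json.dumps(result).encode('utf-8')  # handler result -> Kafka payload


async def main() -> str:
    adapter = KafkaHandlerAdapter(EchoRequestHandler())
    out = await adapter.handle(b'{"text": "hi"}')
    return out.decode()


print(asyncio.run(main()))
```

Because the adapter never touches Kafka objects, it can be unit-tested with plain bytes and reused by any broker-owning app.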
```python
self.request_handler = request_handler
self.bootstrap_servers = bootstrap_servers
self.kafka_config = kafka_config
self.producer: Optional[AIOKafkaProducer] = None
```
It should not manage the producer, nor be aware of the Kafka address.
All of that should happen in the Kafka app.
```python
logger = logging.getLogger(__name__)


class KafkaServerApp:
```
The Kafka app manages the Kafka producer and consumer, and the production and consumption of messages.
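A runnable sketch of that split, with `asyncio.Queue` standing in for the Kafka topics so the example needs no broker; all names are illustrative:

```python
import asyncio
import json


class EchoHandler:
    """Stand-in for DefaultRequestHandler; knows nothing about Kafka."""

    async def on_message_send(self, params: dict) -> dict:
        return {'echo': params['text']}


class KafkaServerApp:
    """Owns the consume/produce loop and reply routing; the handler stays unaware."""

    def __init__(self, handler, request_q: asyncio.Queue, reply_q: asyncio.Queue) -> None:
        self.handler = handler
        self.request_q = request_q  # stands in for the request-topic consumer
        self.reply_q = reply_q      # stands in for the reply-topic producer

    async def serve_one(self) -> None:
        correlation_id, raw = await self.request_q.get()
        result = await self.handler.on_message_send(json.loads(raw))
        # The app, not the handler, attaches the correlation id and publishes.
        await self.reply_q.put((correlation_id, json.dumps(result).encode('utf-8')))


async def main() -> tuple[str, str]:
    request_q, reply_q = asyncio.Queue(), asyncio.Queue()
    app = KafkaServerApp(EchoHandler(), request_q, reply_q)
    await request_q.put(('corr-1', b'{"text": "ping"}'))
    await app.serve_one()
    cid, payload = await reply_q.get()
    return cid, payload.decode()


print(asyncio.run(main()))
```

Swapping the queues for `AIOKafkaConsumer`/`AIOKafkaProducer` changes only the app; the handler and adapter layers stay untouched.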
Description
Thank you for opening a Pull Request!
Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- Follow the CONTRIBUTING Guide.
- Use a conventional commit prefix for the PR title:
  - `fix:`, which represents bug fixes, and correlates to a SemVer patch.
  - `feat:`, which represents a new feature, and correlates to a SemVer minor.
  - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major.
- Ensure the code is formatted (run `bash scripts/format.sh` from the repository root to format).

Fixes #<issue_number_goes_here> 🦕
Summary by CodeRabbit
New Features
Documentation
Tests