[Agent Engine][ADK] support passing RunConfig to async_stream_query and stream_query on AdkApp · Issue #5351 · googleapis/python-aiplatform · GitHub

@soundTricker

Description

Is your feature request related to a problem? Please describe.

Currently, AdkApp.stream_query and AdkApp.async_stream_query cannot pass a RunConfig to the runner.run_async method.
This config includes many important settings, such as streaming_mode and max_llm_calls.
https://google.github.io/adk-docs/runtime/runconfig/#streaming_mode

Describe the solution you'd like
I would like to pass the config from the client side, for example adk_app.stream_query(user_id=user_id, session_id=session_id, run_config=RunConfig(streaming_mode=StreamingMode.SSE)), or by passing a dumped value such as RunConfig(streaming_mode=StreamingMode.SSE).model_dump().
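
To illustrate the second option (passing a dumped value), here is a rough sketch of how a plain dict could be sent from the client and turned back into a config object on the receiving side. It uses only the standard library: `RunConfigSketch`, `StreamingModeSketch`, and `coerce_run_config` are hypothetical stand-ins, not ADK or Agent Engine APIs, and the enum values and default shown are illustrative. The real RunConfig is a pydantic model with more fields.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class StreamingModeSketch(Enum):
    """Hypothetical stand-in for ADK's StreamingMode; values are illustrative."""

    NONE = "none"
    SSE = "sse"
    BIDI = "bidi"


@dataclass
class RunConfigSketch:
    """Hypothetical stand-in for ADK's RunConfig with only two fields."""

    streaming_mode: StreamingModeSketch = StreamingModeSketch.NONE
    max_llm_calls: int = 500  # illustrative default

    def dump(self) -> dict[str, Any]:
        # Produce a JSON-friendly dict, similar in spirit to model_dump().
        return {
            "streaming_mode": self.streaming_mode.value,
            "max_llm_calls": self.max_llm_calls,
        }


def coerce_run_config(value: Any) -> Optional[RunConfigSketch]:
    """Accept None, a config object, or a dumped dict and return a config."""
    if value is None or isinstance(value, RunConfigSketch):
        return value
    data = dict(value)
    if "streaming_mode" in data:
        # Map the serialized string back to the enum member.
        data["streaming_mode"] = StreamingModeSketch(data["streaming_mode"])
    return RunConfigSketch(**data)


# Client side: dump to a plain dict. Server side: rebuild the config.
payload = RunConfigSketch(streaming_mode=StreamingModeSketch.SSE).dump()
restored = coerce_run_config(payload)
```

Accepting both the object and the dumped dict would let local calls pass a RunConfig directly while remote calls send something JSON-serializable.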

Additional context

The current workaround is to define a custom ADK app, as shown below.

from typing import Any, AsyncIterable

# Import paths below are assumed from the google-adk and vertexai packages.
from google.adk.agents.run_config import RunConfig, StreamingMode
from vertexai.preview import reasoning_engines


class CustomAdkApp(reasoning_engines.AdkApp):
    def clone(self):
        """Returns a clone of the ADK application."""
        import copy

        return CustomAdkApp(
            agent=copy.deepcopy(self._tmpl_attrs.get("agent")),
            enable_tracing=self._tmpl_attrs.get("enable_tracing"),
            session_service_builder=self._tmpl_attrs.get("session_service_builder"),
            artifact_service_builder=self._tmpl_attrs.get("artifact_service_builder"),
        )

    def stream_query_sse(
        self,
        *,
        message: str | dict[str, Any],
        user_id: str,
        session_id: str | None = None,
        **kwargs,
    ):
        """Streams a query with the SSE streaming mode set through RunConfig."""
        return self.stream_query(
            message=message,
            user_id=user_id,
            session_id=session_id,
            run_config=RunConfig(streaming_mode=StreamingMode.SSE),
            **kwargs,
        )

    async def async_stream_query_sse(
        self,
        *,
        message: str | dict[str, Any],
        user_id: str,
        session_id: str | None = None,
        **kwargs,
    ) -> AsyncIterable[dict[str, Any]]:
        """Async variant of stream_query_sse."""
        # async_stream_query is assumed to be an async generator, so its events
        # are re-yielded instead of returning the generator from a coroutine.
        async for event in self.async_stream_query(
            message=message,
            user_id=user_id,
            session_id=session_id,
            run_config=RunConfig(streaming_mode=StreamingMode.SSE),
            **kwargs,
        ):
            yield event

    def register_operations(self) -> dict[str, list[str]]:
        """Registers the operations of the ADK application."""
        return {
            # Default (non-streaming) operations.
            "": [
                "get_session",
                "list_sessions",
                "create_session",
                "delete_session",
            ],
            "stream": ["stream_query_sse", "stream_query", "streaming_agent_run_with_events"],
            "async_stream": ["async_stream_query_sse", "async_stream_query"],
        }
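
A note on async wrappers like async_stream_query_sse: assuming the wrapped method is an async generator, a wrapper declared with `async def` needs to re-yield the events. If it uses `return` instead, callers get a coroutine that cannot be iterated with `async for`. A minimal standard-library sketch of that delegation pattern follows; `base_stream`, `wrapped_stream`, and `collect` are hypothetical stand-ins, not AdkApp methods.

```python
import asyncio
from typing import Any, AsyncIterator


async def base_stream(message: str, **kwargs: Any) -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in for an async-generator method such as async_stream_query."""
    for index, word in enumerate(message.split()):
        # Echo the run_config back so the caller can see it was forwarded.
        yield {"index": index, "text": word, "run_config": kwargs.get("run_config")}


async def wrapped_stream(message: str) -> AsyncIterator[dict[str, Any]]:
    """Wrapper that pins one option and re-yields every event."""
    async for event in base_stream(message, run_config={"streaming_mode": "sse"}):
        yield event


async def collect(message: str) -> list[dict[str, Any]]:
    """Drain the wrapper into a list, as a caller using `async for` would."""
    return [event async for event in wrapped_stream(message)]


if __name__ == "__main__":
    print(asyncio.run(collect("hello world")))
```

Because `wrapped_stream` contains a `yield`, calling it returns an async generator directly, so it can be consumed the same way as the method it wraps.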

Labels: api: vertex-ai (issues related to the googleapis/python-aiplatform API)