8000 Provide sample LongRunningFunctionTool runner script and documentation · CentML/adk-python@609b3a5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 609b3a5

Browse files
selcukguncopybara-github
authored andcommitted
Provide sample LongRunningFunctionTool runner script and documentation
PiperOrigin-RevId: 764475345
1 parent 2a8ca06 commit 609b3a5

File tree

3 files changed

+245
-2
lines changed

3 files changed

+245
-2
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Agent with Long-Running Tools
2+
3+
This example demonstrates an agent using a long-running tool (`ask_for_approval`).
4+
5+
## Key Flow for Long-Running Tools
6+
7+
1. **Initial Call**: The agent calls the long-running tool (e.g., `ask_for_approval`).
8+
2. **Initial Tool Response**: The tool immediately returns an initial response, typically indicating a "pending" status and a way to track the request (e.g., a `ticket-id`). This is sent back to the agent as a `types.FunctionResponse` (usually processed internally by the runner and then influencing the agent's next turn).
9+
3. **Agent Acknowledges**: The agent processes this initial response and usually informs the user about the pending status.
10+
4. **External Process/Update**: The long-running task progresses externally (e.g., a human approves the request).
11+
5. **❗️Crucial Step: Provide Updated Tool Response❗️**:
12+
* Once the external process completes or updates, your application **must** construct a new `types.FunctionResponse`.
13+
* This response should use the **same `id` and `name`** as the original `FunctionCall` to the long-running tool.
14+
* The `response` field within this `types.FunctionResponse` should contain the *updated data* (e.g., `{'status': 'approved', ...}`).
15+
* Send this `types.FunctionResponse` back to the agent as a part within a new message using `role="user"`.
16+
17+
```python
18+
# Example: After external approval
19+
updated_tool_output_data = {
20+
"status": "approved",
21+
"ticket-id": ticket_id, # from original call
22+
# ... other relevant updated data
23+
}
24+
25+
updated_function_response_part = types.Part(
26+
function_response=types.FunctionResponse(
27+
id=long_running_function_call.id, # Original call ID
28+
name=long_running_function_call.name, # Original call name
29+
response=updated_tool_output_data,
30+
)
31+
)
32+
33+
# Send this back to the agent
34+
await runner.run_async(
35+
# ... session_id, user_id ...
36+
new_message=types.Content(
37+
parts=[updated_function_response_part], role="user"
38+
),
39+
)
40+
```
41+
6. **Agent Acts on Update**: The agent receives this message containing the `types.FunctionResponse` and, based on its instructions, proceeds with the next steps (e.g., calling another tool like `reimburse`).
42+
43+
**Why is this important?** The agent relies on receiving this subsequent `types.FunctionResponse` (provided in a message with `role="user"` containing the specific `Part`) to understand that the long-running task has concluded or its state has changed. Without it, the agent will remain unaware of the outcome of the pending task.

contributing/samples/human_in_loop/agent.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,20 @@
2222

2323
def reimburse(purpose: str, amount: float) -> str:
2424
"""Reimburse the amount of money to the employee."""
25-
return {'status': 'ok'}
25+
return {
26+
'status': 'ok',
27+
}
2628

2729

2830
def ask_for_approval(
2931
purpose: str, amount: float, tool_context: ToolContext
3032
) -> dict[str, Any]:
3133
"""Ask for approval for the reimbursement."""
32-
return {'status': 'pending'}
34+
return {
35+
'status': 'pending',
36+
'amount': amount,
37+
'ticketId': 'reimbursement-ticket-001',
38+
}
3339

3440

3541
root_agent = Agent(
+194Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
17+
import agent
18+
from dotenv import load_dotenv
19+
from typing import Any
20+
from typing import Union
21+
from google.adk.agents import Agent
22+
from google.adk.events import Event
23+
from google.adk.runners import Runner
24+
from google.adk.tools import LongRunningFunctionTool
25+
from google.adk.sessions import InMemorySessionService
26+
from google.genai import types
27+
28+
import os
29+
from opentelemetry import trace
30+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
31+
from opentelemetry.sdk.trace import export
32+
from opentelemetry.sdk.trace import TracerProvider
33+
34+
35+
load_dotenv(override=True)
36+
37+
APP_NAME = "human_in_the_loop"
38+
USER_ID = "1234"
39+
SESSION_ID = "session1234"
40+
41+
session_service = InMemorySessionService()
42+
43+
44+
async def main():
45+
session = await session_service.create_session(
46+
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
47+
)
48+
runner = Runner(
49+
agent=agent.root_agent,
50+
app_name=APP_NAME,
51+
session_service=session_service,
52+
)
53+
54+
async def call_agent(query: str):
55+
content = types.Content(role="user", parts=[types.Part(text=query)])
56+
57+
print(f'>>> User Query: "{query}"')
58+
print("--- Running agent's initial turn ---")
59+
60+
events_async = runner.run_async(
61+
session_id=session.id, user_id=USER_ID, new_message=content
62+
)
63+
64+
long_running_function_call: Union[types.FunctionCall, None] = None
65+
initial_tool_response: Union[types.FunctionResponse, None] = None
66+
ticket_id: Union[str, None] = None
67+
68+
async for event in events_async:
69+
if event.content and event.content.parts:
70+
for i, part in enumerate(event.content.parts):
71+
if part.text:
72+
print(f" Part {i} [Text]: {part.text.strip()}")
73+
if part.function_call:
74+
print(
75+
f" Part {i} [FunctionCall]:"
76+
f" {part.function_call.name}({part.function_call.args}) ID:"
77+
f" {part.function_call.id}"
78+
)
79+
if not long_running_function_call and part.function_call.id in (
80+
event.long_running_tool_ids or []
81+
):
82+
long_running_function_call = part.function_call
83+
print(
84+
" (Captured as long_running_function_call for"
85+
f" '{part.function_call.name}')"
86+
)
87+
if part.function_response:
88+
print(
89+
f" Part {i} [FunctionResponse]: For"
90+
f" '{part.function_response.name}', ID:"
91+
f" {part.function_response.id}, Response:"
92+
f" {part.function_response.response}"
93+
)
94+
if (
95+
long_running_function_call
96+
and part.function_response.id == long_running_function_call.id
97+
):
98+
initial_tool_response = part.function_response
99+
if initial_tool_response.response:
100+
ticket_id = initial_tool_response.response.get("ticketId")
101+
print(
102+
" (Captured as initial_tool_response for"
103+
f" '{part.function_response.name}', Ticket ID: {ticket_id})"
104+
)
105+
106+
print("--- End of agent's initial turn ---\n")
107+
108+
if (
109+
long_running_function_call
110+
and initial_tool_response
111+
and initial_tool_response.response.get("status") == "pending"
112+
):
113+
print(f"--- Simulating external approval for ticket: {ticket_id} ---\n")
114+
115+
updated_tool_output_data = {
116+
"status": "approved",
117+
"ticketId": ticket_id,
118+
"approver_feedback": "Approved by manager at " + str(
119+
asyncio.get_event_loop().time()
120+
),
121+
}
122+
123+
updated_function_response_part = types.Part(
124+
function_response=types.FunctionResponse(
125+
id=long_running_function_call.id,
126+
name=long_running_function_call.name,
127+
response=updated_tool_output_data,
128+
)
129+
)
130+
131+
print(
132+
"--- Sending updated tool result to agent for call ID"
133+
f" {long_running_function_call.id}: {updated_tool_output_data} ---"
134+
)
135+
print("--- Running agent's turn AFTER receiving updated tool result ---")
136+
137+
async for event in runner.run_async(
138+
session_id=session.id,
139+
user_id=USER_ID,
140+
new_message=types.Content(
141+
parts=[updated_function_response_part], role="user"
142+
),
143+
):
144+
if event.content and event.content.parts:
145+
for i, part in enumerate(event.content.parts):
146+
if part.text:
147+
print(f" Part {i} [Text]: {part.text.strip()}")
148+
if part.function_call:
149+
print(
150+
f" Part {i} [FunctionCall]:"
151+
f" {part.function_call.name}({part.function_call.args}) ID:"
152+
f" {part.function_call.id}"
153+
)
154+
if part.function_response:
155+
print(
156+
f" Part {i} [FunctionResponse]: For"
157+
f" '{part.function_response.name}', ID:"
158+
f" {part.function_response.id}, Response:"
159+
f" {part.function_response.response}"
160+
)
161+
print("--- End of agent's turn AFTER receiving updated tool result ---")
162+
163+
elif long_running_function_call and not initial_tool_response:
164+
print(
165+
f"--- Long running function '{long_running_function_call.name}' was"
166+
" called, but its initial response was not captured. ---"
167+
)
168+
elif not long_running_function_call:
169+
print(
170+
"--- No long running function call was detected in the initial"
171+
" turn. ---"
172+
)
173+
174+
await call_agent("Please reimburse $50 for meals")
175+
print("=" * 70)
176+
await call_agent("Please reimburse $200 for conference travel")
177+
178+
179+
if __name__ == "__main__":
180+
provider = TracerProvider()
181+
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
182+
if not project_id:
183+
raise ValueError("GOOGLE_CLOUD_PROJECT environment variable is not set.")
184+
print("Tracing to project", project_id)
185+
processor = export.BatchSpanProcessor(
186+
CloudTraceSpanExporter(project_id=project_id)
187+
)
188+
provider.add_span_processor(processor)
189+
trace.set_tracer_provider(provider)
190+
191+
asyncio.run(main())
192+
193+
provider.force_flush()
194+
print("Done tracing to project", project_id)

0 commit comments

Comments
 (0)
0