8000 test: unit tests for code_executor_context.py · lbruderer/adk-python@98727b4 · GitHub
[go: up one dir, main page]

Skip to content

Commit 98727b4

Browse files
iamulyacopybara-github
authored andcommitted
test: unit tests for code_executor_context.py
Copybara import of the project: -- 9e51865 by Amulya Bhatia <amulya.bhatia@t-online.de>: test: unit tests for code_executor_context.py COPYBARA_INTEGRATE_REVIEW=google#780 from iamulya:test-code-executor-context 907b171 PiperOrigin-RevId: 761294975
1 parent 76700d7 commit 98727b4

File tree

2 files changed

+288
-0
lines changed

2 files changed

+288
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from google.adk.code_executors.code_execution_utils import File
16+
from google.adk.code_executors.code_executor_context import CodeExecutorContext
17+
from google.adk.sessions.state import State
18+
import pytest
19+
20+
21+
@pytest.fixture
22+
def empty_state() -> State:
23+
"""Fixture for an empty session state."""
24+
return State({}, {})
25+
26+
27+
@pytest.fixture
28+
def context_with_data() -> CodeExecutorContext:
29+
"""Fixture for a CodeExecutorContext with some pre-populated data."""
30+
state_data = {
31+
"_code_execution_context": {
32+
"execution_session_id": "session123",
33+
"processed_input_files": ["file1.csv", "file2.txt"],
34+
},
35+
"_code_executor_input_files": [
36+
{"name": "input1.txt", "content": "YQ==", "mime_type": "text/plain"}
37+
],
38+
"_code_executor_error_counts": {"invocationA": 2},
39+
}
40+
state = State(state_data, {})
41+
return CodeExecutorContext(state)
42+
43+
44+
def test_init_empty_state(empty_state: State):
45+
"""Test initialization with an empty state."""
46+
ctx = CodeExecutorContext(empty_state)
47+
assert ctx._context == {}
48+
assert ctx._session_state is empty_state
49+
50+
51+
def test_get_state_delta_empty(empty_state: State):
52+
"""Test get_state_delta when context is empty."""
53+
ctx = CodeExecutorContext(empty_state)
54+
delta = ctx.get_state_delta()
55+
assert delta == {"_code_execution_context": {}}
56+
57+
58+
def test_get_state_delta_with_data(context_with_data: CodeExecutorContext):
59+
"""Test get_state_delta with existing context data."""
60+
delta = context_with_data.get_state_delta()
61+
expected_context = {
62+
"execution_session_id": "session123",
63+
"processed_input_files": ["file1.csv", "file2.txt"],
64+
}
65+
assert delta == {"_code_execution_context": expected_context}
66+
67+
68+
def test_get_execution_id_exists(context_with_data: CodeExecutorContext):
69+
"""Test getting an existing execution ID."""
70+
assert context_with_data.get_execution_id() == "session123"
71+
72+
73+
def test_get_execution_id_not_exists(empty_state: State):
74+
"""Test getting execution ID when it doesn't exist."""
75+
ctx = CodeExecutorContext(empty_state)
76+
assert ctx.get_execution_id() is None
77+
78+
79+
def test_set_execution_id(empty_state: State):
80+
"""Test setting an execution ID."""
81+
ctx = CodeExecutorContext(empty_state)
82+
ctx.set_execution_id("new_session_id")
83+
assert ctx._context["execution_session_id"] == "new_session_id"
84+
assert ctx.get_execution_id() == "new_session_id"
85+
86+
87+
def test_get_processed_file_names_exists(
88+
context_with_data: CodeExecutorContext,
89+
):
90+
"""Test getting existing processed file names."""
91+
assert context_with_data.get_processed_file_names() == [
92+
"file1.csv",
93+
"file2.txt",
94+
]
95+
96+
97+
def test_get_processed_file_names_not_exists(empty_state: State):
98+
"""Test getting processed file names when none exist."""
99+
ctx = CodeExecutorContext(empty_state)
100+
assert ctx.get_processed_file_names() == []
101+
102+
103+
def test_add_processed_file_names_new(empty_state: State):
104+
"""Test adding processed file names to an empty context."""
105+
ctx = CodeExecutorContext(empty_state)
106+
ctx.add_processed_file_names(["new_file.py"])
107+
assert ctx._context["processed_input_files"] == ["new_file.py"]
108+
109+
110+
def test_add_processed_file_names_append(
111+
context_with_data: CodeExecutorContext,
112+
):
113+
"""Test appending to existing processed file names."""
114+
context_with_data.add_processed_file_names(["another_file.md"])
115+
assert context_with_data.get_processed_file_names() == [
116+
"file1.csv",
117+
"file2.txt",
118+
"another_file.md",
119+
]
120+
121+
122+
def test_get_input_files_exis 10000 ts(context_with_data: CodeExecutorContext):
123+
"""Test getting existing input files."""
124+
files = context_with_data.get_input_files()
125+
assert len(files) == 1
126+
assert files[0].name == "input1.txt"
127+
assert files[0].content == "YQ=="
128+
assert files[0].mime_type == "text/plain"
129+
130+
131+
def test_get_input_files_not_exists(empty_state: State):
132+
"""Test getting input files when none exist."""
133+
ctx = CodeExecutorContext(empty_state)
134+
assert ctx.get_input_files() == []
135+
136+
137+
def test_add_input_files_new(empty_state: State):
138+
"""Test adding input files to an empty session state."""
139+
ctx = CodeExecutorContext(empty_state)
140+
new_files = [File(name="new.dat", content="Yg==", mime_type="application/octet-stream")]
141+
ctx.add_input_files(new_files)
142+
assert empty_state["_code_executor_input_files"] == [
143+
{"name": "new.dat", "content": "Yg==", "mime_type": "application/octet-stream"}
144+
]
145+
146+
147+
def test_add_input_files_append(context_with_data: CodeExecutorContext):
148+
"""Test appending to existing input files."""
149+
new_file = File(name="input2.log", content="Yw==", mime_type="text/x-log")
150+
context_with_data.add_input_files([new_file])
151+
expected_files_data = [
152+
{"name": "input1.txt", "content": "YQ==", "mime_type": "text/plain"},
153+
{"name": "input2.log", "content": "Yw==", "mime_type": "text/x-log"},
154+
]
155+
assert (
156+
context_with_data._session_state["_code_executor_input_files"]
157+
== expected_files_data
158+
)
159+
160+
161+
def test_clear_input_files(context_with_data: CodeExecutorContext):
162+
"""Test clearing input files and processed file names."""
163+
context_with_data.clear_input_files()
164+
assert context_with_data._session_state["_code_executor_input_files"] == []
165+
assert context_with_data._context["processed_input_files"] == []
166+
167+
168+
def test_clear_input_files_when_not_exist(empty_state: State):
169+
"""Test clearing input files when they don't exist initially."""
170+
ctx = CodeExecutorContext(empty_state)
171+
ctx.clear_input_files() # Should not raise error
172+
assert "_code_executor_input_files" not in empty_state # Or assert it's empty
173+
assert "_code_execution_context" not in empty_state or not empty_state[
174+
"_code_execution_context"
175+
].get("processed_input_files")
176+
177+
178+
def test_get_error_count_exists(context_with_data: CodeExecutorContext):
179+
"""Test getting an existing error count."""
180+
assert context_with_data.get_error_count("invocationA") == 2
181+
182+
183+
def test_get_error_count_invocation_not_exists(
184+
context_with_data: CodeExecutorContext,
185+
):
186+
"""Test getting error count for an unknown invocation ID."""
187+
assert context_with_data.get_error_count("invocationB") == 0
188+
189+
190+
def test_get_error_count_no_error_key(empty_state: State):
191+
"""Test getting error count when the error key itself doesn't exist."""
192+
ctx = CodeExecutorContext(empty_state)
193+
assert ctx.get_error_count("any_invocation") == 0
194+
195+
196+
def test_increment_error_count_new_invocation(empty_state: State):
197+
"""Test incrementing error count for a new invocation ID."""
198+
ctx = CodeExecutorContext(empty_state)
199+
ctx.increment_error_count("invocationNew")
200+
assert empty_state["_code_executor_error_counts"]["invocationNew"] == 1
201+
202+
203+
def test_increment_error_count_existing_invocation(
204+
context_with_data: CodeExecutorContext,
205+
):
206+
"""Test incrementing error count for an existing invocation ID."""
207+
context_with_data.increment_error_count("invocationA")
208+
assert (
209+
context_with_data._session_state["_code_executor_error_counts"][
210+
"invocationA"
211+
]
212+
== 3
213+
)
214+
215+
216+
def test_reset_error_count_exists(context_with_data: CodeExecutorContext):
217+
"""Test resetting an existing error count."""
218+
context_with_data.reset_error_count("invocationA")
219+
assert "invocationA" not in (
220+
context_with_data._session_state["_code_executor_error_counts"]
221+
)
222+
223+
224+
def test_reset_error_count_not_exists(context_with_data: CodeExecutorContext):
225+
"""Test resetting an error count that doesn't exist."""
226+
context_with_data.reset_error_count("invocationB") # Should not raise
227+
assert "invocationB" not in (
228+
context_with_data._session_state["_code_executor_error_counts"]
229+
)
230+
231+
232+
def test_reset_error_count_no_error_key(empty_state: State):
233+
"""Test resetting when the error key itself doesn't exist."""
234+
ctx = CodeExecutorContext(empty_state)
235+
ctx.reset_error_count("any_invocation") # Should not raise
236+
assert "_code_executor_error_counts" not in empty_state
237+
238+
239+
def test_update_code_execution_result_new_invocation(empty_state: State):
240+
"""Test updating code execution result for a new invocation."""
241+
ctx = CodeExecutorContext(empty_state)
242+
ctx.update_code_execution_result(
243+
"inv1", "print('hi')", "hi", ""
244+
)
245+
results = empty_state["_code_execution_results"]["inv1"]
246+
assert len(results) == 1
247+
assert results[0]["code"] == "print('hi')"
248+
assert results[0]["result_stdout"] == "hi"
249+
assert results[0]["result_stderr"] == ""
250+
assert "timestamp" in results[0]
251+
252+
253+
def test_update_code_execution_result_append(
254+
context_with_data: CodeExecutorContext,
255+
):
256+
"""Test appending to existing code execution results for an invocation."""
257+
# First, let's add an initial result for a new invocation to the existing state
258+
context_with_data._session_state["_code_execution_results"] = {
259+
"invocationX": [{
260+
"code": "old_code",
261+
"result_stdout": "old_out",
262+
"result_stderr": "old_err",
263+
"timestamp": 123,
264+
}]
265+
}
266+
context_with_data.update_code_execution_result(
267+
"invocationX", "new_code", "new_out", "new_err"
268+
)
269+
results = context_with_data._session_state["_code_execution_results"][
270+
"invocationX"
271+
]
272+
assert len(results) == 2
273+
assert results[1]["code"] == "new_code"
274+
assert results[1]["result_stdout"] == "new_out"
275+
assert results[1]["result_stderr"] == "new_err"

0 commit comments

Comments
 (0)
0