1
+ # Copyright 2025 Google LLC
2
+ #
3
+ # Licensed under the Apache License, Version 2.0 (the "License");
4
+ # you may not use this file except in compliance with the License.
5
+ # You may obtain a copy of the License at
6
+ #
7
+ # http://www.apache.org/licenses/LICENSE-2.0
8
+ #
9
+ # Unless required by applicable law or agreed to in writing, software
10
+ # distributed under the License is distributed on an "AS IS" BASIS,
11
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ # See the License for the specific language governing permissions and
13
+ # limitations under the License.
14
+
15
+ from google .adk .code_executors .code_execution_utils import File
16
+ from google .adk .code_executors .code_executor_context import CodeExecutorContext
17
+ from google .adk .sessions .state import State
18
+ import pytest
19
+
20
+
21
+ @pytest .fixture
22
+ def empty_state () -> State :
23
+ """Fixture for an empty session state."""
24
+ return State ({}, {})
25
+
26
+
27
+ @pytest .fixture
28
+ def context_with_data () -> CodeExecutorContext :
29
+ """Fixture for a CodeExecutorContext with some pre-populated data."""
30
+ state_data = {
31
+ "_code_execution_context" : {
32
+ "execution_session_id" : "session123" ,
33
+ "processed_input_files" : ["file1.csv" , "file2.txt" ],
34
+ },
35
+ "_code_executor_input_files" : [
36
+ {"name" : "input1.txt" , "content" : "YQ==" , "mime_type" : "text/plain" }
37
+ ],
38
+ "_code_executor_error_counts" : {"invocationA" : 2 },
39
+ }
40
+ state = State (state_data , {})
41
+ return CodeExecutorContext (state )
42
+
43
+
44
+ def test_init_empty_state (empty_state : State ):
45
+ """Test initialization with an empty state."""
46
+ ctx = CodeExecutorContext (empty_state )
47
+ assert ctx ._context == {}
48
+ assert ctx ._session_state is empty_state
49
+
50
+
51
+ def test_get_state_delta_empty (empty_state : State ):
52
+ """Test get_state_delta when context is empty."""
53
+ ctx = CodeExecutorContext (empty_state )
54
+ delta = ctx .get_state_delta ()
55
+ assert delta == {"_code_execution_context" : {}}
56
+
57
+
58
+ def test_get_state_delta_with_data (context_with_data : CodeExecutorContext ):
59
+ """Test get_state_delta with existing context data."""
60
+ delta = context_with_data .get_state_delta ()
61
+ expected_context = {
62
+ "execution_session_id" : "session123" ,
63
+ "processed_input_files" : ["file1.csv" , "file2.txt" ],
64
+ }
65
+ assert delta == {"_code_execution_context" : expected_context }
66
+
67
+
68
+ def test_get_execution_id_exists (context_with_data : CodeExecutorContext ):
69
+ """Test getting an existing execution ID."""
70
+ assert context_with_data .get_execution_id () == "session123"
71
+
72
+
73
+ def test_get_execution_id_not_exists (empty_state : State ):
74
+ """Test getting execution ID when it doesn't exist."""
75
+ ctx = CodeExecutorContext (empty_state )
76
+ assert ctx .get_execution_id () is None
77
+
78
+
79
+ def test_set_execution_id (empty_state : State ):
80
+ """Test setting an execution ID."""
81
+ ctx = CodeExecutorContext (empty_state )
82
+ ctx .set_execution_id ("new_session_id" )
83
+ assert ctx ._context ["execution_session_id" ] == "new_session_id"
84
+ assert ctx .get_execution_id () == "new_session_id"
85
+
86
+
87
+ def test_get_processed_file_names_exists (
88
+ context_with_data : CodeExecutorContext ,
89
+ ):
90
+ """Test getting existing processed file names."""
91
+ assert context_with_data .get_processed_file_names () == [
92
+ "file1.csv" ,
93
+ "file2.txt" ,
94
+ ]
95
+
96
+
97
+ def test_get_processed_file_names_not_exists (empty_state : State ):
98
+ """Test getting processed file names when none exist."""
99
+ ctx = CodeExecutorContext (empty_state )
100
+ assert ctx .get_processed_file_names () == []
101
+
102
+
103
+ def test_add_processed_file_names_new (empty_state : State ):
104
+ """Test adding processed file names to an empty context."""
105
+ ctx = CodeExecutorContext (empty_state )
106
+ ctx .add_processed_file_names (["new_file.py" ])
107
+ assert ctx ._context ["processed_input_files" ] == ["new_file.py" ]
108
+
109
+
110
+ def test_add_processed_file_names_append (
111
+ context_with_data : CodeExecutorContext ,
112
+ ):
113
+ """Test appending to existing processed file names."""
114
+ context_with_data .add_processed_file_names (["another_file.md" ])
115
+ assert context_with_data .get_processed_file_names () == [
116
+ "file1.csv" ,
117
+ "file2.txt" ,
118
+ "another_file.md" ,
119
+ ]
120
+
121
+
122
+ def test_get_input_files_exis
10000
ts (context_with_data : CodeExecutorContext ):
123
+ """Test getting existing input files."""
124
+ files = context_with_data .get_input_files ()
125
+ assert len (files ) == 1
126
+ assert files [0 ].name == "input1.txt"
127
+ assert files [0 ].content == "YQ=="
128
+ assert files [0 ].mime_type == "text/plain"
129
+
130
+
131
+ def test_get_input_files_not_exists (empty_state : State ):
132
+ """Test getting input files when none exist."""
133
+ ctx = CodeExecutorContext (empty_state )
134
+ assert ctx .get_input_files () == []
135
+
136
+
137
+ def test_add_input_files_new (empty_state : State ):
138
+ """Test adding input files to an empty session state."""
139
+ ctx = CodeExecutorContext (empty_state )
140
+ new_files = [File (name = "new.dat" , content = "Yg==" , mime_type = "application/octet-stream" )]
141
+ ctx .add_input_files (new_files )
142
+ assert empty_state ["_code_executor_input_files" ] == [
143
+ {"name" : "new.dat" , "content" : "Yg==" , "mime_type" : "application/octet-stream" }
144
+ ]
145
+
146
+
147
+ def test_add_input_files_append (context_with_data : CodeExecutorContext ):
148
+ """Test appending to existing input files."""
149
+ new_file = File (name = "input2.log" , content = "Yw==" , mime_type = "text/x-log" )
150
+ context_with_data .add_input_files ([new_file ])
151
+ expected_files_data = [
152
+ {"name" : "input1.txt" , "content" : "YQ==" , "mime_type" : "text/plain" },
153
+ {"name" : "input2.log" , "content" : "Yw==" , "mime_type" : "text/x-log" },
154
+ ]
155
+ assert (
156
+ context_with_data ._session_state ["_code_executor_input_files" ]
157
+ == expected_files_data
158
+ )
159
+
160
+
161
+ def test_clear_input_files (context_with_data : CodeExecutorContext ):
162
+ """Test clearing input files and processed file names."""
163
+ context_with_data .clear_input_files ()
164
+ assert context_with_data ._session_state ["_code_executor_input_files" ] == []
165
+ assert context_with_data ._context ["processed_input_files" ] == []
166
+
167
+
168
+ def test_clear_input_files_when_not_exist (empty_state : State ):
169
+ """Test clearing input files when they don't exist initially."""
170
+ ctx = CodeExecutorContext (empty_state )
171
+ ctx .clear_input_files () # Should not raise error
172
+ assert "_code_executor_input_files" not in empty_state # Or assert it's empty
173
+ assert "_code_execution_context" not in empty_state or not empty_state [
174
+ "_code_execution_context"
175
+ ].get ("processed_input_files" )
176
+
177
+
178
+ def test_get_error_count_exists (context_with_data : CodeExecutorContext ):
179
+ """Test getting an existing error count."""
180
+ assert context_with_data .get_error_count ("invocationA" ) == 2
181
+
182
+
183
+ def test_get_error_count_invocation_not_exists (
184
+ context_with_data : CodeExecutorContext ,
185
+ ):
186
+ """Test getting error count for an unknown invocation ID."""
187
+ assert context_with_data .get_error_count ("invocationB" ) == 0
188
+
189
+
190
+ def test_get_error_count_no_error_key (empty_state : State ):
191
+ """Test getting error count when the error key itself doesn't exist."""
192
+ ctx = CodeExecutorContext (empty_state )
193
+ assert ctx .get_error_count ("any_invocation" ) == 0
194
+
195
+
196
+ def test_increment_error_count_new_invocation (empty_state : State ):
197
+ """Test incrementing error count for a new invocation ID."""
198
+ ctx = CodeExecutorContext (empty_state )
199
+ ctx .increment_error_count ("invocationNew" )
200
+ assert empty_state ["_code_executor_error_counts" ]["invocationNew" ] == 1
201
+
202
+
203
+ def test_increment_error_count_existing_invocation (
204
+ context_with_data : CodeExecutorContext ,
205
+ ):
206
+ """Test incrementing error count for an existing invocation ID."""
207
+ context_with_data .increment_error_count ("invocationA" )
208
+ assert (
209
+ context_with_data ._session_state ["_code_executor_error_counts" ][
210
+ "invocationA"
211
+ ]
212
+ == 3
213
+ )
214
+
215
+
216
+ def test_reset_error_count_exists (context_with_data : CodeExecutorContext ):
217
+ """Test resetting an existing error count."""
218
+ context_with_data .reset_error_count ("invocationA" )
219
+ assert "invocationA" not in (
220
+ context_with_data ._session_state ["_code_executor_error_counts" ]
221
+ )
222
+
223
+
224
+ def test_reset_error_count_not_exists (context_with_data : CodeExecutorContext ):
225
+ """Test resetting an error count that doesn't exist."""
226
+ context_with_data .reset_error_count ("invocationB" ) # Should not raise
227
+ assert "invocationB" not in (
228
+ context_with_data ._session_state ["_code_executor_error_counts" ]
229
+ )
230
+
231
+
232
+ def test_reset_error_count_no_error_key (empty_state : State ):
233
+ """Test resetting when the error key itself doesn't exist."""
234
+ ctx = CodeExecutorContext (empty_state )
235
+ ctx .reset_error_count ("any_invocation" ) # Should not raise
236
+ assert "_code_executor_error_counts" not in empty_state
237
+
238
+
239
+ def test_update_code_execution_result_new_invocation (empty_state : State ):
240
+ """Test updating code execution result for a new invocation."""
241
+ ctx = CodeExecutorContext (empty_state )
242
+ ctx .update_code_execution_result (
243
+ "inv1" , "print('hi')" , "hi" , ""
244
+ )
245
+ results = empty_state ["_code_execution_results" ]["inv1" ]
246
+ assert len (results ) == 1
247
+ assert results [0 ]["code" ] == "print('hi')"
248
+ assert results [0 ]["result_stdout" ] == "hi"
249
+ assert results [0 ]["result_stderr" ] == ""
250
+ assert "timestamp" in results [0 ]
251
+
252
+
253
+ def test_update_code_execution_result_append (
254
+ context_with_data : CodeExecutorContext ,
255
+ ):
256
+ """Test appending to existing code execution results for an invocation."""
257
+ # First, let's add an initial result for a new invocation to the existing state
258
+ context_with_data ._session_state ["_code_execution_results" ] = {
259
+ "invocationX" : [{
260
+ "code" : "old_code" ,
261
+ "result_stdout" : "old_out" ,
262
+ "result_stderr" : "old_err" ,
263
+ "timestamp" : 123 ,
264
+ }]
265
+ }
266
+ context_with_data .update_code_execution_result (
267
+ "invocationX" , "new_code" , "new_out" , "new_err"
268
+ )
269
+ results = context_with_data ._session_state ["_code_execution_results" ][
270
+ "invocationX"
271
+ ]
272
+ assert len (results ) == 2
273
+ assert results [1 ]["code" ] == "new_code"
274
+ assert results [1 ]["result_stdout" ] == "new_out"
275
+ assert results [1 ]["result_stderr" ] == "new_err"
0 commit comments