2
2
3
3
import json
4
4
import logging
5
- from typing import Any , Dict , Iterable , List , Optional , Tuple
5
+ from typing import Any , Generator , Iterable , Optional
6
6
7
7
from ..types .content import ContentBlock , Message , Messages
8
8
from ..types .models import Model
@@ -80,7 +80,7 @@ def handle_message_start(event: MessageStartEvent, message: Message) -> Message:
80
80
return message
81
81
82
82
83
- def handle_content_block_start (event : ContentBlockStartEvent ) -> Dict [str , Any ]:
83
+ def handle_content_block_start (event : ContentBlockStartEvent ) -> dict [str , Any ]:
84
84
"""Handles the start of a content block by extracting tool usage information if any.
85
85
86
86
Args:
@@ -102,61 +102,59 @@ def handle_content_block_start(event: ContentBlockStartEvent) -> Dict[str, Any]:
102
102
103
103
104
104
def handle_content_block_delta (
105
- event : ContentBlockDeltaEvent , state : Dict [str , Any ], callback_handler : Any , ** kwargs : Any
106
- ) -> Dict [ str , Any ]:
105
+ event : ContentBlockDeltaEvent , state : dict [str , Any ]
106
+ ) -> tuple [ dict [ str , Any ], dict [ str , Any ] ]:
107
107
"""Handles content block delta updates by appending text, tool input, or reasoning content to the state.
108
108
109
109
Args:
110
110
event: Delta event.
111
111
state: The current state of message processing.
112
- callback_handler: Callback for processing events as they happen.
113
- **kwargs: Additional keyword arguments to pass to the callback handler.
114
112
115
113
Returns:
116
114
Updated state with appended text or tool input.
117
115
"""
118
116
delta_content = event ["delta" ]
119
117
118
+ callback_event = {}
119
+
120
120
if "toolUse" in delta_content :
121
121
if "input" not in state ["current_tool_use" ]:
122
122
state ["current_tool_use" ]["input" ] = ""
123
123
124
124
state ["current_tool_use" ]["input" ] += delta_content ["toolUse" ]["input" ]
125
- callback_handler ( delta = delta_content , current_tool_use = state ["current_tool_use" ], ** kwargs )
125
+ callback_event [ "callback" ] = { "delta" : delta_content , " current_tool_use" : state ["current_tool_use" ]}
126
126
127
127
elif "text" in delta_content :
128
128
state ["text" ] += delta_content ["text" ]
129
- callback_handler ( data = delta_content ["text" ], delta = delta_content , ** kwargs )
129
+ callback_event [ "callback" ] = { "data" : delta_content ["text" ], " delta" : delta_content }
130
130
131
131
elif "reasoningContent" in delta_content :
132
132
if "text" in delta_content ["reasoningContent" ]:
133
133
if "reasoningText" not in state :
134
134
state ["reasoningText" ] = ""
135
135
136
136
state ["reasoningText" ] += delta_content ["reasoningContent" ]["text" ]
137
- callback_handler (
138
- reasoningText = delta_content ["reasoningContent" ]["text" ],
139
- delta = delta_content ,
140
- reasoning = True ,
141
- ** kwargs ,
142
- )
137
+ callback_event ["callback" ] = {
138
+ "reasoningText" : delta_content ["reasoningContent" ]["text" ],
139
+ "delta" : delta_content ,
140
+ "reasoning" : True ,
141
+ }
143
142
144
143
elif "signature" in delta_content ["reasoningContent" ]:
145
144
if "signature" not in state :
146
145
state ["signature" ] = ""
147
146
148
147
state ["signature" ] += delta_content ["reasoningContent" ]["signature" ]
149
- callback_handler (
150
- reasoning_signature = delta_content ["reasoningContent" ]["signature" ],
151
- delta = delta_content ,
152
- reasoning = True ,
153
- ** kwargs ,
154
- )
148
+ callback_event ["callback" ] = {
149
+ "reasoning_signature" : delta_content ["reasoningContent" ]["signature" ],
150
+ "delta" : delta_content ,
151
+ "reasoning" : True ,
152
+ }
155
153
156
- return state
154
+ return state , callback_event
157
155
158
156
159
- def handle_content_block_stop (state : Dict [str , Any ]) -> Dict [str , Any ]:
157
+ def handle_content_block_stop (state : dict [str , Any ]) -> dict [str , Any ]:
160
158
"""Handles the end of a content block by finalizing tool usage, text content, or reasoning content.
161
159
162
160
Args:
@@ -165,7 +163,7 @@ def handle_content_block_stop(state: Dict[str, Any]) -> Dict[str, Any]:
165
163
Returns:
166
164
Updated state with finalized content block.
167
165
"""
168
- content : List [ContentBlock ] = state ["content" ]
166
+ content : list [ContentBlock ] = state ["content" ]
169
167
170
168
current_tool_use = state ["current_tool_use" ]
171
169
text = state ["text" ]
@@ -223,7 +221,7 @@ def handle_message_stop(event: MessageStopEvent) -> StopReason:
223
221
return event ["stopReason" ]
224
222
225
223
226
- def handle_redact_content (event : RedactContentEvent , messages : Messages , state : Dict [str , Any ]) -> None :
224
+ def handle_redact_content (event : RedactContentEvent , messages : Messages , state : dict [str , Any ]) -> None :
227
225
"""Handles redacting content from the input or output.
228
226
229
227
Args:
@@ -238,7 +236,7 @@ def handle_redact_content(event: RedactContentEvent, messages: Messages, state:
238
236
state ["message" ]["content" ] = [{"text" : event ["redactAssistantContentMessage" ]}]
239
237
240
238
241
- def extract_usage_metrics (event : MetadataEvent ) -> Tuple [Usage , Metrics ]:
239
+ def extract_usage_metrics (event : MetadataEvent ) -> tuple [Usage , Metrics ]:
242
240
"""Extracts usage metrics from the metadata chunk.
243
241
244
242
Args:
@@ -255,25 +253,20 @@ def extract_usage_metrics(event: MetadataEvent) -> Tuple[Usage, Metrics]:
255
253
256
254
def process_stream (
257
255
chunks : Iterable [StreamEvent ],
258
- callback_handler : Any ,
259
256
messages : Messages ,
260
- ** kwargs : Any ,
261
- ) -> Tuple [StopReason , Message , Usage , Metrics , Any ]:
257
+ ) -> Generator [dict [str , Any ], None , None ]:
262
258
"""Processes the response stream from the API, constructing the final message and extracting usage metrics.
263
259
264
260
Args:
265
261
chunks: The chunks of the response stream from the model.
266
- callback_handler: Callback for processing events as they happen.
267
262
messages: The agents messages.
268
- **kwargs: Additional keyword arguments that will be passed to the callback handler.
269
- And also returned in the request_state.
270
263
271
264
Returns:
272
- The reason for stopping, the constructed message, the usage metrics, and the updated request state .
265
+ The reason for stopping, the constructed message, and the usage metrics .
273
266
"""
274
267
stop_reason : StopReason = "end_turn"
275
268
276
- state : Dict [str , Any ] = {
269
+ state : dict [str , Any ] = {
277
270
"message" : {"role" : "assistant" , "content" : []},
278
271
"text" : "" ,
279
272
"current_tool_use" : {},
@@ -285,18 +278,16 @@ def process_stream(
285
278
usage : Usage = Usage (inputTokens = 0 , outputTokens = 0 , totalTokens = 0 )
286
279
metrics : Metrics = Metrics (latencyMs = 0 )
287
280
288
- kwargs .setdefault ("request_state" , {})
289
-
290
281
for chunk in chunks :
291
- # Callback handler call here allows each event to be visible to the caller
292
- callback_handler (event = chunk )
282
+ yield {"callback" : {"event" : chunk }}
293
283
294
284
if "messageStart" in chunk :
295
285
state ["message" ] = handle_message_start (chunk ["messageStart" ], state ["message" ])
296
286
elif "contentBlockStart" in chunk :
297
287
state ["current_tool_use" ] = handle_content_block_start (chunk ["contentBlockStart" ])
298
288
elif "contentBlockDelta" in chunk :
299
- state = handle_content_block_delta (chunk ["contentBlockDelta" ], state , callback_handler , ** kwargs )
289
+ state , callback_event = handle_content_block_delta (chunk ["contentBlockDelta" ], state )
290
+ yield callback_event
300
291
elif "contentBlockStop" in chunk :
301
292
state = handle_content_block_stop (state )
302
293
elif "messageStop" in chunk :
@@ -306,35 +297,30 @@ def process_stream(
306
297
elif "redactContent" in chunk :
307
298
handle_redact_content (chunk ["redactContent" ], messages , state )
308
299
309
- return stop_reason , state ["message" ], usage , metrics , kwargs [ "request_state" ]
300
+ yield { "stop" : ( stop_reason , state ["message" ], usage , metrics )}
310
301
311
302
312
303
def stream_messages(
    model: Model,
    system_prompt: Optional[str],
    messages: Messages,
    tool_config: Optional[ToolConfig],
) -> Generator[dict[str, Any], None, None]:
    """Streams messages to the model and processes the response.

    Args:
        model: Model provider.
        system_prompt: The system prompt to send.
        messages: List of messages to send.
        tool_config: Configuration for the tools to use.

    Yields:
        Events from the model stream: per-chunk callback events, followed by a
        final "stop" event carrying the stop reason, the constructed message,
        and the usage metrics.
    """
    logger.debug("model=<%s> | streaming messages", model)

    messages = remove_blank_messages_content_text(messages)
    # Collapse the tool config into the bare tool specs the model API expects;
    # None when there is no tool config or it contains no tools.
    tool_specs = [tool["toolSpec"] for tool in tool_config.get("tools", [])] or None if tool_config else None

    chunks = model.converse(messages, tool_specs, system_prompt)
    yield from process_stream(chunks, messages)
0 commit comments