-
-
Notifications
You must be signed in to change notification settings - Fork 220
/
task.py
1446 lines (1330 loc) · 57.9 KB
/
task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import asyncio
import copy
import logging
from collections import Counter, deque
from types import SimpleNamespace
from typing import (
Any,
Callable,
Coroutine,
Deque,
Dict,
List,
Optional,
Set,
Tuple,
Type,
cast,
)
import numpy as np
from pydantic import BaseModel
from rich import print
from rich.markup import escape
from langroid.agent.base import Agent
from langroid.agent.chat_agent import ChatAgent
from langroid.agent.chat_document import (
ChatDocLoggerFields,
ChatDocMetaData,
ChatDocument,
StatusCode,
)
from langroid.cachedb.redis_cachedb import RedisCache, RedisCacheConfig
from langroid.exceptions import InfiniteLoopException
from langroid.mytypes import Entity
from langroid.parsing.parse_json import extract_top_level_json
from langroid.parsing.routing import parse_addressed_message
from langroid.utils.configuration import settings
from langroid.utils.constants import (
DONE,
NO_ANSWER,
PASS,
PASS_TO,
SEND_TO,
USER_QUIT_STRINGS,
)
from langroid.utils.logging import RichFileLogger, setup_file_logger
from langroid.utils.system import hash
logger = logging.getLogger(__name__)

# A Responder is anything that can respond to a pending message:
# one of the agent's entities (LLM, USER, AGENT) or a sub-`Task`.
Responder = Entity | Type["Task"]
def noop_fn(*args: List[Any], **kwargs: Dict[str, Any]) -> None:
    """Accept any arguments and do nothing; default placeholder callback."""
class TaskConfig(BaseModel):
    """
    Configuration for a Task: a container for parameters that are not (yet)
    exposed via the Task `__init__` method.

    Eventually all Task `__init__` params may migrate here, mirroring how
    `Agent`, `ChatAgent`, `LanguageModel`, etc. have their own config classes.

    Attributes:
        inf_loop_cycle_len: max exact-loop cycle length to look for;
            0 disables infinite-loop detection entirely.
        inf_loop_dominance_factor: dominance factor used by the
            exact-loop detection heuristic.
        inf_loop_wait_factor: wait this multiple of cycle_len messages
            before running the loop check.
    """

    inf_loop_cycle_len: int = 10
    inf_loop_dominance_factor: float = 1.5
    inf_loop_wait_factor: float = 5.0
class Task:
"""
A `Task` wraps an `Agent` object, and sets up the `Agent`'s goals and instructions.
A `Task` maintains two key variables:
- `self.pending_message`, which is the message awaiting a response, and
- `self.pending_sender`, which is the entity that sent the pending message.
The possible responders to `self.pending_message` are the `Agent`'s own "native"
responders (`agent_response`, `llm_response`, and `user_response`), and
the `run()` methods of any sub-tasks. All responders have the same type-signature
(somewhat simplified):
```
str | ChatDocument -> ChatDocument
```
Responders may or may not specify an intended recipient of their generated response.
The main top-level method in the `Task` class is `run()`, which repeatedly calls
`step()` until `done()` returns true. The `step()` represents a "turn" in the
conversation: this method sequentially (in round-robin fashion) calls the responders
until it finds one that generates a *valid* response to the `pending_message`
(as determined by the `valid()` method). Once a valid response is found,
`step()` updates the `pending_message` and `pending_sender` variables,
and on the next iteration, `step()` re-starts its search for a valid response
*from the beginning* of the list of responders (the exception being that the
human user always gets a chance to respond after each non-human valid response).
This process repeats until `done()` returns true, at which point `run()` returns
the value of `result()`, which is the final result of the task.
"""
# class variable called `cache` that is a RedisCache object
cache: RedisCache = RedisCache(RedisCacheConfig(fake=False))
def __init__(
    self,
    agent: Optional[Agent] = None,
    name: str = "",
    llm_delegate: bool = False,
    single_round: bool = False,
    system_message: str = "",
    user_message: str | None = "",
    restart: bool = True,
    default_human_response: Optional[str] = None,
    interactive: bool = True,
    only_user_quits_root: bool = False,
    erase_substeps: bool = False,
    allow_null_result: bool = True,
    max_stalled_steps: int = 5,
    done_if_no_response: Optional[List[Responder]] = None,
    done_if_response: Optional[List[Responder]] = None,
    config: Optional[TaskConfig] = None,
):
    """
    A task to be performed by an agent.

    Args:
        agent (Agent): agent associated with the task
        name (str): name of the task
        llm_delegate (bool):
            [Deprecated, not used; use `done_if_response`, `done_if_no_response`
            instead]
            Whether to delegate control to LLM; conceptually,
            the "controlling entity" is the one "seeking" responses to its queries,
            and has a goal it is aiming to achieve. The "controlling entity" is
            either the LLM or the USER. (Note within a Task there is just one
            LLM, and all other entities are proxies of the "User" entity).
        single_round (bool):
            [Deprecated: Use `done_if_response`, `done_if_no_response` instead].
            If true, task runs until one message by controller,
            and subsequent response by non-controller. If false, runs for the
            specified number of turns in `run`, or until `done()` is true.
            One run of step() is considered a "turn".
        system_message (str): if not empty, overrides agent's system_message
        user_message (str): if not empty, overrides agent's user_message
        restart (bool): if true, resets the agent's message history
        default_human_response (str): default response from user; useful for
            testing, to avoid interactive input from user.
            [Instead of this, setting `interactive` usually suffices]
        interactive (bool): if true, wait for human input after each non-human
            response (prevents infinite loop of non-human responses).
            Default is true. If false, then `default_human_response` is set to ""
        only_user_quits_root (bool): if true, only user can quit the root task.
            [This param is ignored & deprecated; Keeping for backward compatibility.
            Instead of this, setting `interactive` suffices]
        erase_substeps (bool): if true, when task completes, erase intermediate
            conversation with subtasks from this agent's `message_history`, and also
            erase all subtask agents' `message_history`.
            Note: erasing can reduce prompt sizes, but results in repetitive
            sub-task delegation.
        allow_null_result (bool): [Deprecated, may be removed in future.]
            If true, allow null (empty or NO_ANSWER)
            as the result of a step or overall task result.
            Optional, default is True.
        max_stalled_steps (int): task considered done after this many consecutive
            steps with no progress. Default is 5.
        done_if_no_response (List[Responder]): consider task done if NULL
            response from any of these responders. Default is empty list.
        done_if_response (List[Responder]): consider task done if NON-NULL
            response from any of these responders. Default is empty list.
        config (TaskConfig): additional configuration; defaults to a fresh
            `TaskConfig()` per task.
    """
    # FIX: avoid mutable/shared default arguments (`[]`, `TaskConfig()`);
    # a shared default TaskConfig instance could be mutated via one task
    # and silently affect every other task created without a config.
    done_if_no_response = list(done_if_no_response or [])
    done_if_response = list(done_if_response or [])
    if agent is None:
        agent = ChatAgent()
    self.callbacks = SimpleNamespace(
        show_subtask_response=noop_fn,
        set_parent_agent=noop_fn,
    )
    self.config = TaskConfig() if config is None else config
    # counts of distinct pending messages in history,
    # to help detect (exact) infinite loops
    self.message_counter: Counter[str] = Counter()
    self.history_count: Deque[int] = deque(maxlen=self.config.inf_loop_cycle_len)
    # copy the agent's config, so that we don't modify the original agent's
    # config, which may be shared by other agents.
    try:
        config_copy = copy.deepcopy(agent.config)
        agent.config = config_copy
    except Exception:
        logger.warning(
            """
            Failed to deep-copy Agent config during task creation,
            proceeding with original config. Be aware that changes to
            the config may affect other agents using the same config.
            """
        )
    # FIX: the original condition
    #   `isinstance(agent, ChatAgent) and len(agent.message_history) == 0 or restart`
    # parses as `(isinstance(...) and len(...) == 0) or restart`, so with
    # restart=True (the default) a non-ChatAgent would be cast and have
    # clear_history/clear_dialog called on it. The intended grouping is:
    if isinstance(agent, ChatAgent) and (
        len(agent.message_history) == 0 or restart
    ):
        agent = cast(ChatAgent, agent)
        agent.clear_history(0)
        agent.clear_dialog()
        # possibly change the system and user messages
        if system_message:
            # we always have at least 1 task_message
            agent.set_system_message(system_message)
        if user_message:
            agent.set_user_message(user_message)
    self.max_cost: float = 0
    self.max_tokens: int = 0
    self.session_id: str = ""
    self.logger: None | RichFileLogger = None
    self.tsv_logger: None | logging.Logger = None
    # color logging interferes with notebook output
    self.color_log: bool = not settings.notebook
    self.agent = agent
    self.step_progress = False  # progress in current step?
    self.n_stalled_steps = 0  # how many consecutive steps with no progress?
    self.max_stalled_steps = max_stalled_steps
    self.done_if_response = [r.value for r in done_if_response]
    self.done_if_no_response = [r.value for r in done_if_no_response]
    self.is_done = False  # is task done (based on response)?
    self.is_pass_thru = False  # is current response a pass-thru?
    self.task_progress = False  # progress in current task (since run or run_async)?
    if name:
        # task name overrides name in agent config
        agent.config.name = name
    self.name = name or agent.config.name
    self.value: str = self.name
    # an explicitly-empty default human response implies non-interactive
    if default_human_response == "":
        interactive = False
    self.interactive = interactive
    self.agent.interactive = interactive
    self.message_history_idx = -1
    if interactive:
        only_user_quits_root = True
    else:
        default_human_response = default_human_response or ""
        only_user_quits_root = False
    if default_human_response is not None:
        self.agent.default_human_response = default_human_response
    self.default_human_response = default_human_response
    if self.interactive:
        # in interactive mode we really do want to ask the human
        self.agent.default_human_response = None
    self.only_user_quits_root = only_user_quits_root
    # set to True if we want to collapse multi-turn conversation with sub-tasks
    # into just the first outgoing message and last incoming message.
    # Note this also completely erases sub-task agents' message_history.
    self.erase_substeps = erase_substeps
    self.allow_null_result = allow_null_result

    agent_entity_responders = agent.entity_responders()
    agent_entity_responders_async = agent.entity_responders_async()
    self.responders: List[Responder] = [e for e, _ in agent_entity_responders]
    self.responders_async: List[Responder] = [
        e for e, _ in agent_entity_responders_async
    ]
    self.non_human_responders: List[Responder] = [
        r for r in self.responders if r != Entity.USER
    ]
    self.non_human_responders_async: List[Responder] = [
        r for r in self.responders_async if r != Entity.USER
    ]
    self.human_tried = False  # did human get a chance to respond in last step?
    self._entity_responder_map: Dict[
        Entity, Callable[..., Optional[ChatDocument]]
    ] = dict(agent_entity_responders)
    self._entity_responder_async_map: Dict[
        Entity, Callable[..., Coroutine[Any, Any, Optional[ChatDocument]]]
    ] = dict(agent_entity_responders_async)
    self.name_sub_task_map: Dict[str, Task] = {}
    # latest message in a conversation among entities and agents.
    self.pending_message: Optional[ChatDocument] = None
    self.pending_sender: Responder = Entity.USER
    self.single_round = single_round
    self.turns = -1  # no limit
    self.llm_delegate = llm_delegate
    if llm_delegate:
        self.controller = Entity.LLM
        if self.single_round:
            # 0: User instructs (delegating to LLM);
            # 1: LLM asks;
            # 2: user replies.
            self.turns = 2
    else:
        self.controller = Entity.USER
        if self.single_round:
            self.turns = 1  # 0: User asks, 1: LLM replies.
    # other sub_tasks this task can delegate to
    self.sub_tasks: List[Task] = []
    self.parent_task: Set[Task] = set()
    self.caller: Task | None = None  # which task called this task's `run` method
def clone(self, i: int) -> "Task":
    """
    Create a copy of this task wrapping a clone of its agent.

    Args:
        i (int): index distinguishing the clone; appended to the task name.

    Returns:
        Task: new Task with the same settings and a cloned ChatAgent.
    """
    assert isinstance(self.agent, ChatAgent), "Task clone only works for ChatAgent"
    cloned_agent: ChatAgent = self.agent.clone(i)
    return Task(
        cloned_agent,
        name=f"{self.name}-{i}",
        llm_delegate=self.llm_delegate,
        single_round=self.single_round,
        system_message=self.agent.system_message,
        user_message=self.agent.user_message,
        restart=False,
        default_human_response=self.default_human_response,
        interactive=self.interactive,
        erase_substeps=self.erase_substeps,
        allow_null_result=self.allow_null_result,
        max_stalled_steps=self.max_stalled_steps,
        done_if_no_response=[Entity(s) for s in self.done_if_no_response],
        done_if_response=[Entity(s) for s in self.done_if_response],
        config=self.config,
    )
def __repr__(self) -> str:
    """A Task is represented simply by its name."""
    return self.name
def __str__(self) -> str:
    """A Task prints simply as its name."""
    return self.name
def _cache_session_store(self, key: str, value: str) -> None:
    """
    Store a session-scoped key-value pair in the cache, namespaced by the
    current session id (e.g. key="kill", value="1" is stored under
    "<session_id>:kill"). Cache errors are logged and swallowed
    (best-effort write).
    """
    session_key = f"{self.session_id}:{key}"
    try:
        self.cache.store(session_key, value)
    except Exception as e:
        logging.error(f"Error in Task._cache_session_store: {e}")
def _cache_session_lookup(self, key: str) -> Dict[str, Any] | str | None:
    """
    Look up a session-scoped key in the cache.

    Returns:
        The cached value stored under "<session_id>:<key>", or None if
        the lookup raised an error (logged, not propagated).
    """
    try:
        return self.cache.retrieve(f"{self.session_id}:{key}")
    except Exception as e:
        logging.error(f"Error in Task._cache_session_lookup: {e}")
        return None
def _is_kill(self) -> bool:
    """Return True if the current session has been marked as killed."""
    kill_flag = self._cache_session_lookup("kill")
    return kill_flag == "1"
def _set_alive(self) -> None:
    """Mark the current session as alive, i.e. reset its kill flag to "0"."""
    self._cache_session_store("kill", "0")
@classmethod
def kill_session(cls, session_id: str = "") -> None:
    """Mark the session with the given `session_id` as killed."""
    cls.cache.store(f"{session_id}:kill", "1")
def kill(self) -> None:
    """Kill the task run associated with this task's current session."""
    self._cache_session_store("kill", "1")
@property
def _level(self) -> int:
    # nesting depth of this task: root task (no caller) is level 0
    return 0 if self.caller is None else self.caller._level + 1
@property
def _indent(self) -> str:
    # indentation prefix reflecting this task's nesting depth
    return self._level * "...|"
@property
def _enter(self) -> str:
    # banner marker printed when this task starts
    return f"{self._indent}>>>"
@property
def _leave(self) -> str:
    # banner marker printed when this task finishes
    return f"{self._indent}<<<"
def add_sub_task(self, task: Task | List[Task]) -> None:
    """
    Register a sub-task (or list of sub-tasks) that this task can delegate
    (or fail-over) to. Order matters: sub-tasks are tried in sequence as
    the parent task searches for a valid response.

    Args:
        task (Task|List[Task]): sub-task(s) to add
    """
    if isinstance(task, list):
        for sub in task:
            self.add_sub_task(sub)
        return
    assert isinstance(task, Task), f"added task must be a Task, not {type(task)}"
    task.parent_task.add(self)  # add myself to set of parent tasks of `task`
    self.sub_tasks.append(task)
    self.name_sub_task_map[task.name] = task
    # a sub-task participates in all four responder sequences
    sub_responder = cast(Responder, task)
    for responder_list in (
        self.responders,
        self.responders_async,
        self.non_human_responders,
        self.non_human_responders_async,
    ):
        responder_list.append(sub_responder)
def init(self, msg: None | str | ChatDocument = None) -> ChatDocument | None:
    """
    Initialize the task with an optional message to start the conversation.

    Sets up `self.pending_message` / `self.pending_sender`, and the task's
    loggers (shared with the caller task's loggers when one exists).

    Args:
        msg (str|ChatDocument): optional message to start the conversation.

    Returns:
        (ChatDocument|None): the initialized `self.pending_message`.
            Currently not used in the code, but provided for convenience.
    """
    self.pending_sender = Entity.USER
    if isinstance(msg, str):
        self.pending_message = ChatDocument(
            content=msg,
            metadata=ChatDocMetaData(sender=Entity.USER),
        )
    else:
        self.pending_message = msg
        if self.pending_message is not None and self.caller is not None:
            # msg may have come from `caller`, so we pretend this is from
            # the CURRENT task's USER entity
            self.pending_message.metadata.sender = Entity.USER

    self._show_pending_message_if_debug()

    # share the caller's loggers if available, else create fresh ones
    if self.caller is not None and self.caller.logger is not None:
        self.logger = self.caller.logger
    else:
        self.logger = RichFileLogger(f"logs/{self.name}.log", color=self.color_log)

    if self.caller is not None and self.caller.tsv_logger is not None:
        self.tsv_logger = self.caller.tsv_logger
    else:
        self.tsv_logger = setup_file_logger("tsv_logger", f"logs/{self.name}.tsv")
        # write the TSV header only when we create a fresh logger
        header = ChatDocLoggerFields().tsv_header()
        self.tsv_logger.info(f" \tTask\tResponder\t{header}")

    self.log_message(Entity.USER, self.pending_message)
    return self.pending_message
def run(
    self,
    msg: Optional[str | ChatDocument] = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
) -> Optional[ChatDocument]:
    """Synchronous version of `run_async()`.
    See `run_async()` for details."""
    # reset per-run state
    self.task_progress = False
    self.n_stalled_steps = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()  # clear any stale "kill" flag for this session
    # reset infinite-loop-detection counters
    self.message_counter.clear()
    self.history_count.clear()
    assert (
        msg is None or isinstance(msg, str) or isinstance(msg, ChatDocument)
    ), f"msg arg in Task.run() must be None, str, or ChatDocument, not {type(msg)}"

    if (
        isinstance(msg, ChatDocument)
        and msg.metadata.recipient != ""
        and msg.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None
    self._pre_run_loop(
        msg=msg,
        caller=caller,
        is_async=False,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        self.step()
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        # if both `turns` and `settings.max_turns` are positive, the smaller
        # wins; otherwise whichever (if either) is positive applies
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            status = StatusCode.MAX_TURNS
            break
        # every `inf_loop_cycle_len` turns, check for a possible infinite loop
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection by changing the params
                in the TaskConfig passed to the Task constructor:
                e.g. set inf_loop_cycle_len=0 to disable loop detection."""
            )

    final_result = self.result()
    if final_result is not None:
        final_result.metadata.status = status
    self._post_run_loop()
    return final_result
async def run_async(
    self,
    msg: Optional[str | ChatDocument] = None,
    turns: int = -1,
    caller: None | Task = None,
    max_cost: float = 0,
    max_tokens: int = 0,
    session_id: str = "",
) -> Optional[ChatDocument]:
    """
    Loop over `step()` until task is considered done or `turns` is reached.
    Runs asynchronously.

    Args:
        msg (str|ChatDocument): initial message to process; if None,
            the LLM will respond to its initial `self.task_messages`
            which set up and kick off the overall task.
            The agent tries to achieve this goal by looping
            over `self.step()` until the task is considered
            done; this can involve a series of messages produced by Agent,
            LLM or Human (User).
        turns (int): number of turns to run the task for;
            default is -1, which means run until task is done.
        caller (Task|None): the calling task, if any
        max_cost (float): max cost allowed for the task (default 0 -> no limit)
        max_tokens (int): max tokens allowed for the task (default 0 -> no limit)
        session_id (str): session id for the task

    Returns:
        Optional[ChatDocument]: valid result of the task.
    """
    # Even if the initial "sender" is not literally the USER (since the task could
    # have come from another LLM), as far as this agent is concerned, the initial
    # message can be considered to be from the USER
    # (from the POV of this agent's LLM).

    # reset per-run state
    self.task_progress = False
    self.n_stalled_steps = 0
    self.max_cost = max_cost
    self.max_tokens = max_tokens
    self.session_id = session_id
    self._set_alive()  # clear any stale "kill" flag for this session
    # reset infinite-loop-detection counters
    self.message_counter.clear()
    self.history_count.clear()
    if (
        isinstance(msg, ChatDocument)
        and msg.metadata.recipient != ""
        and msg.metadata.recipient != self.name
    ):
        # this task is not the intended recipient so return None
        return None
    self._pre_run_loop(
        msg=msg,
        caller=caller,
        is_async=True,
    )
    # self.turns overrides if it is > 0 and turns not set (i.e. = -1)
    turns = self.turns if turns < 0 else turns
    i = 0
    while True:
        await self.step_async()
        await asyncio.sleep(0.01)  # temp yield to avoid blocking
        done, status = self.done()
        if done:
            if self._level == 0 and not settings.quiet:
                print("[magenta]Bye, hope this was useful!")
            break
        i += 1
        # if both `turns` and `settings.max_turns` are positive, the smaller
        # wins; otherwise whichever (if either) is positive applies
        max_turns = (
            min(turns, settings.max_turns)
            if turns > 0 and settings.max_turns > 0
            else max(turns, settings.max_turns)
        )
        if max_turns > 0 and i >= max_turns:
            status = StatusCode.MAX_TURNS
            break
        # every `inf_loop_cycle_len` turns, check for a possible infinite loop
        if (
            self.config.inf_loop_cycle_len > 0
            and i % self.config.inf_loop_cycle_len == 0
            and self._maybe_infinite_loop()
        ):
            raise InfiniteLoopException(
                """Possible infinite loop detected!
                You can adjust infinite loop detection by changing the params
                in the TaskConfig passed to the Task constructor:
                e.g. set inf_loop_cycle_len=0 to disable loop detection."""
            )

    final_result = self.result()
    if final_result is not None:
        final_result.metadata.status = status
    self._post_run_loop()
    return final_result
def _pre_run_loop(
    self,
    msg: Optional[str | ChatDocument] = None,
    caller: None | Task = None,
    is_async: bool = False,
) -> None:
    """Set up state shared by `run()` and `run_async()` before stepping:
    record the caller, initialize the pending message/loggers, snapshot the
    message-history position, and print the task-entry banner."""
    self.caller = caller
    self.init(msg)
    # sets indentation to be printed prior to any output from agent
    self.agent.indent = self._indent
    if self.default_human_response is not None:
        self.agent.default_human_response = self.default_human_response

    self.message_history_idx = -1
    if isinstance(self.agent, ChatAgent):
        # mark where we are in the message history, so we can reset to this
        # when we are done with the task
        n_history = len(self.agent.message_history)
        n_task_msgs = len(self.agent.task_messages)
        self.message_history_idx = max(n_history, n_task_msgs) - 1

    # TODO decide on whether or not to print, based on is_async
    if self.agent.config.llm is None:
        llm_model = "no-LLM"
    else:
        llm_model = self.agent.config.llm.chat_model
    if not settings.quiet:
        print(
            f"[bold magenta]{self._enter} Starting Agent "
            f"{self.name} ({self.message_history_idx+1}) "
            f"{llm_model} [/bold magenta]"
        )
def _post_run_loop(self) -> None:
    """Clean up after the run loop: optionally erase intermediate sub-step
    messages (own history and sub-task agents' histories), then print the
    task-exit banner."""
    # delete all messages from our agent's history, AFTER the first incoming
    # message, and BEFORE final result message
    n_messages = 0
    if isinstance(self.agent, ChatAgent):
        if self.erase_substeps:
            # TODO I don't like directly accessing agent message_history. Revisit.
            # (Pchalasani)
            # NOTE(review): n_messages is still 0 here, so the slice end is
            # n_messages - 1 == -1, i.e. delete up to but excluding the LAST
            # message. Presumably intentional (keep first incoming + final
            # result) -- confirm, and consider an explicit -1 for clarity.
            del self.agent.message_history[
                self.message_history_idx + 2 : n_messages - 1
            ]
        # post-deletion length, shown in the exit banner below
        n_messages = len(self.agent.message_history)
    if self.erase_substeps:
        for t in self.sub_tasks:
            # erase our conversation with agent of subtask t
            # erase message_history of agent of subtask t
            # TODO - here we assume that subtask-agents are
            # ONLY talking to the current agent.
            if isinstance(t.agent, ChatAgent):
                t.agent.clear_history(0)
    if not settings.quiet:
        print(
            f"[bold magenta]{self._leave} Finished Agent "
            f"{self.name} ({n_messages}) [/bold magenta]"
        )
def step(self, turns: int = -1) -> ChatDocument | None:
    """
    Synchronous version of `step_async()`. See `step_async()` for details.
    TODO: Except for the self.response() calls, this fn should be identical to
    `step_async()`. Consider refactoring to avoid duplication.
    """
    self.is_done = False
    self.step_progress = False
    # the current pending message becomes the parent of whatever valid
    # response this step produces
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        # short-circuit: record an AGENT-sent error doc as this step's result
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc
    responders: List[Responder] = self.non_human_responders.copy()
    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # This ensures human gets a chance to respond,
        # other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        # Agent to be the next responder; this only makes a difference in an
        # interactive setting: LLM generates tool, then we don't want user to
        # have to respond, and instead let the agent_response handle the tool.
        responders.insert(0, Entity.USER)
    found_response = False
    for r in responders:
        self.is_pass_thru = False
        if not self._can_respond(r):
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                function_call=None,
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            self.log_message(r, log_doc)
            continue
        self.human_tried = r == Entity.USER
        result = self.response(r, turns)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            # first valid response wins; record it and stop searching
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:
        self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message
async def step_async(self, turns: int = -1) -> ChatDocument | None:
    """
    A single "turn" in the task conversation: The "allowed" responders in this
    turn (which can be either the 3 "entities", or one of the sub-tasks) are
    tried in sequence, until a _valid_ response is obtained; a _valid_
    response is one that contributes to the task, either by ending it,
    or producing a response to be further acted on.
    Update `self.pending_message` to the latest valid response (or NO_ANSWER
    if no valid response was obtained from any responder).

    Args:
        turns (int): number of turns to process. Typically used in testing
            where there is no human to "quit out" of current level, or in cases
            where we want to limit the number of turns of a delegated agent.

    Returns (ChatDocument|None):
        Updated `self.pending_message`. Currently the return value is not used
        by the `task.run()` method, but we return this as a convenience for
        other use-cases, e.g. where we want to run a task step by step in a
        different context.
    """
    self.is_done = False
    self.step_progress = False
    # the current pending message becomes the parent of whatever valid
    # response this step produces
    parent = self.pending_message
    recipient = (
        ""
        if self.pending_message is None
        else self.pending_message.metadata.recipient
    )
    if not self._valid_recipient(recipient):
        # short-circuit: record an AGENT-sent error doc as this step's result
        logger.warning(f"Invalid recipient: {recipient}")
        error_doc = ChatDocument(
            content=f"Invalid recipient: {recipient}",
            metadata=ChatDocMetaData(
                sender=Entity.AGENT,
                sender_name=Entity.AGENT,
            ),
        )
        self._process_valid_responder_result(Entity.AGENT, parent, error_doc)
        return error_doc
    responders: List[Responder] = self.non_human_responders_async.copy()
    if (
        Entity.USER in self.responders
        and not self.human_tried
        and not self.agent.has_tool_message_attempt(self.pending_message)
    ):
        # Give human first chance if they haven't been tried in last step,
        # and the msg is not a tool-call attempt;
        # This ensures human gets a chance to respond,
        # other than to a LLM tool-call.
        # When there's a tool msg attempt we want the
        # Agent to be the next responder; this only makes a difference in an
        # interactive setting: LLM generates tool, then we don't want user to
        # have to respond, and instead let the agent_response handle the tool.
        responders.insert(0, Entity.USER)
    found_response = False
    for r in responders:
        # FIX: reset the pass-thru flag for each responder attempt, matching
        # the sync `step()`. Previously this reset was missing here, so a
        # stale `is_pass_thru` from an earlier step could leak through when
        # a responder cannot respond (the `continue` path below).
        self.is_pass_thru = False
        if not self._can_respond(r):
            # create dummy msg for logging
            log_doc = ChatDocument(
                content="[CANNOT RESPOND]",
                function_call=None,
                metadata=ChatDocMetaData(
                    sender=r if isinstance(r, Entity) else Entity.USER,
                    sender_name=str(r),
                    recipient=recipient,
                ),
            )
            self.log_message(r, log_doc)
            continue
        self.human_tried = r == Entity.USER
        result = await self.response_async(r, turns)
        self.is_done = self._is_done_response(result, r)
        self.is_pass_thru = PASS in result.content if result else False
        if self.valid(result, r):
            # first valid response wins; record it and stop searching
            found_response = True
            assert result is not None
            self._process_valid_responder_result(r, parent, result)
            break
        else:
            self.log_message(r, result)
        if self.is_done:
            # skip trying other responders in this step
            break
    if not found_response:
        self._process_invalid_step_result(parent)
    self._show_pending_message_if_debug()
    return self.pending_message
def _process_valid_responder_result(
    self,
    r: Responder,
    parent: ChatDocument | None,
    result: ChatDocument,
) -> None:
    """Record a valid result produced by responder `r` during a step:
    update pending message/sender (unless the result is a pass-thru),
    progress flags, the stall counter, and infinite-loop counters."""
    # `pending_sender` is a Responder: one of this agent's entities OR a
    # sub-task. Contrast with `pending_message.metadata.sender`, which is
    # always an ENTITY of this agent or of a sub-task's agent.
    result.metadata.parent = parent
    if not self.is_pass_thru:
        self.pending_sender = r
        self.pending_message = result
    self.log_message(self.pending_sender, result, mark=True)
    self.step_progress = True
    self.task_progress = True
    if self.is_pass_thru:
        self.n_stalled_steps += 1
    else:
        # progress was made => reset the stall counter
        self.n_stalled_steps = 0
        # update counters used for (exact) infinite-loop detection
        if self.pending_message is not None:
            hashed_msg = hash(str(self.pending_message))
            self.message_counter.update([hashed_msg])
            self.history_count.append(self.message_counter[hashed_msg])
def _process_invalid_step_result(self, parent: ChatDocument | None) -> None:
    """
    Handle a step in which no responder produced a valid result: either
    leave `self.pending_message` as-is, or replace it with a NO_ANSWER
    message from the entity 'opposite' the current pending sender.

    Args:
        parent (ChatDocument|None): parent message of the current message
    """
    self.n_stalled_steps += 1
    no_progress_yet = not self.task_progress
    if (no_progress_yet or self.allow_null_result) and not self.is_pass_thru:
        # Fabricate a dummy NO_ANSWER message from the entity "opposite" the
        # current pending_sender, so we show "progress" and avoid getting
        # stuck in an infinite loop.
        if self.pending_sender == Entity.USER:
            responder = Entity.LLM
        else:
            responder = Entity.USER
        self.pending_message = ChatDocument(
            content=NO_ANSWER,
            metadata=ChatDocMetaData(sender=responder, parent=parent),
        )
        self.pending_sender = responder
    self.log_message(self.pending_sender, self.pending_message, mark=True)
def _show_pending_message_if_debug(self) -> None:
    """In debug mode, print the current pending message and its sender."""
    if self.pending_message is None or not settings.debug:
        return
    sender_str = escape(str(self.pending_sender))
    msg_str = escape(str(self.pending_message))
    print(f"[grey37][{sender_str}]{msg_str}[/grey37]")
def response(
    self,
    e: Responder,
    turns: int = -1,
) -> Optional[ChatDocument]:
    """
    Sync version of `response_async()`. See `response_async()` for details.
    """
    if not isinstance(e, Task):
        # entity responder (AGENT/LLM/USER) of this task's own agent
        response_fn = self._entity_responder_map[cast(Entity, e)]
        result = response_fn(self.pending_message)
        return self._process_result_routing(result)

    # sub-task responder: delegate by running the sub-task
    actual_turns = e.turns if e.turns > 0 else turns
    e.agent.callbacks.set_parent_agent(self.agent)
    result = e.run(
        self.pending_message,
        turns=actual_turns,
        caller=self,
        max_cost=self.max_cost,
        max_tokens=self.max_tokens,
    )
    result_str = str(ChatDocument.to_LLMMessage(result))
    # heuristic: any top-level JSON in the result suggests a tool call
    maybe_tool = len(extract_top_level_json(result_str)) > 0
    self.callbacks.show_subtask_response(
        task=e,
        content=result_str,
        is_tool=maybe_tool,
    )
    return self._process_result_routing(result)
def _process_result_routing(
    self, result: ChatDocument | None
) -> ChatDocument | None:
    """Apply any routing instruction (PASS / PASS_TO / SEND_TO) found in
    `result`, updating recipient metadata as needed, and return the
    (possibly modified) result."""
    if result is None:
        return None
    # parse a possible routing prefix (e.g. @name sets the recipient)
    is_pass, recipient, content = parse_routing(result)
    if is_pass is None:
        # no routing at all, i.e. neither PASS nor SEND
        return result
    if is_pass:
        if recipient is None or self.pending_message is None:
            # bare PASS with no recipient: pass self.pending_message on to
            # the next responder in the default sequence, so leave result
            # intact since "PASS" is handled in step()
            return result
        # PASS_TO:<recipient> => record recipient on the pending message...
        self.pending_message.metadata.recipient = recipient
        # ...and reduce the content to a plain PASS
        result.content = result.content.replace(
            f"{PASS_TO}:{recipient}", PASS
        ).strip()
        return result
    if recipient is not None:
        # SENDing non-empty content to a non-null recipient:
        # clean up result.content and record the recipient in metadata
        result.content = content or ""
        result.metadata.recipient = recipient
        return result
    return result
async def response_async(
self,
e: Responder,