forked from strands-agents/sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession_manager.py
More file actions
168 lines (135 loc) · 7.41 KB
/
session_manager.py
File metadata and controls
168 lines (135 loc) · 7.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Session manager interface for agent session management."""
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
from ..experimental.hooks.events import (
BidiAfterInvocationEvent,
BidiAgentInitializedEvent,
BidiMessageAddedEvent,
)
from ..experimental.hooks.multiagent.events import (
AfterMultiAgentInvocationEvent,
AfterNodeCallEvent,
MultiAgentInitializedEvent,
)
from ..hooks.events import AfterInvocationEvent, AgentInitializedEvent, MessageAddedEvent
from ..hooks.registry import HookProvider, HookRegistry
from ..types.content import Message
if TYPE_CHECKING:
from ..agent.agent import Agent
from ..experimental.bidi.agent.agent import BidiAgent
from ..multiagent.base import MultiAgentBase
logger = logging.getLogger(__name__)
class SessionManager(HookProvider, ABC):
"""Abstract interface for managing sessions.
A session manager is in charge of persisting the conversation and state of an agent across its interaction.
Changes made to the agents conversation, state, or other attributes should be persisted immediately after
they are changed. The different methods introduced in this class are called at important lifecycle events
for an agent, and should be persisted in the session.
"""
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
"""Register hooks for persisting the agent to the session."""
# After the normal Agent initialization behavior, call the session initialize function to restore the agent
registry.add_callback(AgentInitializedEvent, lambda event: self.initialize(event.agent))
# For each message appended to the Agents messages, store that message in the session
registry.add_callback(MessageAddedEvent, lambda event: self.append_message(event.message, event.agent))
# Sync the agent into the session for each message in case the agent state was updated
registry.add_callback(MessageAddedEvent, lambda event: self.sync_agent(event.agent))
# After an agent was invoked, sync it with the session to capture any conversation manager state updates
registry.add_callback(AfterInvocationEvent, lambda event: self.sync_agent(event.agent))
registry.add_callback(MultiAgentInitializedEvent, lambda event: self.initialize_multi_agent(event.source))
registry.add_callback(AfterNodeCallEvent, lambda event: self.sync_multi_agent(event.source))
registry.add_callback(AfterMultiAgentInvocationEvent, lambda event: self.sync_multi_agent(event.source))
# Register BidiAgent hooks
registry.add_callback(BidiAgentInitializedEvent, lambda event: self.initialize_bidi_agent(event.agent))
registry.add_callback(BidiMessageAddedEvent, lambda event: self.append_bidi_message(event.message, event.agent))
registry.add_callback(BidiMessageAddedEvent, lambda event: self.sync_bidi_agent(event.agent))
registry.add_callback(BidiAfterInvocationEvent, lambda event: self.sync_bidi_agent(event.agent))
@abstractmethod
def redact_latest_message(self, redact_message: Message, agent: "Agent", **kwargs: Any) -> None:
"""Redact the message most recently appended to the agent in the session.
Args:
redact_message: New message to use that contains the redact content
agent: Agent to apply the message redaction to
**kwargs: Additional keyword arguments for future extensibility.
"""
@abstractmethod
def append_message(self, message: Message, agent: "Agent", **kwargs: Any) -> None:
"""Append a message to the agent's session.
Args:
message: Message to add to the agent in the session
agent: Agent to append the message to
**kwargs: Additional keyword arguments for future extensibility.
"""
@abstractmethod
def sync_agent(self, agent: "Agent", **kwargs: Any) -> None:
"""Serialize and sync the agent with the session storage.
Args:
agent: Agent who should be synchronized with the session storage
**kwargs: Additional keyword arguments for future extensibility.
"""
@abstractmethod
def initialize(self, agent: "Agent", **kwargs: Any) -> None:
"""Initialize an agent with a session.
Args:
agent: Agent to initialize
**kwargs: Additional keyword arguments for future extensibility.
"""
def sync_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
"""Serialize and sync multi-agent with the session storage.
Args:
source: Multi-agent source object to persist
**kwargs: Additional keyword arguments for future extensibility.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support multi-agent persistence "
"(sync_multi_agent). Provide an implementation or use a "
"SessionManager with session_type=SessionType.MULTI_AGENT."
)
def initialize_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
"""Read multi-agent state from persistent storage.
Args:
**kwargs: Additional keyword arguments for future extensibility.
source: Multi-agent state to initialize.
Returns:
Multi-agent state dictionary or empty dict if not found.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support multi-agent persistence "
"(initialize_multi_agent). Provide an implementation or use a "
"SessionManager with session_type=SessionType.MULTI_AGENT."
)
def initialize_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
"""Initialize a bidirectional agent with a session.
Args:
agent: BidiAgent to initialize
**kwargs: Additional keyword arguments for future extensibility.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support bidirectional agent persistence "
"(initialize_bidi_agent). Provide an implementation or use a "
"SessionManager with bidirectional agent support."
)
def append_bidi_message(self, message: Message, agent: "BidiAgent", **kwargs: Any) -> None:
"""Append a message to the bidirectional agent's session.
Args:
message: Message to add to the agent in the session
agent: BidiAgent to append the message to
**kwargs: Additional keyword arguments for future extensibility.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support bidirectional agent persistence "
"(append_bidi_message). Provide an implementation or use a "
"SessionManager with bidirectional agent support."
)
def sync_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
"""Serialize and sync the bidirectional agent with the session storage.
Args:
agent: BidiAgent who should be synchronized with the session storage
**kwargs: Additional keyword arguments for future extensibility.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support bidirectional agent persistence "
"(sync_bidi_agent). Provide an implementation or use a "
"SessionManager with bidirectional agent support."
)