""" Copilot Session - represents a single conversation session with the Copilot CLI. This module provides the CopilotSession class for managing individual conversation sessions with the Copilot CLI, along with all session-related configuration and handler types. """ from __future__ import annotations import asyncio import functools import inspect import logging import os import pathlib import threading import time from collections.abc import Awaitable, Callable from dataclasses import dataclass from datetime import UTC, datetime from types import TracebackType from typing import TYPE_CHECKING, Any, Literal, NotRequired, Required, TypedDict, cast from ._diagnostics import log_timing from ._jsonrpc import JsonRpcError, ProcessExitedError from ._telemetry import get_trace_context, trace_context from .canvas import CanvasError, CanvasHandler, OpenCanvasInstance from .generated.rpc import ( CanvasHandler as RpcCanvasHandler, ) from .generated.rpc import ( CanvasProviderCloseRequest, CanvasProviderInvokeActionRequest, CanvasProviderOpenRequest, CanvasProviderOpenResult, ClientSessionApiHandlers, CommandsHandlePendingCommandRequest, ExternalToolTextResultForLlm, HandlePendingToolCallRequest, LogRequest, ModelSwitchToRequest, PermissionDecision, PermissionDecisionApproveOnce, PermissionDecisionRequest, PermissionDecisionUserNotAvailable, SessionLogLevel, SessionRpc, UIElicitationRequest, UIElicitationResponse, UIElicitationResponseAction, UIElicitationSchema, UIElicitationSchemaProperty, UIElicitationSchemaPropertyType, UIElicitationSchemaType, UIHandlePendingElicitationRequest, ) from .generated.rpc import ( ContextTier as _RpcContextTier, ) from .generated.rpc import ModelCapabilitiesOverride as _RpcModelCapabilitiesOverride from .generated.session_events import ( AssistantMessageData, CapabilitiesChangedData, CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, PermissionRequest, PermissionRequestedData, SessionCanvasOpenedData, SessionErrorData, SessionEvent, SessionIdleData, session_event_from_dict, ) from .generated.session_events import ( ReasoningSummary as _RpcReasoningSummary, ) from .tools import Tool, ToolHandler, ToolInvocation, ToolResult logger = logging.getLogger(__name__) if TYPE_CHECKING: from .client import ModelCapabilitiesOverride from .session_fs_provider import SessionFsProvider # Re-export SessionEvent under an alias used internally SessionEventTypeAlias = SessionEvent # ============================================================================ # Reasoning Effort # ============================================================================ ReasoningEffort = Literal["low", "medium", "high", "xhigh"] ReasoningSummary = Literal["none", "concise", "detailed"] ContextTier = Literal["default", "long_context"] SessionFsConventions = Literal["posix", "windows"] class SessionFsCapabilities(TypedDict, total=False): sqlite: bool class SessionFsConfig(TypedDict): initial_working_directory: str session_state_path: str conventions: SessionFsConventions capabilities: NotRequired[SessionFsCapabilities] # ============================================================================ # Attachment Types # ============================================================================ class SelectionRange(TypedDict): line: int character: int class Selection(TypedDict): start: SelectionRange end: SelectionRange class FileAttachment(TypedDict): """File attachment.""" type: Literal["file"] path: str displayName: NotRequired[str] class DirectoryAttachment(TypedDict): """Directory attachment.""" type: Literal["directory"] path: str displayName: NotRequired[str] class SelectionAttachment(TypedDict): """Selection attachment with text from a file.""" type: Literal["selection"] filePath: str displayName: str selection: NotRequired[Selection] text: NotRequired[str] class BlobAttachment(TypedDict): """Inline base64-encoded content attachment (e.g. images).""" type: Literal["blob"] data: str """Base64-encoded content""" mimeType: str """MIME type of the inline data""" displayName: NotRequired[str] Attachment = FileAttachment | DirectoryAttachment | SelectionAttachment | BlobAttachment # ============================================================================ # System Message Configuration # ============================================================================ class SystemMessageAppendConfig(TypedDict, total=False): """ Append mode: Use CLI foundation with optional appended content. """ mode: NotRequired[Literal["append"]] content: NotRequired[str] class SystemMessageReplaceConfig(TypedDict): """ Replace mode: Use caller-provided system message entirely. Removes all SDK guardrails including security restrictions. """ mode: Literal["replace"] content: str # Known system message section identifiers for the "customize" mode. SectionTransformFn = Callable[[str], str | Awaitable[str]] """Transform callback: receives current section content, returns new content.""" SectionOverrideAction = Literal["replace", "remove", "append", "prepend"] | SectionTransformFn """Override action: a string literal for static overrides, or a callback for transforms.""" SystemMessageSection = Literal[ "identity", "tone", "tool_efficiency", "environment_context", "code_change_rules", "guidelines", "safety", "tool_instructions", "custom_instructions", "runtime_instructions", "last_instructions", ] SYSTEM_MESSAGE_SECTIONS: dict[SystemMessageSection, str] = { "identity": "Agent identity preamble and mode statement", "tone": "Response style, conciseness rules, output formatting preferences", "tool_efficiency": "Tool usage patterns, parallel calling, batching guidelines", "environment_context": "CWD, OS, git root, directory listing, available tools", "code_change_rules": "Coding rules, linting/testing, ecosystem tools, style", "guidelines": "Tips, behavioral best practices, behavioral guidelines", "safety": "Environment limitations, prohibited actions, security policies", "tool_instructions": "Per-tool usage instructions", "custom_instructions": "Repository and organization custom instructions", "runtime_instructions": ( "Runtime-provided context and instructions" " (e.g. system notifications, memories, workspace context," " mode-specific instructions, content-exclusion policy)" ), "last_instructions": ( "End-of-prompt instructions: parallel tool calling, persistence, task completion" ), } class SectionOverride(TypedDict, total=False): """Override operation for a single system message section.""" action: Required[SectionOverrideAction] content: NotRequired[str] class SystemMessageCustomizeConfig(TypedDict, total=False): """ Customize mode: Override individual sections of the system prompt. Keeps the SDK-managed prompt structure while allowing targeted modifications. """ mode: Required[Literal["customize"]] sections: NotRequired[dict[SystemMessageSection, SectionOverride]] content: NotRequired[str] SystemMessageConfig = ( SystemMessageAppendConfig | SystemMessageReplaceConfig | SystemMessageCustomizeConfig ) # ============================================================================ # Permission Types # ============================================================================ @dataclass class PermissionNoResult: """Sentinel returned by a permission handler to leave the request unanswered. Only meaningful against protocol-v1 servers. v2 servers reject ``no-result`` responses; the SDK raises :class:`ValueError` if a v2 server receives one. Mirrors the ``{kind: "no-result"}`` extension TS adds to its ``PermissionDecision`` union (see ``nodejs/src/types.ts:883``). """ kind: Literal["no-result"] = "no-result" # The decision returned by a permission handler. Identical shape to the wire # ``PermissionDecision`` discriminated union, plus a :class:`PermissionNoResult` # sentinel for v1 servers. Construct via the generated variant classes: # ``PermissionDecisionApproveOnce()``, ``PermissionDecisionReject(feedback=...)``, # etc. The ``kind`` discriminator is baked in as a ``ClassVar`` default by # codegen, so callers must not pass it. PermissionRequestResult = PermissionDecision | PermissionNoResult _PermissionHandlerFn = Callable[ [PermissionRequest, dict[str, str]], PermissionRequestResult | Awaitable[PermissionRequestResult], ] class PermissionHandler: @staticmethod def approve_all( request: PermissionRequest, invocation: dict[str, str] ) -> PermissionRequestResult: return PermissionDecisionApproveOnce() # ============================================================================ # User Input Request Types # ============================================================================ class UserInputRequest(TypedDict, total=False): """Request for user input from the agent (enables ask_user tool)""" question: str choices: list[str] allowFreeform: bool class UserInputResponse(TypedDict): """Response to a user input request""" answer: str wasFreeform: bool UserInputHandler = Callable[ [UserInputRequest, dict[str, str]], UserInputResponse | Awaitable[UserInputResponse], ] class ExitPlanModeRequest(TypedDict, total=False): """Request to exit plan mode and continue with a selected action.""" summary: Required[str] planContent: NotRequired[str] actions: Required[list[str]] recommendedAction: Required[str] class ExitPlanModeResult(TypedDict, total=False): """Response to an exit-plan-mode request.""" approved: Required[bool] selectedAction: NotRequired[str] feedback: NotRequired[str] ExitPlanModeHandler = Callable[ [ExitPlanModeRequest, dict[str, str]], ExitPlanModeResult | Awaitable[ExitPlanModeResult], ] class AutoModeSwitchRequest(TypedDict, total=False): """Request to switch to auto mode after an eligible rate limit.""" errorCode: NotRequired[str] retryAfterSeconds: NotRequired[float] AutoModeSwitchResponse = Literal["yes", "yes_always", "no"] AutoModeSwitchHandler = Callable[ [AutoModeSwitchRequest, dict[str, str]], AutoModeSwitchResponse | Awaitable[AutoModeSwitchResponse], ] # ============================================================================ # Command Types # ============================================================================ @dataclass class CommandContext: """Context passed to a command handler when a command is executed.""" session_id: str """Session ID where the command was invoked.""" command: str """The full command text (e.g. ``"/deploy production"``).""" command_name: str """Command name without leading ``/``.""" args: str """Raw argument string after the command name.""" CommandHandler = Callable[[CommandContext], Awaitable[None] | None] """Handler invoked when a registered command is executed by a user.""" @dataclass class CommandDefinition: """Definition of a slash command registered with the session. When the CLI is running with a TUI, registered commands appear as ``/commandName`` for the user to invoke. """ name: str """Command name (without leading ``/``).""" handler: CommandHandler """Handler invoked when the command is executed.""" description: str | None = None """Human-readable description shown in command completion UI.""" # ============================================================================ # Session Capabilities # ============================================================================ class SessionUiCapabilities(TypedDict, total=False): """UI capabilities reported by the CLI host.""" elicitation: bool """Whether the host supports interactive elicitation dialogs.""" mcpApps: bool """**Experimental.** This capability is part of an experimental wire-protocol surface (SEP-1865) and may change or be removed in a future release. Whether the runtime has accepted the session's MCP Apps (SEP-1865) opt-in. ``True`` when the consumer set ``enable_mcp_apps=True`` on create/resume and the runtime's ``MCP_APPS`` feature flag (or ``COPILOT_MCP_APPS=true`` env override) is on. Otherwise absent or ``False``, indicating the runtime silently dropped the opt-in.""" class SessionCapabilities(TypedDict, total=False): """Capabilities reported by the CLI host for this session.""" ui: SessionUiCapabilities # ============================================================================ # Elicitation Types (client → server) # ============================================================================ ElicitationFieldValue = str | float | bool | list[str] """Possible value types in elicitation form content.""" class ElicitationResult(TypedDict, total=False): """Result returned from an elicitation request.""" action: Required[Literal["accept", "decline", "cancel"]] """User action: ``"accept"`` (submitted), ``"decline"`` (rejected), or ``"cancel"`` (dismissed).""" content: dict[str, ElicitationFieldValue] """Form values submitted by the user (present when action is ``"accept"``).""" class ElicitationParams(TypedDict): """Parameters for a raw elicitation request.""" message: str """Message describing what information is needed from the user.""" requestedSchema: dict[str, Any] """JSON Schema describing the form fields to present.""" class InputOptions(TypedDict, total=False): """Options for the ``input()`` convenience method.""" title: str """Title label for the input field.""" description: str """Descriptive text shown below the field.""" minLength: int """Minimum text length.""" maxLength: int """Maximum text length.""" format: str """Input format hint (e.g. ``"email"``, ``"uri"``, ``"date"``).""" default: str """Default value for the input field.""" # ============================================================================ # Elicitation Types (server → client callback) # ============================================================================ class ElicitationContext(TypedDict, total=False): """Context for an elicitation handler invocation, combining the request data with session context. Mirrors the single-argument pattern of CommandContext.""" session_id: Required[str] """Identifier of the session that triggered the elicitation request.""" message: Required[str] """Message describing what information is needed from the user.""" requestedSchema: dict[str, Any] """JSON Schema describing the form fields to present.""" mode: Literal["form", "url"] """Elicitation mode: ``"form"`` for structured input, ``"url"`` for browser redirect.""" elicitationSource: str """The source that initiated the request (e.g. MCP server name).""" url: str """URL to open in the browser (when mode is ``"url"``).""" ElicitationHandler = Callable[ [ElicitationContext], ElicitationResult | Awaitable[ElicitationResult], ] """Handler invoked when the server dispatches an elicitation request to this client.""" CreateSessionFsHandler = Callable[["CopilotSession"], "SessionFsProvider"] # ============================================================================ # Session UI API # ============================================================================ class SessionUiApi: """Interactive UI methods for showing dialogs to the user. Only available when the CLI host supports elicitation (``session.capabilities["ui"]["elicitation"] is True``). Obtained via :attr:`CopilotSession.ui`. """ def __init__(self, session: CopilotSession) -> None: self._session = session async def elicitation(self, params: ElicitationParams) -> ElicitationResult: """Shows a generic elicitation dialog with a custom schema. Args: params: Elicitation parameters including message and requestedSchema. Returns: The user's response (action + optional content). Raises: RuntimeError: If the host does not support elicitation. """ self._session._assert_elicitation() rpc_result = await self._session.rpc.ui.elicitation( UIElicitationRequest( message=params["message"], requested_schema=UIElicitationSchema.from_dict(params["requestedSchema"]), ) ) result: ElicitationResult = {"action": rpc_result.action.value} if rpc_result.content is not None: result["content"] = rpc_result.content return result async def confirm(self, message: str) -> bool: """Shows a confirmation dialog and returns the user's boolean answer. Args: message: The question to ask the user. Returns: ``True`` if the user accepted, ``False`` otherwise. Raises: RuntimeError: If the host does not support elicitation. """ self._session._assert_elicitation() rpc_result = await self._session.rpc.ui.elicitation( UIElicitationRequest( message=message, requested_schema=UIElicitationSchema( type=UIElicitationSchemaType.OBJECT, properties={ "confirmed": UIElicitationSchemaProperty( type=UIElicitationSchemaPropertyType.BOOLEAN, default=True, ), }, required=["confirmed"], ), ) ) return ( rpc_result.action == UIElicitationResponseAction.ACCEPT and rpc_result.content is not None and rpc_result.content.get("confirmed") is True ) async def select(self, message: str, options: list[str]) -> str | None: """Shows a selection dialog with a list of options. Args: message: Instruction to show the user. options: List of choices the user can pick from. Returns: The selected string, or ``None`` if the user declined/cancelled. Raises: RuntimeError: If the host does not support elicitation. """ self._session._assert_elicitation() rpc_result = await self._session.rpc.ui.elicitation( UIElicitationRequest( message=message, requested_schema=UIElicitationSchema( type=UIElicitationSchemaType.OBJECT, properties={ "selection": UIElicitationSchemaProperty( type=UIElicitationSchemaPropertyType.STRING, enum=options, ), }, required=["selection"], ), ) ) if ( rpc_result.action == UIElicitationResponseAction.ACCEPT and rpc_result.content is not None and rpc_result.content.get("selection") is not None ): return str(rpc_result.content["selection"]) return None async def input(self, message: str, options: InputOptions | None = None) -> str | None: """Shows a text input dialog. Args: message: Instruction to show the user. options: Optional constraints for the input field. Returns: The entered text, or ``None`` if the user declined/cancelled. Raises: RuntimeError: If the host does not support elicitation. """ self._session._assert_elicitation() field: dict[str, Any] = {"type": "string"} if options: for key in ("title", "description", "minLength", "maxLength", "format", "default"): if key in options: field[key] = options[key] rpc_result = await self._session.rpc.ui.elicitation( UIElicitationRequest( message=message, requested_schema=UIElicitationSchema.from_dict( { "type": "object", "properties": {"value": field}, "required": ["value"], } ), ) ) if ( rpc_result.action == UIElicitationResponseAction.ACCEPT and rpc_result.content is not None and rpc_result.content.get("value") is not None ): return str(rpc_result.content["value"]) return None # ============================================================================ # Hook Types # ============================================================================ class PreToolUseHookInput(TypedDict): """Input for pre-tool-use hook""" sessionId: str timestamp: datetime workingDirectory: str toolName: str toolArgs: Any class PreToolUseHookOutput(TypedDict, total=False): """Output for pre-tool-use hook""" permissionDecision: Literal["allow", "deny", "ask"] permissionDecisionReason: str modifiedArgs: Any additionalContext: str suppressOutput: bool PreToolUseHandler = Callable[ [PreToolUseHookInput, dict[str, str]], PreToolUseHookOutput | None | Awaitable[PreToolUseHookOutput | None], ] class PreMcpToolCallHookInput(TypedDict): """Input for pre-MCP-tool-call hook""" sessionId: str timestamp: datetime workingDirectory: str serverName: str toolName: str arguments: Any toolCallId: NotRequired[str] _meta: NotRequired[dict[str, Any]] class PreMcpToolCallHookOutput(TypedDict, total=False): """Output for pre-MCP-tool-call hook. metaToUse semantics: - Key absent: preserve the current request _meta - Key present with None value: omit _meta from the request - Key present with dict value: use this dict as request _meta """ metaToUse: dict[str, Any] | None PreMcpToolCallHandler = Callable[ [PreMcpToolCallHookInput, dict[str, str]], PreMcpToolCallHookOutput | None | Awaitable[PreMcpToolCallHookOutput | None], ] class PostToolUseHookInput(TypedDict): """Input for post-tool-use hook""" sessionId: str timestamp: datetime workingDirectory: str toolName: str toolArgs: Any toolResult: Any class PostToolUseHookOutput(TypedDict, total=False): """Output for post-tool-use hook""" modifiedResult: Any additionalContext: str suppressOutput: bool PostToolUseHandler = Callable[ [PostToolUseHookInput, dict[str, str]], PostToolUseHookOutput | None | Awaitable[PostToolUseHookOutput | None], ] class PostToolUseFailureHookInput(TypedDict): """Input for post-tool-use-failure hook. Fires after a tool execution whose result was ``"failure"``. The CLI extracts the failure message from the tool result and passes it as the ``error`` field (rather than passing the full result object). """ sessionId: str timestamp: datetime workingDirectory: str toolName: str toolArgs: Any error: str class PostToolUseFailureHookOutput(TypedDict, total=False): """Output for post-tool-use-failure hook. Only ``additionalContext`` is consumed by the host CLI — it is appended as hidden guidance to the model alongside the failed tool result. """ additionalContext: str PostToolUseFailureHandler = Callable[ [PostToolUseFailureHookInput, dict[str, str]], PostToolUseFailureHookOutput | None | Awaitable[PostToolUseFailureHookOutput | None], ] class UserPromptSubmittedHookInput(TypedDict): """Input for user-prompt-submitted hook""" sessionId: str timestamp: datetime workingDirectory: str prompt: str class UserPromptSubmittedHookOutput(TypedDict, total=False): """Output for user-prompt-submitted hook""" modifiedPrompt: str additionalContext: str suppressOutput: bool UserPromptSubmittedHandler = Callable[ [UserPromptSubmittedHookInput, dict[str, str]], UserPromptSubmittedHookOutput | None | Awaitable[UserPromptSubmittedHookOutput | None], ] class SessionStartHookInput(TypedDict): """Input for session-start hook""" sessionId: str timestamp: datetime workingDirectory: str source: Literal["startup", "resume", "new"] initialPrompt: NotRequired[str] class SessionStartHookOutput(TypedDict, total=False): """Output for session-start hook""" additionalContext: str modifiedConfig: dict[str, Any] SessionStartHandler = Callable[ [SessionStartHookInput, dict[str, str]], SessionStartHookOutput | None | Awaitable[SessionStartHookOutput | None], ] class SessionEndHookInput(TypedDict): """Input for session-end hook""" sessionId: str timestamp: datetime workingDirectory: str reason: Literal["complete", "error", "abort", "timeout", "user_exit"] finalMessage: NotRequired[str] error: NotRequired[str] class SessionEndHookOutput(TypedDict, total=False): """Output for session-end hook""" suppressOutput: bool cleanupActions: list[str] sessionSummary: str SessionEndHandler = Callable[ [SessionEndHookInput, dict[str, str]], SessionEndHookOutput | None | Awaitable[SessionEndHookOutput | None], ] class ErrorOccurredHookInput(TypedDict): """Input for error-occurred hook""" sessionId: str timestamp: datetime workingDirectory: str error: str errorContext: Literal["model_call", "tool_execution", "system", "user_input"] recoverable: bool class ErrorOccurredHookOutput(TypedDict, total=False): """Output for error-occurred hook""" suppressOutput: bool errorHandling: Literal["retry", "skip", "abort"] retryCount: int userNotification: str ErrorOccurredHandler = Callable[ [ErrorOccurredHookInput, dict[str, str]], ErrorOccurredHookOutput | None | Awaitable[ErrorOccurredHookOutput | None], ] class SessionHooks(TypedDict, total=False): """Configuration for session hooks""" on_pre_tool_use: PreToolUseHandler on_pre_mcp_tool_call: PreMcpToolCallHandler on_post_tool_use: PostToolUseHandler on_post_tool_use_failure: PostToolUseFailureHandler on_user_prompt_submitted: UserPromptSubmittedHandler on_session_start: SessionStartHandler on_session_end: SessionEndHandler on_error_occurred: ErrorOccurredHandler # ============================================================================ # MCP Server Configuration Types # ============================================================================ class MCPStdioServerConfig(TypedDict, total=False): """Configuration for a local/stdio MCP server.""" tools: list[str] # List of tools to include. [] means none. "*" means all. type: NotRequired[Literal["local", "stdio"]] # Server type timeout: NotRequired[int] # Timeout in milliseconds command: str # Command to run args: NotRequired[list[str]] # Command arguments env: NotRequired[dict[str, str]] # Environment variables working_directory: NotRequired[str] # Working directory class MCPHTTPServerConfig(TypedDict, total=False): """Configuration for a remote MCP server (HTTP or SSE).""" tools: list[str] # List of tools to include. [] means none. "*" means all. type: Literal["http", "sse"] # Server type timeout: NotRequired[int] # Timeout in milliseconds url: str # URL of the remote server headers: NotRequired[dict[str, str]] # HTTP headers MCPServerConfig = MCPStdioServerConfig | MCPHTTPServerConfig # ============================================================================ # Custom Agent Configuration Types # ============================================================================ class CustomAgentConfig(TypedDict, total=False): """Configuration for a custom agent.""" name: str # Unique name of the custom agent display_name: NotRequired[str] # Display name for UI purposes description: NotRequired[str] # Description of what the agent does # List of tool names the agent can use tools: NotRequired[list[str] | None] prompt: str # The prompt content for the agent # MCP servers specific to agent mcp_servers: NotRequired[dict[str, MCPServerConfig]] infer: NotRequired[bool] # Whether agent is available for model inference # Skill names to preload into this agent's context at startup (opt-in; omit for none) skills: NotRequired[list[str]] # Model identifier (e.g. "claude-haiku-4.5"); runtime falls back to parent model if unavailable model: NotRequired[str] class DefaultAgentConfig(TypedDict, total=False): """Configuration for the default agent. The default agent is the built-in agent that handles turns when no custom agent is selected. """ # List of tool names to exclude from the default agent. # These tools remain available to custom sub-agents that reference them. excluded_tools: list[str] class InfiniteSessionConfig(TypedDict, total=False): """ Configuration for infinite sessions with automatic context compaction and workspace persistence. When enabled, sessions automatically manage context window limits through background compaction and persist state to a workspace directory. """ # Whether infinite sessions are enabled (default: True) enabled: bool # Context utilization threshold (0.0-1.0) at which background compaction starts. # Compaction runs asynchronously, allowing the session to continue processing. # Default: 0.80 background_compaction_threshold: float # Context utilization threshold (0.0-1.0) at which the session blocks until # compaction completes. This prevents context overflow when compaction hasn't # finished in time. Default: 0.95 buffer_exhaustion_threshold: float class LargeToolOutputConfig(TypedDict, total=False): """ Configuration for handling large tool outputs. When a tool produces output exceeding the configured size, the output is written to a temp file and a reference is returned to the model instead of the full payload. """ # Whether large output handling is enabled. Default True. enabled: bool # Maximum size in bytes before output is written to a temp file. Default 50KB. max_size_bytes: int # Directory to write temp files to. Defaults to the OS temp directory. output_directory: str # ============================================================================ # Session Configuration # ============================================================================ class AzureProviderOptions(TypedDict, total=False): """Azure-specific provider configuration""" api_version: str # Azure API version. Defaults to "2024-10-21". class ProviderConfig(TypedDict, total=False): """Configuration for a custom API provider""" type: Literal["openai", "azure", "anthropic"] wire_api: Literal["completions", "responses"] base_url: str api_key: str # Bearer token for authentication. Sets the Authorization header directly. # Use this for services requiring bearer token auth instead of API key. # Takes precedence over api_key when both are set. bearer_token: str azure: AzureProviderOptions # Azure-specific options headers: dict[str, str] # Well-known model name used by the runtime to look up agent configuration # (tools, prompts, reasoning behavior) and default token limits. Also used # as the wire model when wire_model is not set. # Falls back to SessionConfig.model. model_id: str # Model name sent to the provider API for inference. Use this when the # provider's model name (e.g. an Azure deployment name or a custom # fine-tune name) differs from model_id. # Falls back to model_id, then SessionConfig.model. wire_model: str # Overrides the resolved model's default max prompt tokens. The runtime # triggers conversation compaction before sending a request when the prompt # (system message, history, tool definitions, user message) would exceed # this limit. max_prompt_tokens: int # Overrides the resolved model's default max output tokens. When hit, the # model stops generating and returns a truncated response. max_output_tokens: int SessionEventHandler = Callable[[SessionEvent], None] class _CanvasHandlerAdapter: def __init__(self, handler: CanvasHandler) -> None: self._handler = handler async def open(self, params: CanvasProviderOpenRequest) -> CanvasProviderOpenResult: try: return await self._handler.on_open(params) except CanvasError as err: raise JsonRpcError(-32603, err.message, data=err.to_envelope()) from err except Exception as err: raise _canvas_handler_error(err) from err async def close(self, params: CanvasProviderCloseRequest) -> None: try: await self._handler.on_close(params) except CanvasError as err: raise JsonRpcError(-32603, err.message, data=err.to_envelope()) from err except Exception as err: raise _canvas_handler_error(err) from err async def invoke(self, params: CanvasProviderInvokeActionRequest) -> Any: try: return await self._handler.on_action(params) except CanvasError as err: raise JsonRpcError(-32603, err.message, data=err.to_envelope()) from err except Exception as err: raise _canvas_handler_error(err) from err def _canvas_handler_error(err: Exception) -> JsonRpcError: return JsonRpcError( -32603, str(err), data={"code": "canvas_handler_error", "message": str(err)}, ) class CopilotSession: """ Represents a single conversation session with the Copilot CLI. A session maintains conversation state, handles events, and manages tool execution. Sessions are created via :meth:`CopilotClient.create_session` or resumed via :meth:`CopilotClient.resume_session`. The session provides methods to send messages, subscribe to events, retrieve conversation history, and manage the session lifecycle. Attributes: session_id: The unique identifier for this session. Example: >>> async with await client.create_session( ... on_permission_request=PermissionHandler.approve_all, ... ) as session: ... # Subscribe to events ... unsubscribe = session.on(lambda event: print(event.type)) ... ... # Send a message ... await session.send("Hello, world!") ... ... # Clean up ... unsubscribe() """ def __init__( self, session_id: str, client: Any, workspace_path: os.PathLike[str] | str | None = None ): """ Initialize a new CopilotSession. Note: This constructor is internal. Use :meth:`CopilotClient.create_session` to create sessions. Args: session_id: The unique identifier for this session. client: The internal client connection to the Copilot CLI. workspace_path: Path to the session workspace directory (when infinite sessions enabled). """ self.session_id = session_id self._client = client self._workspace_path = os.fsdecode(workspace_path) if workspace_path is not None else None self._event_handlers: set[Callable[[SessionEvent], None]] = set() self._event_handlers_lock = threading.Lock() self._tool_handlers: dict[str, ToolHandler] = {} self._tool_handlers_lock = threading.Lock() self._permission_handler: _PermissionHandlerFn | None = None self._permission_handler_lock = threading.Lock() self._user_input_handler: UserInputHandler | None = None self._user_input_handler_lock = threading.Lock() self._exit_plan_mode_handler: ExitPlanModeHandler | None = None self._exit_plan_mode_handler_lock = threading.Lock() self._auto_mode_switch_handler: AutoModeSwitchHandler | None = None self._auto_mode_switch_handler_lock = threading.Lock() self._hooks: SessionHooks | None = None self._hooks_lock = threading.Lock() self._transform_callbacks: dict[str, SectionTransformFn] | None = None self._transform_callbacks_lock = threading.Lock() self._command_handlers: dict[str, CommandHandler] = {} self._command_handlers_lock = threading.Lock() self._elicitation_handler: ElicitationHandler | None = None self._elicitation_handler_lock = threading.Lock() self._capabilities: SessionCapabilities = {} self._client_session_apis = ClientSessionApiHandlers() self._canvas_handler: CanvasHandler | None = None self._canvas_handler_lock = threading.Lock() self._open_canvases: list[OpenCanvasInstance] = [] self._open_canvases_lock = threading.Lock() self._rpc: SessionRpc | None = None self._destroyed = False @property def rpc(self) -> SessionRpc: """Typed session-scoped RPC methods.""" if self._rpc is None: self._rpc = SessionRpc(self._client, self.session_id) return self._rpc @property def capabilities(self) -> SessionCapabilities: """Host capabilities reported when the session was created or resumed. Use this to check feature support before calling capability-gated APIs. """ return self._capabilities @property def ui(self) -> SessionUiApi: """Interactive UI methods for showing dialogs to the user. Only available when the CLI host supports elicitation (``session.capabilities.get("ui", {}).get("elicitation") is True``). Example: >>> ui_caps = session.capabilities.get("ui", {}) >>> if ui_caps.get("elicitation"): ... ok = await session.ui.confirm("Deploy to production?") """ return SessionUiApi(self) @functools.cached_property def workspace_path(self) -> pathlib.Path | None: """ Path to the session workspace directory when infinite sessions are enabled. Contains checkpoints/, plan.md, and files/ subdirectories. None if infinite sessions are disabled. """ # Done as a property as self._workspace_path is directly set from a server # response post-init. So it was either make sure all places directly setting # the attribute handle the None case appropriately, use a setter for the # attribute to do the conversion, or just do the conversion lazily via a getter. return pathlib.Path(self._workspace_path) if self._workspace_path else None async def send( self, prompt: str, *, attachments: list[Attachment] | None = None, mode: Literal["enqueue", "immediate"] | None = None, agent_mode: Literal["interactive", "plan", "autopilot", "shell"] | None = None, request_headers: dict[str, str] | None = None, display_prompt: str | None = None, ) -> str: """ Send a message to this session. The message is processed asynchronously. Subscribe to events via :meth:`on` to receive streaming responses and other session events. Use :meth:`send_and_wait` to block until the assistant finishes processing. Args: prompt: The message text to send. attachments: Optional file, directory, or selection attachments. mode: Message delivery mode (``"enqueue"`` or ``"immediate"``). agent_mode: The UI mode the agent was in when this message was sent (for example ``"plan"`` or ``"autopilot"``). Defaults to the session's current mode when unset. request_headers: Optional per-turn HTTP headers for outbound model requests. display_prompt: If provided, this is shown in the timeline instead of ``prompt``. Returns: The message ID assigned by the server, which can be used to correlate events. Raises: Exception: If the session has been disconnected or the connection fails. Example: >>> message_id = await session.send( ... "Explain this code", ... attachments=[{"type": "file", "path": "./src/main.py"}], ... ) """ params: dict[str, Any] = { "sessionId": self.session_id, "prompt": prompt, } if attachments is not None: params["attachments"] = attachments if mode is not None: params["mode"] = mode if agent_mode is not None: params["agentMode"] = agent_mode if request_headers is not None: params["requestHeaders"] = request_headers if display_prompt is not None: params["displayPrompt"] = display_prompt params.update(get_trace_context()) rpc_start = time.perf_counter() response = await self._client.request("session.send", params) message_id = response["messageId"] log_timing( logger, logging.DEBUG, "CopilotSession.send completed successfully", rpc_start, session_id=self.session_id, message_id=message_id, ) return message_id async def send_and_wait( self, prompt: str, *, attachments: list[Attachment] | None = None, mode: Literal["enqueue", "immediate"] | None = None, agent_mode: Literal["interactive", "plan", "autopilot", "shell"] | None = None, request_headers: dict[str, str] | None = None, display_prompt: str | None = None, timeout: float = 60.0, ) -> SessionEvent | None: """ Send a message to this session and wait until the session becomes idle. This is a convenience method that combines :meth:`send` with waiting for the session.idle event. Use this when you want to block until the assistant has finished processing the message. Events are still delivered to handlers registered via :meth:`on` while waiting. Args: prompt: The message text to send. attachments: Optional file, directory, or selection attachments. mode: Message delivery mode (``"enqueue"`` or ``"immediate"``). agent_mode: The UI mode the agent was in when this message was sent (for example ``"plan"`` or ``"autopilot"``). Defaults to the session's current mode when unset. request_headers: Optional per-turn HTTP headers for outbound model requests. display_prompt: If provided, this is shown in the timeline instead of ``prompt``. timeout: Timeout in seconds (default: 60). Controls how long to wait; does not abort in-flight agent work. Returns: The final assistant message event, or None if none was received. Raises: TimeoutError: If the timeout is reached before session becomes idle. Exception: If the session has been disconnected or the connection fails. Example: >>> from copilot.session_events import AssistantMessageData >>> response = await session.send_and_wait("What is 2+2?") >>> if response: ... match response.data: ... case AssistantMessageData() as data: ... print(data.content) """ total_start = time.perf_counter() idle_event = asyncio.Event() error_event: Exception | None = None last_assistant_message: SessionEvent | None = None first_assistant_message_logged = False def handler(event: SessionEventTypeAlias) -> None: nonlocal first_assistant_message_logged, last_assistant_message, error_event match event.data: case AssistantMessageData(): last_assistant_message = event if not first_assistant_message_logged: first_assistant_message_logged = True log_timing( logger, logging.DEBUG, "CopilotSession.send_and_wait first assistant message", total_start, session_id=self.session_id, ) case SessionIdleData(): log_timing( logger, logging.DEBUG, "CopilotSession.send_and_wait idle received", total_start, session_id=self.session_id, ) idle_event.set() case SessionErrorData() as data: error_event = Exception(f"Session error: {data.message or str(data)}") idle_event.set() unsubscribe = self.on(handler) try: await self.send( prompt, attachments=attachments, mode=mode, agent_mode=agent_mode, request_headers=request_headers, display_prompt=display_prompt, ) await asyncio.wait_for(idle_event.wait(), timeout=timeout) if error_event: log_timing( logger, logging.WARNING, "CopilotSession.send_and_wait failed", total_start, session_id=self.session_id, completed_by="error", ) raise error_event log_timing( logger, logging.DEBUG, "CopilotSession.send_and_wait complete", total_start, session_id=self.session_id, completed_by="idle", assistant_message_received=last_assistant_message is not None, ) return last_assistant_message except TimeoutError: log_timing( logger, logging.WARNING, "CopilotSession.send_and_wait failed", total_start, session_id=self.session_id, completed_by="timeout", ) raise TimeoutError(f"Timeout after {timeout}s waiting for session.idle") finally: unsubscribe() def on(self, handler: Callable[[SessionEvent], None]) -> Callable[[], None]: """ Subscribe to events from this session. Events include assistant messages, tool executions, errors, and session state changes. Multiple handlers can be registered and will all receive events. Args: handler: A callback function that receives session events. The function takes a single :class:`SessionEvent` argument and returns None. Returns: A function that, when called, unsubscribes the handler. Example: >>> from copilot.session_events import AssistantMessageData, SessionErrorData >>> def handle_event(event): ... match event.data: ... case AssistantMessageData() as data: ... print(f"Assistant: {data.content}") ... case SessionErrorData() as data: ... print(f"Error: {data.message}") >>> unsubscribe = session.on(handle_event) >>> # Later, to stop receiving events: >>> unsubscribe() """ with self._event_handlers_lock: self._event_handlers.add(handler) def unsubscribe(): with self._event_handlers_lock: self._event_handlers.discard(handler) return unsubscribe def _dispatch_event(self, event: SessionEvent) -> None: """ Dispatch an event to all registered handlers. Broadcast request events (external_tool.requested, permission.requested) are handled internally before being forwarded to user handlers. Note: This method is internal and should not be called directly. Args: event: The session event to dispatch to all handlers. """ dispatch_start = time.perf_counter() # Handle broadcast request events (protocol v3) before dispatching to user handlers. # Fire-and-forget: the response is sent asynchronously via RPC. self._handle_broadcast_event(event) with self._event_handlers_lock: handlers = list(self._event_handlers) for handler in handlers: try: handler(event) except Exception: logger.error("Unhandled exception in session event handler", exc_info=True) log_timing( logger, logging.DEBUG, "CopilotSession._dispatch_event dispatch", dispatch_start, session_id=self.session_id, event_type=event.type, ) def _handle_broadcast_event(self, event: SessionEvent) -> None: """Handle broadcast request events by executing local handlers and responding via RPC. Implements the protocol v3 broadcast model where tool calls and permission requests are broadcast as session events to all clients. """ match event.data: case ExternalToolRequestedData() as data: request_id = data.request_id tool_name = data.tool_name if not request_id or not tool_name: return handler = self._get_tool_handler(tool_name) if not handler: return # This client doesn't handle this tool; another client will. tool_call_id = data.tool_call_id or "" arguments = data.arguments tp = getattr(data, "traceparent", None) ts = getattr(data, "tracestate", None) asyncio.ensure_future( self._execute_tool_and_respond( request_id, tool_name, tool_call_id, arguments, handler, tp, ts ) ) case PermissionRequestedData() as data: if logger.isEnabledFor(logging.DEBUG): logger.debug( "CopilotSession._dispatch_event permission request received", extra={ "session_id": self.session_id, "event_type": event.type.value, }, ) request_id = data.request_id permission_request = data.permission_request if not request_id or not permission_request: return resolved_by_hook = getattr(data, "resolved_by_hook", None) if resolved_by_hook: return # Already resolved by a permissionRequest hook; no client action needed. with self._permission_handler_lock: perm_handler = self._permission_handler if not perm_handler: return # This client doesn't handle permissions; another client will. asyncio.ensure_future( self._execute_permission_and_respond( request_id, permission_request, perm_handler ) ) case CommandExecuteData() as data: request_id = data.request_id command_name = data.command_name command = data.command args = data.args if not request_id or not command_name: return asyncio.ensure_future( self._execute_command_and_respond( request_id, command_name, command or "", args or "" ) ) case ElicitationRequestedData() as data: with self._elicitation_handler_lock: handler = self._elicitation_handler if not handler: return request_id = data.request_id if not request_id: return context: ElicitationContext = { "session_id": self.session_id, "message": data.message or "", } if data.requested_schema is not None: context["requestedSchema"] = data.requested_schema.to_dict() if data.mode is not None: context["mode"] = data.mode.value if data.elicitation_source is not None: context["elicitationSource"] = data.elicitation_source if data.url is not None: context["url"] = data.url asyncio.ensure_future(self._handle_elicitation_request(context, request_id)) case CapabilitiesChangedData() as data: cap: SessionCapabilities = {} if data.ui is not None: ui_cap: SessionUiCapabilities = {} if data.ui.elicitation is not None: ui_cap["elicitation"] = data.ui.elicitation cap["ui"] = ui_cap self._capabilities = {**self._capabilities, **cap} case SessionCanvasOpenedData() as data: try: if ( not data.instance_id or not data.canvas_id or not data.extension_id or data.availability is None ): raise ValueError("missing required open canvas fields") self._upsert_open_canvas(OpenCanvasInstance.from_dict(data.to_dict())) except Exception as exc: logger.warning("failed to deserialize session.canvas.opened payload: %s", exc) async def _execute_tool_and_respond( self, request_id: str, tool_name: str, tool_call_id: str, arguments: Any, handler: ToolHandler, traceparent: str | None = None, tracestate: str | None = None, ) -> None: """Execute a tool handler and send the result back via HandlePendingToolCall RPC.""" try: invocation = ToolInvocation( session_id=self.session_id, tool_call_id=tool_call_id, tool_name=tool_name, arguments=arguments, ) with trace_context(traceparent, tracestate): handler_start = time.perf_counter() result = handler(invocation) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._execute_tool_and_respond tool dispatch", handler_start, session_id=self.session_id, request_id=request_id, tool_call_id=tool_call_id, tool_name=tool_name, ) tool_result: ToolResult if result is None: tool_result = ToolResult( text_result_for_llm="Tool returned no result.", result_type="failure", error="tool returned no result", tool_telemetry={}, ) else: tool_result = result # type: ignore[assignment] # Exception-originated failures (from define_tool's exception handler) are # sent via the top-level error param so the CLI formats them with its # standard "Failed to execute..." message. Deliberate user-returned # failures send the full structured result to preserve metadata. if tool_result._from_exception: rpc_start = time.perf_counter() await self.rpc.tools.handle_pending_tool_call( HandlePendingToolCallRequest( request_id=request_id, error=tool_result.error, ) ) log_timing( logger, logging.DEBUG, "CopilotSession._execute_tool_and_respond response sent successfully", rpc_start, session_id=self.session_id, request_id=request_id, tool_call_id=tool_call_id, tool_name=tool_name, ) else: rpc_start = time.perf_counter() await self.rpc.tools.handle_pending_tool_call( HandlePendingToolCallRequest( request_id=request_id, result=ExternalToolTextResultForLlm( text_result_for_llm=tool_result.text_result_for_llm, error=tool_result.error, result_type=tool_result.result_type, tool_telemetry=tool_result.tool_telemetry, ), ) ) log_timing( logger, logging.DEBUG, "CopilotSession._execute_tool_and_respond response sent successfully", rpc_start, session_id=self.session_id, request_id=request_id, tool_call_id=tool_call_id, tool_name=tool_name, ) except Exception as exc: try: await self.rpc.tools.handle_pending_tool_call( HandlePendingToolCallRequest( request_id=request_id, error=str(exc), ) ) except (JsonRpcError, ProcessExitedError, OSError): pass # Connection lost or RPC error — nothing we can do async def _execute_permission_and_respond( self, request_id: str, permission_request: Any, handler: _PermissionHandlerFn, ) -> None: """Execute a permission handler and respond via RPC.""" try: handler_start = time.perf_counter() result = handler(permission_request, {"session_id": self.session_id}) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._execute_permission_and_respond dispatch", handler_start, session_id=self.session_id, request_id=request_id, ) result = cast(PermissionRequestResult, result) if isinstance(result, PermissionNoResult): return rpc_start = time.perf_counter() await self.rpc.permissions.handle_pending_permission_request( PermissionDecisionRequest( request_id=request_id, result=result, ) ) log_timing( logger, logging.DEBUG, "CopilotSession._execute_permission_and_respond response sent successfully", rpc_start, session_id=self.session_id, request_id=request_id, ) except Exception: try: await self.rpc.permissions.handle_pending_permission_request( PermissionDecisionRequest( request_id=request_id, result=PermissionDecisionUserNotAvailable(), ) ) except (JsonRpcError, ProcessExitedError, OSError): pass # Connection lost or RPC error — nothing we can do async def _execute_command_and_respond( self, request_id: str, command_name: str, command: str, args: str, ) -> None: """Execute a command handler and send the result back via RPC.""" with self._command_handlers_lock: handler = self._command_handlers.get(command_name) if not handler: try: await self.rpc.commands.handle_pending_command( CommandsHandlePendingCommandRequest( request_id=request_id, error=f"Unknown command: {command_name}", ) ) except (JsonRpcError, ProcessExitedError, OSError): pass # Connection lost — nothing we can do return try: ctx = CommandContext( session_id=self.session_id, command=command, command_name=command_name, args=args, ) handler_start = time.perf_counter() result = handler(ctx) if inspect.isawaitable(result): await result log_timing( logger, logging.DEBUG, "CopilotSession._execute_command_and_respond dispatch", handler_start, session_id=self.session_id, request_id=request_id, command_name=command_name, ) rpc_start = time.perf_counter() await self.rpc.commands.handle_pending_command( CommandsHandlePendingCommandRequest(request_id=request_id) ) log_timing( logger, logging.DEBUG, "CopilotSession._execute_command_and_respond response sent successfully", rpc_start, session_id=self.session_id, request_id=request_id, command_name=command_name, ) except Exception as exc: message = str(exc) try: await self.rpc.commands.handle_pending_command( CommandsHandlePendingCommandRequest( request_id=request_id, error=message, ) ) except (JsonRpcError, ProcessExitedError, OSError): pass # Connection lost — nothing we can do async def _handle_elicitation_request( self, context: ElicitationContext, request_id: str, ) -> None: """Handle an elicitation.requested broadcast event. Invokes the registered handler and responds via handlePendingElicitation RPC. Auto-cancels on error so the server doesn't hang. """ with self._elicitation_handler_lock: handler = self._elicitation_handler if not handler: return try: handler_start = time.perf_counter() result = handler(context) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_elicitation_request dispatch", handler_start, session_id=self.session_id, request_id=request_id, ) result = cast(ElicitationResult, result) action_val = result.get("action", "cancel") rpc_result = UIElicitationResponse( action=UIElicitationResponseAction(action_val), content=result.get("content"), ) rpc_start = time.perf_counter() await self.rpc.ui.handle_pending_elicitation( UIHandlePendingElicitationRequest( request_id=request_id, result=rpc_result, ) ) log_timing( logger, logging.DEBUG, "CopilotSession._handle_elicitation_request response sent successfully", rpc_start, session_id=self.session_id, request_id=request_id, ) except Exception: # Handler failed — attempt to cancel so the request doesn't hang try: await self.rpc.ui.handle_pending_elicitation( UIHandlePendingElicitationRequest( request_id=request_id, result=UIElicitationResponse( action=UIElicitationResponseAction.CANCEL, ), ) ) except (JsonRpcError, ProcessExitedError, OSError): pass # Connection lost or RPC error — nothing we can do def _assert_elicitation(self) -> None: """Raises if the host does not support elicitation.""" ui_caps = self._capabilities.get("ui", {}) if not ui_caps.get("elicitation"): raise RuntimeError( "Elicitation is not supported by the host. " "Check session.capabilities before calling UI methods." ) def _register_commands(self, commands: list[CommandDefinition] | None) -> None: """Register command handlers for this session. Args: commands: A list of CommandDefinition objects, or None to clear all commands. """ with self._command_handlers_lock: self._command_handlers.clear() if not commands: return for cmd in commands: self._command_handlers[cmd.name] = cmd.handler def _register_elicitation_handler(self, handler: ElicitationHandler | None) -> None: """Register the elicitation handler for this session. Args: handler: The handler to invoke when the server dispatches an elicitation request, or None to remove the handler. """ with self._elicitation_handler_lock: self._elicitation_handler = handler def _register_exit_plan_mode_handler(self, handler: ExitPlanModeHandler | None) -> None: """Register the exit-plan-mode handler for this session.""" with self._exit_plan_mode_handler_lock: self._exit_plan_mode_handler = handler def _register_auto_mode_switch_handler(self, handler: AutoModeSwitchHandler | None) -> None: """Register the auto-mode-switch handler for this session.""" with self._auto_mode_switch_handler_lock: self._auto_mode_switch_handler = handler def _register_canvas_handler(self, handler: CanvasHandler | None) -> None: """Register the canvas handler for this session.""" with self._canvas_handler_lock: self._canvas_handler = handler self._client_session_apis.canvas = ( cast(RpcCanvasHandler, _CanvasHandlerAdapter(handler)) if handler is not None else None ) def _get_canvas_handler(self) -> CanvasHandler | None: with self._canvas_handler_lock: return self._canvas_handler def _set_open_canvases(self, instances: list[OpenCanvasInstance]) -> None: with self._open_canvases_lock: self._open_canvases = list(instances) def _upsert_open_canvas(self, instance: OpenCanvasInstance) -> None: with self._open_canvases_lock: for index, existing in enumerate(self._open_canvases): if existing.instance_id == instance.instance_id: self._open_canvases[index] = instance return self._open_canvases.append(instance) @property def open_canvases(self) -> list[OpenCanvasInstance]: """Open canvas instances currently known to be open for this session. Populated from ``session.resume`` and live ``session.canvas.opened`` events. """ with self._open_canvases_lock: return list(self._open_canvases) def _set_capabilities(self, capabilities: SessionCapabilities | None) -> None: """Set the host capabilities for this session. Args: capabilities: The capabilities object from the create/resume response. """ self._capabilities: SessionCapabilities = capabilities if capabilities is not None else {} def _register_tools(self, tools: list[Tool] | None) -> None: """ Register custom tool handlers for this session. Tools with handlers allow the assistant to execute custom functions automatically. Declaration-only tools are surfaced as events and left pending for the consumer. Note: This method is internal. Tools are typically registered when creating a session via :meth:`CopilotClient.create_session`. Args: tools: A list of Tool objects with their handlers, or None to clear all registered tools. """ with self._tool_handlers_lock: self._tool_handlers.clear() if not tools: return for tool in tools: if not tool.name or not tool.handler: continue self._tool_handlers[tool.name] = tool.handler def _get_tool_handler(self, name: str) -> ToolHandler | None: """ Retrieve a registered tool handler by name. Note: This method is internal and should not be called directly. Args: name: The name of the tool to retrieve. Returns: The tool handler if found, or None if no handler is registered for the given name. """ with self._tool_handlers_lock: return self._tool_handlers.get(name) def _register_permission_handler(self, handler: _PermissionHandlerFn | None) -> None: """ Register a handler for permission requests. When the assistant needs permission to perform certain actions (e.g., file operations), this handler is called to approve or deny the request. Note: This method is internal. Permission handlers are typically registered when creating a session via :meth:`CopilotClient.create_session`. Args: handler: The permission handler function, or None to remove the handler. """ with self._permission_handler_lock: self._permission_handler = handler async def _handle_permission_request( self, request: PermissionRequest ) -> PermissionRequestResult: """ Handle a permission request from the Copilot CLI. Note: This method is internal and should not be called directly. Args: request: The permission request data from the CLI. Returns: A dictionary containing the permission decision with a "kind" key. """ with self._permission_handler_lock: handler = self._permission_handler if not handler: # No handler registered, deny permission. return PermissionDecisionUserNotAvailable() try: handler_start = time.perf_counter() result = handler(request, {"session_id": self.session_id}) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_permission_request dispatch", handler_start, session_id=self.session_id, ) return cast(PermissionRequestResult, result) except Exception: # pylint: disable=broad-except # Handler failed, deny permission. logger.debug( "Error handling permission request", extra={"session_id": self.session_id}, exc_info=True, ) return PermissionDecisionUserNotAvailable() def _register_user_input_handler(self, handler: UserInputHandler | None) -> None: """ Register a handler for user input requests. When the agent needs input from the user (via ask_user tool), this handler is called to provide the response. Note: This method is internal. User input handlers are typically registered when creating a session via :meth:`CopilotClient.create_session`. Args: handler: The user input handler function, or None to remove the handler. """ with self._user_input_handler_lock: self._user_input_handler = handler async def _handle_user_input_request(self, request: dict) -> UserInputResponse: """ Handle a user input request from the Copilot CLI. Note: This method is internal and should not be called directly. Args: request: The user input request data from the CLI. Returns: A dictionary containing the user's response. """ with self._user_input_handler_lock: handler = self._user_input_handler if not handler: raise RuntimeError("User input requested but no handler registered") try: handler_start = time.perf_counter() result = handler( UserInputRequest( question=request.get("question", ""), choices=request.get("choices") or [], allowFreeform=request.get("allowFreeform", True), ), {"session_id": self.session_id}, ) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_user_input_request dispatch", handler_start, session_id=self.session_id, ) return cast(UserInputResponse, result) except Exception: raise async def _handle_exit_plan_mode_request(self, request: dict) -> ExitPlanModeResult: """Handle an exitPlanMode.request callback from the runtime.""" with self._exit_plan_mode_handler_lock: handler = self._exit_plan_mode_handler if not handler: return {"approved": True} handler_start = time.perf_counter() typed_request = ExitPlanModeRequest( summary=request.get("summary", ""), actions=request.get("actions") or [], recommendedAction=request.get("recommendedAction", "autopilot"), ) if request.get("planContent") is not None: typed_request["planContent"] = request["planContent"] result = handler(typed_request, {"session_id": self.session_id}) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_exit_plan_mode_request dispatch", handler_start, session_id=self.session_id, ) return cast(ExitPlanModeResult, result) async def _handle_auto_mode_switch_request(self, request: dict) -> AutoModeSwitchResponse: """Handle an autoModeSwitch.request callback from the runtime.""" with self._auto_mode_switch_handler_lock: handler = self._auto_mode_switch_handler if not handler: return "no" handler_start = time.perf_counter() typed_request = AutoModeSwitchRequest() if request.get("errorCode") is not None: typed_request["errorCode"] = request["errorCode"] if request.get("retryAfterSeconds") is not None: typed_request["retryAfterSeconds"] = request["retryAfterSeconds"] result = handler(typed_request, {"session_id": self.session_id}) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_auto_mode_switch_request dispatch", handler_start, session_id=self.session_id, ) return result def _register_transform_callbacks( self, callbacks: dict[str, SectionTransformFn] | None ) -> None: """Register transform callbacks for system message sections.""" with self._transform_callbacks_lock: self._transform_callbacks = callbacks def _register_hooks(self, hooks: SessionHooks | None) -> None: """ Register hook handlers for session lifecycle events. Hooks allow custom logic to be executed at various points during the session lifecycle (before/after tool use, session start/end, etc.). Note: This method is internal. Hooks are typically registered when creating a session via :meth:`CopilotClient.create_session`. Args: hooks: The hooks configuration object, or None to remove all hooks. """ with self._hooks_lock: self._hooks = hooks async def _handle_system_message_transform( self, sections: dict[str, dict[str, str]] ) -> dict[str, dict[str, dict[str, str]]]: """Handle a systemMessage.transform request from the runtime.""" transform_start = time.perf_counter() with self._transform_callbacks_lock: callbacks = self._transform_callbacks result: dict[str, dict[str, str]] = {} for section_id, section_data in sections.items(): content = section_data.get("content", "") callback = callbacks.get(section_id) if callbacks else None if callback: try: transformed = callback(content) if inspect.isawaitable(transformed): transformed = await transformed result[section_id] = {"content": str(transformed)} except Exception: result[section_id] = {"content": content} else: result[section_id] = {"content": content} log_timing( logger, logging.DEBUG, "CopilotSession._handle_system_message_transform dispatch", transform_start, session_id=self.session_id, ) return {"sections": result} async def _handle_hooks_invoke(self, hook_type: str, input_data: Any) -> Any: """ Handle a hooks invocation from the Copilot CLI. Note: This method is internal and should not be called directly. Args: hook_type: The type of hook being invoked. input_data: The input data for the hook. Returns: The hook output, or None if no handler is registered. """ with self._hooks_lock: hooks = self._hooks if not hooks: return None handler_map = { "preToolUse": hooks.get("on_pre_tool_use"), "preMcpToolCall": hooks.get("on_pre_mcp_tool_call"), "postToolUse": hooks.get("on_post_tool_use"), "postToolUseFailure": hooks.get("on_post_tool_use_failure"), "userPromptSubmitted": hooks.get("on_user_prompt_submitted"), "sessionStart": hooks.get("on_session_start"), "sessionEnd": hooks.get("on_session_end"), "errorOccurred": hooks.get("on_error_occurred"), } handler = handler_map.get(hook_type) if not handler: return None try: handler_start = time.perf_counter() # Normalize input from the wire format: # - Remap wire key "cwd" to public API key "workingDirectory". # - Convert "timestamp" from epoch milliseconds to ``datetime`` so # hook handlers see a timezone-aware ``datetime`` rather than a # raw integer (matches TS PR #1357 Phase E). transformed: dict[str, Any] = dict(input_data) if "cwd" in transformed: transformed["workingDirectory"] = transformed.pop("cwd") timestamp = transformed.get("timestamp") if isinstance(timestamp, (int, float)): transformed["timestamp"] = datetime.fromtimestamp(timestamp / 1000, tz=UTC) # Each per-hook-type TypedDict is structurally compatible with the # normalized dict; cast to ``Any`` so ty doesn't try to narrow the # specific TypedDict variant from the runtime ``dict``. input_data = cast(Any, transformed) result = handler(input_data, {"session_id": self.session_id}) if inspect.isawaitable(result): result = await result log_timing( logger, logging.DEBUG, "CopilotSession._handle_hooks_invoke dispatch", handler_start, session_id=self.session_id, hook_type=hook_type, ) return result except Exception: # pylint: disable=broad-except # Hook failed, return None logger.warning( "Hook handler failed", extra={"session_id": self.session_id, "hook_type": hook_type}, exc_info=True, ) return None async def get_events(self) -> list[SessionEvent]: """ Retrieve all events and messages from this session's history. This returns the complete conversation history including user messages, assistant responses, tool executions, and other session events. Returns: A list of all session events in chronological order. Raises: Exception: If the session has been disconnected or the connection fails. Example: >>> from copilot.session_events import AssistantMessageData >>> events = await session.get_events() >>> for event in events: ... match event.data: ... case AssistantMessageData() as data: ... print(f"Assistant: {data.content}") """ response = await self._client.request("session.getMessages", {"sessionId": self.session_id}) # Convert dict events to SessionEvent objects events_dicts = response["events"] return [session_event_from_dict(event_dict) for event_dict in events_dicts] async def disconnect(self) -> None: """ Disconnect this session and release all in-memory resources (event handlers, tool handlers, permission handlers). Session state on disk (conversation history, planning state, artifacts) is preserved, so the conversation can be resumed later by calling :meth:`CopilotClient.resume_session` with the session ID. To permanently remove all session data including files on disk, use :meth:`CopilotClient.delete_session` instead. After calling this method, the session object can no longer be used. This method is idempotent—calling it multiple times is safe and will not raise an error if the session is already disconnected. Raises: Exception: If the connection fails (on first disconnect call). Example: >>> # Clean up when done — session can still be resumed later >>> await session.disconnect() """ # Ensure that the check and update of _destroyed are atomic so that # only the first caller proceeds to send the destroy RPC. with self._event_handlers_lock: if self._destroyed: return self._destroyed = True try: await self._client.request("session.destroy", {"sessionId": self.session_id}) finally: # Clear handlers even if the request fails. with self._event_handlers_lock: self._event_handlers.clear() with self._tool_handlers_lock: self._tool_handlers.clear() with self._permission_handler_lock: self._permission_handler = None with self._command_handlers_lock: self._command_handlers.clear() with self._elicitation_handler_lock: self._elicitation_handler = None with self._exit_plan_mode_handler_lock: self._exit_plan_mode_handler = None with self._auto_mode_switch_handler_lock: self._auto_mode_switch_handler = None async def __aenter__(self) -> CopilotSession: """Enable use as an async context manager.""" return self async def __aexit__( self, exc_type: type[BaseException] | None = None, exc_val: BaseException | None = None, exc_tb: TracebackType | None = None, ) -> None: """ Exit the async context manager. Automatically disconnects the session and releases all associated resources. """ await self.disconnect() async def abort(self) -> None: """ Abort the currently processing message in this session. Use this to cancel a long-running request. The session remains valid and can continue to be used for new messages. Raises: Exception: If the session has been disconnected or the connection fails. Example: >>> import asyncio >>> >>> # Start a long-running request >>> task = asyncio.create_task(session.send("Write a very long story...")) >>> >>> # Abort after 5 seconds >>> await asyncio.sleep(5) >>> await session.abort() """ await self._client.request("session.abort", {"sessionId": self.session_id}) async def set_model( self, model: str, *, reasoning_effort: str | None = None, reasoning_summary: ReasoningSummary | None = None, context_tier: ContextTier | None = None, model_capabilities: ModelCapabilitiesOverride | None = None, ) -> None: """ Change the model for this session. The new model takes effect for the next message. Conversation history is preserved. Args: model: Model ID to switch to (e.g., "gpt-4.1", "claude-sonnet-4"). reasoning_effort: Optional reasoning effort level for the new model (e.g., "low", "medium", "high", "xhigh"). reasoning_summary: Optional reasoning summary mode for supported models. Use "none" to suppress summary output regardless of whether reasoning is enabled. context_tier: Optional context window tier for supported models. Omit to use normal model behavior with no explicit tier. model_capabilities: Override individual model capabilities resolved by the runtime. Raises: Exception: If the session has been destroyed or the connection fails. Example: >>> await session.set_model("gpt-4.1") >>> await session.set_model("claude-sonnet-4.6", reasoning_effort="high") """ rpc_caps = None if model_capabilities is not None: from .client import _capabilities_to_dict rpc_caps = _RpcModelCapabilitiesOverride.from_dict( _capabilities_to_dict(model_capabilities) ) await self.rpc.model.switch_to( ModelSwitchToRequest( model_id=model, reasoning_effort=reasoning_effort, reasoning_summary=( _RpcReasoningSummary(reasoning_summary) if reasoning_summary is not None else None ), context_tier=(_RpcContextTier(context_tier) if context_tier is not None else None), model_capabilities=rpc_caps, ) ) async def log( self, message: str, *, level: str | None = None, ephemeral: bool | None = None, ) -> None: """ Log a message to the session timeline. The message appears in the session event stream and is visible to SDK consumers and (for non-ephemeral messages) persisted to the session event log on disk. Args: message: The human-readable message to log. level: Log severity level ("info", "warning", "error"). Defaults to "info". ephemeral: When True, the message is transient and not persisted to disk. Raises: Exception: If the session has been destroyed or the connection fails. Example: >>> await session.log("Processing started") >>> await session.log("Something looks off", level="warning") >>> await session.log("Operation failed", level="error") >>> await session.log("Temporary status update", ephemeral=True) """ params = LogRequest( message=message, level=SessionLogLevel(level) if level is not None else None, ephemeral=ephemeral, ) await self.rpc.log(params)