diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index f8792393c608..3817aaaa3925 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -47,7 +47,8 @@ export namespace ACP { private connection: AgentSideConnection private config: ACPConfig private sdk: OpencodeClient - private sessionManager + private sessionManager: ACPSessionManager + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection @@ -65,8 +66,20 @@ export namespace ACP { { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { + + // Create and store abort controller for this session's event stream + const abortController = new AbortController() + this.sessionAbortControllers.set(sessionId, abortController) + + this.config.sdk.event + .subscribe( + { directory }, + { + signal: abortController.signal, + }, + ) + .then(async (events) => { + for await (const event of events.stream) { switch (event.type) { case "permission.asked": try { @@ -345,6 +358,18 @@ export namespace ACP { } } }) + .catch((error) => { + // Ignore abort errors - they're expected during cleanup + if (error?.name === "AbortError" || error?.message?.includes("abort")) { + log.info("event stream aborted", { sessionId }) + return + } + log.error("event stream error", { error, sessionId }) + }) + .finally(() => { + // Clean up abort controller when stream ends + this.sessionAbortControllers.delete(sessionId) + }) } async initialize(params: InitializeRequest): Promise { @@ -962,6 +987,42 @@ export namespace ACP { { throwOnError: true }, ) } + + /** + * Dispose of all session resources including event streams and session map entries. + * This should be called when the agent is being destroyed to prevent memory leaks. + */ + async dispose() { + log.info("disposing agent", { sessionCount: this.sessionAbortControllers.size }) + + // Abort all event streams + for (const [sessionId, controller] of this.sessionAbortControllers.entries()) { + log.info("aborting event stream", { sessionId }) + controller.abort() + } + this.sessionAbortControllers.clear() + + // Clear all sessions from the manager + this.sessionManager.clear() + } + + /** + * Close a specific session and clean up its resources. + * This should be called when a session is explicitly closed/ended. + */ + closeSession(sessionId: string) { + log.info("closing session", { sessionId }) + + // Abort the event stream for this session + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + } + + // Remove from session manager + this.sessionManager.delete(sessionId) + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b658347055..0b15d4a14318 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -13,6 +13,14 @@ export class ACPSessionManager { this.sdk = sdk } + delete(sessionId: string): boolean { + return this.sessions.delete(sessionId) + } + + clear(): void { + this.sessions.clear() + } + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { const session = await this.sdk.session .create( diff --git a/packages/opencode/src/acp/types.ts b/packages/opencode/src/acp/types.ts index 42b23091237e..9126e2374a28 100644 --- a/packages/opencode/src/acp/types.ts +++ b/packages/opencode/src/acp/types.ts @@ -11,6 +11,7 @@ export interface ACPSessionState { modelID: string } modeId?: string + abortController?: AbortController } export interface ACPConfig { diff --git a/packages/opencode/src/cli/cmd/github.ts b/packages/opencode/src/cli/cmd/github.ts index 927c964c9d8b..cc0049b31773 100644 --- a/packages/opencode/src/cli/cmd/github.ts +++ b/packages/opencode/src/cli/cmd/github.ts @@ -475,6 +475,7 @@ export const GithubRunCommand = cmd({ let gitConfig: string let session: { id: string; title: string; version: string } let shareId: string | undefined + let unsubscribeSessionEvents: (() => void) | undefined let exitCode = 0 type PromptFiles = Awaited>["promptFiles"] const triggerCommentId = isCommentEvent @@ -814,6 +815,9 @@ export const GithubRunCommand = cmd({ } function subscribeSessionEvents() { + // Cleanup any existing subscription before creating a new one + unsubscribeSessionEvents?.() + const TOOL: Record = { todowrite: ["Todo", UI.Style.TEXT_WARNING_BOLD], todoread: ["Todo", UI.Style.TEXT_WARNING_BOLD], @@ -837,7 +841,7 @@ export const GithubRunCommand = cmd({ } let text = "" - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + const unsubscribe = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { if (evt.properties.part.sessionID !== session.id) return //if (evt.properties.part.messageID === messageID) return const part = evt.properties.part @@ -864,6 +868,8 @@ export const GithubRunCommand = cmd({ } } }) + + unsubscribeSessionEvents = unsubscribe } async function summarize(response: string) { diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 2ec1fb703f95..f436a020f2ed 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -110,11 +110,40 @@ export function tui(input: { // promise to prevent immediate exit return new Promise(async (resolve) => { const mode = await getTerminalBackgroundColor() + let exitCalled = false const onExit = async () => { + if (exitCalled) return + exitCalled = true await input.onExit?.() resolve() } + // Set up signal handlers for graceful terminal cleanup + const setupSignalHandlers = (cleanup: () => Promise) => { + const signals: NodeJS.Signals[] = ["SIGINT", "SIGTERM"] + + for (const signal of signals) { + process.once(signal, async () => { + // Only run cleanup if we're in a TTY and have a renderer to clean up + if (process.stdin.isTTY) { + // Reset terminal to a usable state + process.stdout.write("\x1b[?25h") // Show cursor + process.stdout.write("\x1b[0m") // Reset attributes + process.stdout.write("\x1b[r") // Reset scrolling region + // Exit alternate screen buffer if we're in one + process.stdout.write("\x1b[?1049l") + // Restore normal screen buffering + if (process.stdin.isRaw) process.stdin.setRawMode(false) + } + await cleanup() + process.exit(0) + }) + } + } + + // Set up signal handlers early, before TUI initialization + setupSignalHandlers(onExit) + render( () => { return ( diff --git a/packages/opencode/src/cli/cmd/tui/context/sync.tsx b/packages/opencode/src/cli/cmd/tui/context/sync.tsx index 0edc911344c3..3d6f9a48c49e 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sync.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sync.tsx @@ -25,7 +25,7 @@ import { createSimpleContext } from "./helper" import type { Snapshot } from "@/snapshot" import { useExit } from "./exit" import { useArgs } from "./args" -import { batch, onMount } from "solid-js" +import { batch, onCleanup, onMount } from "solid-js" import { Log } from "@/util/log" import type { Path } from "@opencode-ai/sdk" @@ -104,7 +104,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ const sdk = useSDK() - sdk.event.listen((e) => { + const unsubscribe = sdk.event.listen((e) => { const event = e.details switch (event.type) { case "server.instance.disposed": @@ -307,6 +307,9 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ } }) + // Clean up event listener on unmount to prevent memory leak + onCleanup(unsubscribe) + const exit = useExit() const args = useArgs() diff --git a/packages/opencode/src/cli/cmd/tui/routes/session/footer.tsx b/packages/opencode/src/cli/cmd/tui/routes/session/footer.tsx index 8ace2fff3725..55c775a726da 100644 --- a/packages/opencode/src/cli/cmd/tui/routes/session/footer.tsx +++ b/packages/opencode/src/cli/cmd/tui/routes/session/footer.tsx @@ -3,6 +3,7 @@ import { useTheme } from "../../context/theme" import { useSync } from "../../context/sync" import { useDirectory } from "../../context/directory" import { useConnected } from "../../component/dialog-model" +import { useLocal } from "../../context/local" import { createStore } from "solid-js/store" import { useRoute } from "../../context/route" @@ -10,9 +11,11 @@ export function Footer() { const { theme } = useTheme() const sync = useSync() const route = useRoute() + const local = useLocal() const mcp = createMemo(() => Object.values(sync.data.mcp).filter((x) => x.status === "connected").length) const mcpError = createMemo(() => Object.values(sync.data.mcp).some((x) => x.status === "failed")) const lsp = createMemo(() => Object.keys(sync.data.lsp)) + const agents = createMemo(() => local.agent.list().length) const permissions = createMemo(() => { if (route.data.type !== "session") return [] return sync.data.permission[route.data.sessionID] ?? [] @@ -24,10 +27,10 @@ export function Footer() { welcome: false, }) - onMount(() => { - // Track all timeouts to ensure proper cleanup - const timeouts: ReturnType[] = [] + // Track timeouts at component scope for proper cleanup + const timeouts: ReturnType[] = [] + onMount(() => { function tick() { if (connected()) return if (!store.welcome) { @@ -43,10 +46,10 @@ export function Footer() { } } timeouts.push(setTimeout(() => tick(), 10_000)) + }) - onCleanup(() => { - timeouts.forEach(clearTimeout) - }) + onCleanup(() => { + timeouts.forEach(clearTimeout) }) return ( @@ -66,6 +69,10 @@ export function Footer() { {permissions().length > 1 ? "s" : ""} + + โ—ˆ {agents()} Agent + {agents() > 1 ? "s" : ""} + 0 ? theme.success : theme.textMuted }}>โ€ข {lsp().length} LSP diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 6dc5e99e91ef..42b84a185a15 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -26,6 +26,33 @@ import { EOL } from "os" import { WebCommand } from "./cli/cmd/web" import { PrCommand } from "./cli/cmd/pr" import { SessionCommand } from "./cli/cmd/session" +import { Instance } from "./project/instance" + +// Track whether cleanup has been performed to avoid duplicate cleanup +let cleanupPerformed = false + +// Cleanup handler to dispose all instances (closes MCP clients, LSP servers, etc.) +async function performCleanup(signal: string) { + if (cleanupPerformed) return + cleanupPerformed = true + + Log.Default.info("cleanup triggered by signal", { signal }) + try { + await Instance.disposeAll() + } catch (error) { + Log.Default.error("error during cleanup", { error }) + } +} + +// Register signal handlers for cleanup +const signals = ["SIGTERM", "SIGINT"] as const +for (const signal of signals) { + process.on(signal, () => { + performCleanup(signal).finally(() => { + process.exit(128 + (signal === "SIGINT" ? 2 : 15)) + }) + }) +} process.on("unhandledRejection", (e) => { Log.Default.error("rejection", { @@ -39,121 +66,133 @@ process.on("uncaughtException", (e) => { }) }) -const cli = yargs(hideBin(process.argv)) - .parserConfiguration({ "populate--": true }) - .scriptName("opencode") - .wrap(100) - .help("help", "show help") - .alias("help", "h") - .version("version", "show version number", Installation.VERSION) - .alias("version", "v") - .option("print-logs", { - describe: "print logs to stderr", - type: "boolean", - }) - .option("log-level", { - describe: "log level", - type: "string", - choices: ["DEBUG", "INFO", "WARN", "ERROR"], - }) - .middleware(async (opts) => { - await Log.init({ - print: process.argv.includes("--print-logs"), - dev: Installation.isLocal(), - level: (() => { - if (opts.logLevel) return opts.logLevel as Log.Level - if (Installation.isLocal()) return "DEBUG" - return "INFO" - })(), +// Main function that wraps the CLI with proper cleanup +async function main() { + const cli = yargs(hideBin(process.argv)) + .parserConfiguration({ "populate--": true }) + .scriptName("opencode") + .wrap(100) + .help("help", "show help") + .alias("help", "h") + .version("version", "show version number", Installation.VERSION) + .alias("version", "v") + .option("print-logs", { + describe: "print logs to stderr", + type: "boolean", + }) + .option("log-level", { + describe: "log level", + type: "string", + choices: ["DEBUG", "INFO", "WARN", "ERROR"], }) + .middleware(async (opts) => { + await Log.init({ + print: process.argv.includes("--print-logs"), + dev: Installation.isLocal(), + level: (() => { + if (opts.logLevel) return opts.logLevel as Log.Level + if (Installation.isLocal()) return "DEBUG" + return "INFO" + })(), + }) - process.env.AGENT = "1" - process.env.OPENCODE = "1" + process.env.AGENT = "1" + process.env.OPENCODE = "1" - Log.Default.info("opencode", { - version: Installation.VERSION, - args: process.argv.slice(2), + Log.Default.info("opencode", { + version: Installation.VERSION, + args: process.argv.slice(2), + }) }) - }) - .usage("\n" + UI.logo()) - .completion("completion", "generate shell completion script") - .command(AcpCommand) - .command(McpCommand) - .command(TuiThreadCommand) - .command(AttachCommand) - .command(RunCommand) - .command(GenerateCommand) - .command(DebugCommand) - .command(AuthCommand) - .command(AgentCommand) - .command(UpgradeCommand) - .command(UninstallCommand) - .command(ServeCommand) - .command(WebCommand) - .command(ModelsCommand) - .command(StatsCommand) - .command(ExportCommand) - .command(ImportCommand) - .command(GithubCommand) - .command(PrCommand) - .command(SessionCommand) - .fail((msg, err) => { - if ( - msg?.startsWith("Unknown argument") || - msg?.startsWith("Not enough non-option arguments") || - msg?.startsWith("Invalid values:") - ) { + .usage("\n" + UI.logo()) + .completion("completion", "generate shell completion script") + .command(AcpCommand) + .command(McpCommand) + .command(TuiThreadCommand) + .command(AttachCommand) + .command(RunCommand) + .command(GenerateCommand) + .command(DebugCommand) + .command(AuthCommand) + .command(AgentCommand) + .command(UpgradeCommand) + .command(UninstallCommand) + .command(ServeCommand) + .command(WebCommand) + .command(ModelsCommand) + .command(StatsCommand) + .command(ExportCommand) + .command(ImportCommand) + .command(GithubCommand) + .command(PrCommand) + .command(SessionCommand) + .fail((msg, err) => { + if ( + msg?.startsWith("Unknown argument") || + msg?.startsWith("Not enough non-option arguments") || + msg?.startsWith("Invalid values:") + ) { + if (err) throw err + cli.showHelp("log") + } if (err) throw err - cli.showHelp("log") - } - if (err) throw err - process.exit(1) - }) - .strict() - -try { - await cli.parse() -} catch (e) { - let data: Record = {} - if (e instanceof NamedError) { - const obj = e.toObject() - Object.assign(data, { - ...obj.data, + process.exit(1) }) - } + .strict() - if (e instanceof Error) { - Object.assign(data, { - name: e.name, - message: e.message, - cause: e.cause?.toString(), - stack: e.stack, - }) - } + try { + await cli.parse() + } catch (e) { + let data: Record = {} + if (e instanceof NamedError) { + const obj = e.toObject() + Object.assign(data, { + ...obj.data, + }) + } - if (e instanceof ResolveMessage) { - Object.assign(data, { - name: e.name, - message: e.message, - code: e.code, - specifier: e.specifier, - referrer: e.referrer, - position: e.position, - importKind: e.importKind, - }) - } - Log.Default.error("fatal", data) - const formatted = FormatError(e) - if (formatted) UI.error(formatted) - if (formatted === undefined) { - UI.error("Unexpected error, check log file at " + Log.file() + " for more details" + EOL) - console.error(e instanceof Error ? e.message : String(e)) + if (e instanceof Error) { + Object.assign(data, { + name: e.name, + message: e.message, + cause: e.cause?.toString(), + stack: e.stack, + }) + } + + if (e instanceof ResolveMessage) { + Object.assign(data, { + name: e.name, + message: e.message, + code: e.code, + specifier: e.specifier, + referrer: e.referrer, + position: e.position, + importKind: e.importKind, + }) + } + Log.Default.error("fatal", data) + const formatted = FormatError(e) + if (formatted) UI.error(formatted) + if (formatted === undefined) { + UI.error("Unexpected error, check log file at " + Log.file() + " for more details" + EOL) + console.error(e instanceof Error ? e.message : String(e)) + } + process.exitCode = 1 } - process.exitCode = 1 -} finally { - // Some subprocesses don't react properly to SIGTERM and similar signals. - // Most notably, some docker-container-based MCP servers don't handle such signals unless - // run using `docker run --init`. - // Explicitly exit to avoid any hanging subprocesses. - process.exit() } + +// Run main with cleanup +main() + .then(async () => { + await performCleanup("main") + // Some subprocesses don't react properly to SIGTERM and similar signals. + // Most notably, some docker-container-based MCP servers don't handle such signals unless + // run using `docker run --init`. + // Explicitly exit to avoid any hanging subprocesses. + process.exit(process.exitCode ?? 0) + }) + .catch(async (error) => { + await performCleanup("error") + process.exit(1) + }) diff --git a/packages/opencode/src/lsp/client.ts b/packages/opencode/src/lsp/client.ts index 8704b65acb5b..5388d3309b35 100644 --- a/packages/opencode/src/lsp/client.ts +++ b/packages/opencode/src/lsp/client.ts @@ -241,6 +241,8 @@ export namespace LSPClient { connection.end() connection.dispose() input.server.process.kill() + diagnostics.clear() + for (const path in files) delete files[path] l.info("shutdown") }, } diff --git a/packages/opencode/src/lsp/index.ts b/packages/opencode/src/lsp/index.ts index 0fd3b69dfcd9..581ab2d01daf 100644 --- a/packages/opencode/src/lsp/index.ts +++ b/packages/opencode/src/lsp/index.ts @@ -10,6 +10,7 @@ import { Config } from "../config/config" import { spawn } from "child_process" import { Instance } from "../project/instance" import { Flag } from "@/flag/flag" +import { SessionRetry } from "../session/retry" export namespace LSP { const log = Log.create({ service: "lsp" }) @@ -85,7 +86,7 @@ export namespace LSP { if (cfg.lsp === false) { log.info("all LSPs are disabled") return { - broken: new Set(), + broken: new Map(), servers, clients, spawning: new Map>(), @@ -132,7 +133,7 @@ export namespace LSP { }) return { - broken: new Set(), + broken: new Map(), servers, clients, spawning: new Map>(), @@ -174,6 +175,37 @@ export namespace LSP { }) } + /** + * Check if a broken server should be retried based on exponential backoff. + * @returns true if the server is still broken (should be skipped), false if it should be retried + */ + function isBrokenWithBackoff(key: string): boolean { + const s = state.raw() + if (!s) return false + + const brokenEntry = s.broken.get(key) + if (!brokenEntry) return false + + const now = Date.now() + const elapsed = now - brokenEntry.failTime + + // Calculate exponential backoff delay + const retryDelay = SessionRetry.delay(brokenEntry.attemptCount) + + if (elapsed >= retryDelay) { + // Time to retry - remove from broken set + log.info(`Retrying broken LSP server ${key}`, { + attemptCount: brokenEntry.attemptCount, + elapsed, + retryDelay, + }) + s.broken.delete(key) + return false + } + + return true + } + async function getClients(file: string) { const s = await state() const extension = path.parse(file).ext || file @@ -183,11 +215,21 @@ export namespace LSP { const handle = await server .spawn(root) .then((value) => { - if (!value) s.broken.add(key) + if (!value) { + const existing = s.broken.get(key) + s.broken.set(key, { + failTime: Date.now(), + attemptCount: (existing?.attemptCount ?? 0) + 1, + }) + } return value }) .catch((err) => { - s.broken.add(key) + const existing = s.broken.get(key) + s.broken.set(key, { + failTime: Date.now(), + attemptCount: (existing?.attemptCount ?? 0) + 1, + }) log.error(`Failed to spawn LSP server ${server.id}`, { error: err }) return undefined }) @@ -200,7 +242,11 @@ export namespace LSP { server: handle, root, }).catch((err) => { - s.broken.add(key) + const existing = s.broken.get(key) + s.broken.set(key, { + failTime: Date.now(), + attemptCount: (existing?.attemptCount ?? 0) + 1, + }) handle.process.kill() log.error(`Failed to initialize LSP client ${server.id}`, { error: err }) return undefined @@ -214,10 +260,29 @@ export namespace LSP { const existing = s.clients.find((x) => x.root === root && x.serverID === server.id) if (existing) { handle.process.kill() + // Server was already connected - clear broken state + s.broken.delete(key) return existing } + // Add process exit handler for crash detection + handle.process.on("exit", (code, signal) => { + const idx = s.clients.findIndex((x) => x.root === root && x.serverID === server.id) + if (idx !== -1) { + s.clients.splice(idx, 1) + const brokenEntry = s.broken.get(key) + s.broken.set(key, { + failTime: Date.now(), + attemptCount: (brokenEntry?.attemptCount ?? 0) + 1, + }) + log.error(`LSP process ${server.id} exited unexpectedly`, { code, signal, root }) + Bus.publish(Event.Updated, {}) + } + }) + s.clients.push(client) + // Successfully connected - clear broken state + s.broken.delete(key) return client } @@ -226,7 +291,7 @@ export namespace LSP { const root = await server.root(file) if (!root) continue - if (s.broken.has(root + server.id)) continue + if (isBrokenWithBackoff(root + server.id)) continue const match = s.clients.find((x) => x.root === root && x.serverID === server.id) if (match) { @@ -268,7 +333,7 @@ export namespace LSP { if (server.extensions.length && !server.extensions.includes(extension)) continue const root = await server.root(file) if (!root) continue - if (s.broken.has(root + server.id)) continue + if (isBrokenWithBackoff(root + server.id)) continue return true } return false diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 66843aedc119..8c3180872aed 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -151,6 +151,28 @@ export namespace MCP { type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport const pendingOAuthTransports = new Map() + // Helper function to close a transport properly + async function closeTransport(transport: TransportWithAuth | undefined): Promise { + if (!transport) return + try { + // The transport should have a close method or similar cleanup + if (typeof (transport as any).close === "function") { + await (transport as any).close() + } + } catch (error) { + log.error("Failed to close transport", { error }) + } + } + + // Helper function to set a transport while properly cleaning up the old one + async function setPendingOAuthTransport(key: string, transport: TransportWithAuth): Promise { + const existing = pendingOAuthTransports.get(key) + if (existing) { + await closeTransport(existing) + } + pendingOAuthTransports.set(key, transport) + } + // Prompt cache types type PromptInfo = Awaited>["prompts"][number] @@ -378,7 +400,7 @@ export namespace MCP { }).catch((e) => log.debug("failed to show toast", { error: e })) } else { // Store transport for later finishAuth call - pendingOAuthTransports.set(key, transport) + await setPendingOAuthTransport(key, transport) status = { status: "needs_auth" as const } // Show toast for needs_auth Bus.publish(TuiEvent.ToastShow, { @@ -766,7 +788,7 @@ export namespace MCP { } catch (error) { if (error instanceof UnauthorizedError && capturedUrl) { // Store transport for finishAuth - pendingOAuthTransports.set(mcpName, transport) + await setPendingOAuthTransport(mcpName, transport) return { authorizationUrl: capturedUrl.toString() } } throw error @@ -888,6 +910,9 @@ export namespace MCP { export async function removeAuth(mcpName: string): Promise { await McpAuth.remove(mcpName) McpOAuthCallback.cancelPending(mcpName) + // Properly close the transport before removing it + const transport = pendingOAuthTransports.get(mcpName) + await closeTransport(transport) pendingOAuthTransports.delete(mcpName) await McpAuth.clearOAuthState(mcpName) log.info("removed oauth credentials", { mcpName }) diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 84de520b81d7..0ba8e4ba0ddc 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -92,6 +92,13 @@ export namespace Plugin { return { hooks, input, + unsubscribe: undefined as (() => void) | undefined, + } + }, + async (state) => { + state.unsubscribe?.() + for (const hook of state.hooks) { + await hook.dispose?.() } }) @@ -117,13 +124,13 @@ export namespace Plugin { } export async function init() { - const hooks = await state().then((x) => x.hooks) + const s = await state() const config = await Config.get() - for (const hook of hooks) { + for (const hook of s.hooks) { // @ts-expect-error this is because we haven't moved plugin to sdk v2 await hook.config?.(config) } - Bus.subscribeAll(async (input) => { + s.unsubscribe = Bus.subscribeAll(async (input) => { const hooks = await state().then((x) => x.hooks) for (const hook of hooks) { hook["event"]?.({ diff --git a/packages/opencode/src/project/bootstrap.ts b/packages/opencode/src/project/bootstrap.ts index 56fe4d13e664..8b4cc82d6680 100644 --- a/packages/opencode/src/project/bootstrap.ts +++ b/packages/opencode/src/project/bootstrap.ts @@ -12,6 +12,20 @@ import { Vcs } from "./vcs" import { Log } from "@/util/log" import { ShareNext } from "@/share/share-next" +const commandSubscription = Instance.state( + () => { + const unsubscribe = Bus.subscribe(Command.Event.Executed, async (payload) => { + if (payload.properties.name === Command.Default.INIT) { + await Project.setInitialized(Instance.project.id) + } + }) + return { unsubscribe } + }, + async (state) => { + state.unsubscribe() + }, +) + export async function InstanceBootstrap() { Log.Default.info("bootstrapping", { directory: Instance.directory }) await Plugin.init() @@ -22,10 +36,5 @@ export async function InstanceBootstrap() { FileWatcher.init() File.init() Vcs.init() - - Bus.subscribe(Command.Event.Executed, async (payload) => { - if (payload.properties.name === Command.Default.INIT) { - await Project.setInitialized(Instance.project.id) - } - }) + commandSubscription() } diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index ddaa90f1e2b4..ca481b63232d 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -5,6 +5,7 @@ import { State } from "./state" import { iife } from "@/util/iife" import { GlobalBus } from "@/bus/global" import { Filesystem } from "@/util/filesystem" +import { createLruCache } from "@/util/cache" interface Context { directory: string @@ -12,7 +13,17 @@ interface Context { project: Project.Info } const context = Context.create("instance") -const cache = new Map>() +const cache = createLruCache>({ + maxEntries: 20, + onEvict: async (_key, value) => { + const ctx = await value.catch(() => null) + if (ctx) { + await context.provide(ctx, async () => { + await State.dispose(ctx.directory) + }) + } + }, +}) export const Instance = { async provide(input: { directory: string; init?: () => Promise; fn: () => R }): Promise { diff --git a/packages/opencode/src/provider/models.ts b/packages/opencode/src/provider/models.ts index c5465f9880ed..5f1943ee10f8 100644 --- a/packages/opencode/src/provider/models.ts +++ b/packages/opencode/src/provider/models.ts @@ -109,4 +109,26 @@ export namespace ModelsDev { } } -setInterval(() => ModelsDev.refresh(), 60 * 1000 * 60).unref() +let intervalId: ReturnType | undefined +let exitHandlerRegistered = false + +export function startRefreshInterval() { + if (intervalId) return + intervalId = setInterval(() => ModelsDev.refresh(), 60 * 1000 * 60).unref() + + // Register exit handler only once to prevent multiple registrations + if (!exitHandlerRegistered) { + exitHandlerRegistered = true + process.on("exit", stopRefreshInterval) + } +} + +export function stopRefreshInterval() { + if (intervalId) { + clearInterval(intervalId) + intervalId = undefined + } +} + +// Auto-start the interval on module load +startRefreshInterval() diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index bcb115edf419..268a34bfeb8b 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -13,6 +13,7 @@ import { Env } from "../env" import { Instance } from "../project/instance" import { Flag } from "../flag/flag" import { iife } from "@/util/iife" +import { createLruCache } from "@/util/cache" // Direct imports for bundled providers import { createAmazonBedrock, type AmazonBedrockProviderSettings } from "@ai-sdk/amazon-bedrock" @@ -680,7 +681,9 @@ export namespace Provider { } const providers: { [providerID: string]: Info } = {} - const languages = new Map() + const languages = createLruCache({ + maxEntries: 100, + }) const modelLoaders: { [providerID: string]: CustomModelLoader } = {} @@ -948,7 +951,15 @@ export namespace Provider { return { models: languages, providers, - sdk, + sdk: createLruCache({ + maxEntries: 50, + onEvict: (key, sdk) => { + // SDK may have cleanup methods + if (sdk && typeof sdk === "object" && "destroy" in sdk) { + sdk.destroy?.() + } + }, + }), modelLoaders, } }) @@ -980,7 +991,7 @@ export namespace Provider { const key = Bun.hash.xxHash32(JSON.stringify({ npm: model.api.npm, options })) const existing = s.sdk.get(key) - if (existing) return existing + if (existing) return existing.value const customFetch = options["fetch"] @@ -1034,7 +1045,7 @@ export namespace Provider { name: model.providerID, ...options, }) - s.sdk.set(key, loaded) + s.sdk.set(key, loaded.value) return loaded as SDK } @@ -1053,7 +1064,7 @@ export namespace Provider { name: model.providerID, ...options, }) - s.sdk.set(key, loaded) + s.sdk.set(key, loaded.value) return loaded as SDK } catch (e) { throw new InitError({ providerID: model.providerID }, { cause: e }) diff --git a/packages/opencode/src/tool/webfetch.ts b/packages/opencode/src/tool/webfetch.ts index e592caac23e4..18432e745fef 100644 --- a/packages/opencode/src/tool/webfetch.ts +++ b/packages/opencode/src/tool/webfetch.ts @@ -56,17 +56,20 @@ export const WebFetchTool = Tool.define("webfetch", { "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8" } - const response = await fetch(params.url, { - signal: AbortSignal.any([controller.signal, ctx.abort]), - headers: { - "User-Agent": - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", - Accept: acceptHeader, - "Accept-Language": "en-US,en;q=0.9", - }, - }) - - clearTimeout(timeoutId) + let response: Response + try { + response = await fetch(params.url, { + signal: AbortSignal.any([controller.signal, ctx.abort]), + headers: { + "User-Agent": + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + Accept: acceptHeader, + "Accept-Language": "en-US,en;q=0.9", + }, + }) + } finally { + clearTimeout(timeoutId) + } if (!response.ok) { throw new Error(`Request failed with status code: ${response.status}`) diff --git a/packages/opencode/src/util/cache.ts b/packages/opencode/src/util/cache.ts new file mode 100644 index 000000000000..bcf1618c65bd --- /dev/null +++ b/packages/opencode/src/util/cache.ts @@ -0,0 +1,76 @@ +/** + * LRU cache with max entries limit for preventing memory leaks + */ + +export type LruCacheOpts = { + maxEntries?: number + onEvict?: (key: any, value: any) => void +} + +type LruCacheEntry = { + value: V + lastAccess: number +} + +export function createLruCache(opts: LruCacheOpts = {}) { + const { maxEntries = Infinity, onEvict } = opts + const cache = new Map>() + + function evictOne() { + let oldestKey: K | null = null + let oldestAccess = Infinity + + for (const [key, entry] of cache) { + if (entry.lastAccess < oldestAccess) { + oldestAccess = entry.lastAccess + oldestKey = key + } + } + + if (oldestKey !== null) { + delete_(oldestKey) + } + } + + function delete_(key: K): boolean { + const entry = cache.get(key) + if (!entry) return false + onEvict?.(key, entry.value) + return cache.delete(key) + } + + return { + get(key: K): V | undefined { + const entry = cache.get(key) + if (!entry) return undefined + entry.lastAccess = Date.now() + return entry.value + }, + + set(key: K, value: V): void { + if (cache.size >= maxEntries && !cache.has(key)) { + evictOne() + } + cache.set(key, { value, lastAccess: Date.now() }) + }, + + has(key: K): boolean { + return cache.has(key) + }, + + delete(key: K): boolean { + return delete_(key) + }, + + clear(): void { + for (const [key, entry] of cache) { + onEvict?.(key, entry.value) + } + cache.clear() + }, + + get size() { + return cache.size + }, + } +} diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f09..8eb7220d7e69 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,32 +1,306 @@ +interface QueueNode { + value: T + next: QueueNode | null +} + +interface QueueMetrics { + enqueued: number + dequeued: number + dropped: number + cacheHits: number + cacheMisses: number + currentSize: number +} + +interface CacheEntry { + value: T + expires: number +} + export class AsyncQueue implements AsyncIterable { - private queue: T[] = [] + private head: QueueNode | null = null + private tail: QueueNode | null = null + private size: number = 0 private resolvers: ((value: T) => void)[] = [] - push(item: T) { + // Bounded queue settings + private maxSize: number + private dropStrategy: "oldest" | "newest" | "block" + + // Cache for deduplication (optional) + private cache: Map> | null = null + private cacheKeyFn: ((item: T) => string) | null = null + private cacheTTL: number | null = null + private readonly DEFAULT_CACHE_TTL = 5000 // 5 seconds + + // Metrics + private metrics: QueueMetrics = { + enqueued: 0, + dequeued: 0, + dropped: 0, + cacheHits: 0, + cacheMisses: 0, + currentSize: 0, + } + + constructor(options?: { + maxSize?: number + dropStrategy?: "oldest" | "newest" | "block" + enableCache?: boolean + cacheKeyFn?: (item: T) => string + cacheTTL?: number + }) { + this.maxSize = options?.maxSize ?? Infinity + this.dropStrategy = options?.dropStrategy ?? "oldest" + + if (options?.enableCache) { + this.cache = new Map() + this.cacheKeyFn = options.cacheKeyFn ?? ((item: any) => JSON.stringify(item)) + this.cacheTTL = options.cacheTTL ?? this.DEFAULT_CACHE_TTL + } + } + + push(item: T): boolean { + // Check cache first if enabled + if (this.cache && this.cacheKeyFn) { + const key = this.cacheKeyFn(item) + const cached = this.cache.get(key) + if (cached && cached.expires > Date.now()) { + this.metrics.cacheHits++ + const resolve = this.resolvers.shift() + if (resolve) resolve(cached.value) + else this.enqueueNode(cached.value) + return true + } + this.metrics.cacheMisses++ + } + + // Handle bounded queue + if (this.size >= this.maxSize) { + if (this.dropStrategy === "newest") { + this.metrics.dropped++ + return false + } else if (this.dropStrategy === "oldest") { + this.dequeueNode() + this.metrics.dropped++ + } else { + // block strategy - wait for space + return this.pushWhenAvailable(item) + } + } + + // Store in cache if enabled + if (this.cache && this.cacheKeyFn) { + const key = this.cacheKeyFn(item) + this.cache.set(key, { + value: item, + expires: Date.now() + (this.cacheTTL ?? this.DEFAULT_CACHE_TTL), + }) + // Cleanup expired entries periodically + this.cleanupCache() + } + + this.enqueueNode(item) + const resolve = this.resolvers.shift() if (resolve) resolve(item) - else this.queue.push(item) + + return true + } + + private enqueueNode(item: T): void { + const node: QueueNode = { value: item, next: null } + if (!this.tail) { + this.head = this.tail = node + } else { + this.tail.next = node + this.tail = node + } + this.size++ + this.metrics.enqueued++ + this.metrics.currentSize = this.size + } + + private dequeueNode(): T | null { + if (!this.head) return null + const value = this.head.value + this.head = this.head.next + if (!this.head) this.tail = null + this.size-- + this.metrics.dequeued++ + this.metrics.currentSize = this.size + return value + } + + private async pushWhenAvailable(item: T): Promise { + return new Promise((resolve) => { + const checkAndPush = () => { + if (this.size < this.maxSize) { + this.enqueueNode(item) + const waiter = this.resolvers.shift() + if (waiter) waiter(item) + resolve(true) + } else { + // Retry after a short delay + setTimeout(checkAndPush, 10) + } + } + checkAndPush() + }) } async next(): Promise { - if (this.queue.length > 0) return this.queue.shift()! + const item = this.dequeueNode() + if (item) return item + return new Promise((resolve) => this.resolvers.push(resolve)) } async *[Symbol.asyncIterator]() { while (true) yield await this.next() } + + // Public API for metrics and introspection + getMetrics(): Readonly { + return { ...this.metrics } + } + + clear(): void { + this.head = this.tail = null + this.size = 0 + this.metrics.currentSize = 0 + } + + get length(): number { + return this.size + } + + get pending(): number { + return this.resolvers.length + } + + private cleanupCache(): void { + if (!this.cache) return + const now = Date.now() + for (const [key, entry] of this.cache.entries()) { + if (entry.expires <= now) { + this.cache.delete(key) + } + } + } + + clearCache(): void { + this.cache?.clear() + } +} + +export interface WorkOptions { + concurrency?: number | { min: number; max: number } + onProgress?: (completed: number, total: number) => void + onError?: (error: Error, item: T) => void + enableBatching?: boolean + batchSize?: number +} + +export interface WorkMetrics { + completed: number + failed: number + total: number + currentConcurrency: number } -export async function work(concurrency: number, items: T[], fn: (item: T) => Promise) { +export async function work( + concurrency: number | { min: number; max: number }, + items: T[], + fn: (item: T) => Promise, + options?: WorkOptions +): Promise { const pending = [...items] - await Promise.all( - Array.from({ length: concurrency }, async () => { - while (true) { - const item = pending.pop() - if (item === undefined) return + const total = items.length + let completed = 0 + let failed = 0 + let activeWorkers = 0 + + // Dynamic concurrency settings + const minConcurrency = typeof concurrency === "number" ? concurrency : concurrency.min + const maxConcurrency = typeof concurrency === "number" ? concurrency : concurrency.max + + // Batch processing for better throughput + const batchSize = options?.batchSize ?? 1 + const enableBatching = options?.enableBatching ?? batchSize > 1 + + // Result queue for worker coordination + const resultQueue = new AsyncQueue<{ success: boolean; error?: Error }>() + + const processBatch = async (batch: T[]): Promise => { + const promises = batch.map(async (item) => { + try { await fn(item) + completed++ + resultQueue.push({ success: true }) + } catch (error) { + failed++ + resultQueue.push({ success: false, error: error as Error }) + options?.onError?.(error as Error, item) + } + if (options?.onProgress) { + options.onProgress(completed + failed, total) + } + }) + await Promise.all(promises) + } + + const worker = async (): Promise => { + activeWorkers++ + while (pending.length > 0) { + // Check if we should scale down + if (activeWorkers > Math.ceil(pending.length / batchSize) && activeWorkers > minConcurrency) { + activeWorkers-- + return + } + + if (enableBatching) { + const batch: T[] = [] + for (let i = 0; i < batchSize && pending.length > 0; i++) { + batch.push(pending.pop()!) + } + if (batch.length > 0) { + await processBatch(batch) + } + } else { + const item = pending.pop() + if (item !== undefined) { + await processBatch([item]) + } + } + } + activeWorkers-- + } + + // Start with minimum concurrency + const initialWorkers = Math.min(minConcurrency, Math.ceil(total / batchSize)) + const workers = Array.from({ length: initialWorkers }, () => worker()) + + // Dynamic scaling: add workers if there's a backlog + const scaler = setInterval(() => { + const needed = Math.min( + maxConcurrency - activeWorkers, + Math.ceil(pending.length / batchSize) - activeWorkers + ) + if (needed > 0 && activeWorkers < maxConcurrency) { + for (let i = 0; i < needed; i++) { + workers.push(worker()) } - }), - ) + } + }, 100) // Check every 100ms + + await Promise.all(workers) + clearInterval(scaler) + + return { + completed, + failed, + total, + currentConcurrency: activeWorkers, + } } diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index e57eff579e63..261dc7440750 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -215,4 +215,8 @@ export interface Hooks { input: { sessionID: string; messageID: string; partID: string }, output: { text: string }, ) => Promise + /** + * Called when the plugin is being disposed/cleaned up + */ + dispose?: () => Promise } diff --git a/packages/slack/src/index.ts b/packages/slack/src/index.ts index d07e3dfb4161..8959c2aae11a 100644 --- a/packages/slack/src/index.ts +++ b/packages/slack/src/index.ts @@ -19,22 +19,53 @@ const opencode = await createOpencode({ }) console.log("โœ… Opencode server ready") -const sessions = new Map() +const sessions = new Map() + +// Session cleanup: remove sessions older than 1 hour +const SESSION_TIMEOUT_MS = 60 * 60 * 1000 +const MAX_SESSIONS = 100 + +function cleanupOldSessions() { + const now = Date.now() + for (const [key, session] of sessions.entries()) { + if (now - session.lastUsed > SESSION_TIMEOUT_MS || sessions.size > MAX_SESSIONS) { + sessions.delete(key) + console.log("๐Ÿงน Cleaned up session:", key) + } + } +} + +// Run cleanup periodically +setInterval(cleanupOldSessions, 5 * 60 * 1000) // Every 5 minutes + +const abortController = new AbortController() ;(async () => { - const events = await opencode.client.event.subscribe() - for await (const event of events.stream) { - if (event.type === "message.part.updated") { - const part = event.properties.part - if (part.type === "tool") { - // Find the session for this tool update - for (const [sessionKey, session] of sessions.entries()) { - if (session.sessionId === part.sessionID) { - handleToolUpdate(part, session.channel, session.thread) - break + try { + const events = await opencode.client.event.subscribe( + {}, + { signal: abortController.signal }, + ) + for await (const event of events.stream) { + if (event.type === "message.part.updated") { + const part = event.properties.part + if (part.type === "tool") { + // Find the session for this tool update + for (const [sessionKey, session] of sessions.entries()) { + if (session.sessionId === part.sessionID) { + session.lastUsed = Date.now() + handleToolUpdate(part, session.channel, session.thread) + break + } } } } } + } catch (error) { + if ((error as any).name === "AbortError") { + console.log("๐Ÿ›‘ Event stream aborted") + } else { + console.error("โŒ Event stream error:", error) + } } })() @@ -90,7 +121,7 @@ app.message(async ({ message, say }) => { console.log("โœ… Created opencode session:", createResult.data.id) - session = { client, server, sessionId: createResult.data.id, channel, thread } + session = { client, server, sessionId: createResult.data.id, channel, thread, lastUsed: Date.now() } sessions.set(sessionKey, session) const shareResult = await client.session.share({ path: { id: createResult.data.id } }) @@ -102,6 +133,7 @@ app.message(async ({ message, say }) => { } console.log("๐Ÿ“ Sending to opencode:", message.text) + session.lastUsed = Date.now() const result = await session.client.session.prompt({ path: { id: session.sessionId }, body: { parts: [{ type: "text", text: message.text }] }, @@ -143,3 +175,18 @@ app.command("/test", async ({ command, ack, say }) => { await app.start() console.log("โšก๏ธ Slack bot is running!") + +// Graceful shutdown handler +process.on("SIGINT", () => { + console.log("\n๐Ÿ›‘ Shutting down...") + abortController.abort() + sessions.clear() + process.exit(0) +}) + +process.on("SIGTERM", () => { + console.log("\n๐Ÿ›‘ Shutting down...") + abortController.abort() + sessions.clear() + process.exit(0) +}) diff --git a/packages/ui/src/components/session-turn.tsx b/packages/ui/src/components/session-turn.tsx index ae1321bac140..a8eb61dc71c0 100644 --- a/packages/ui/src/components/session-turn.tsx +++ b/packages/ui/src/components/session-turn.tsx @@ -271,6 +271,17 @@ export function SessionTurn( const isShellMode = createMemo(() => !!shellModePart()) + const hasReasoningParts = createMemo(() => { + for (const m of assistantMessages()) { + const msgParts = data.store.part[m.id] + if (!msgParts) continue + for (const p of msgParts) { + if (p?.type === "reasoning") return true + } + } + return false + }) + const rawStatus = createMemo(() => { const msgs = assistantMessages() let last: PartType | undefined @@ -376,6 +387,8 @@ export function SessionTurn( diffLimit: diffInit, status: rawStatus(), duration: duration(), + userMessageHovered: false, + showReasoning: false, }) createEffect( @@ -564,7 +577,7 @@ export function SessionTurn( message={assistantMessage} responsePartId={responsePartId()} hideResponsePart={hideResponsePart()} - hideReasoning={!working()} + hideReasoning={!working() && !store.showReasoning} /> )} @@ -596,6 +609,17 @@ export function SessionTurn(

Response

+ + + + +
+ + {(assistantMessage) => ( + + )} + +
+
diff --git a/packages/ui/src/components/tooltip.tsx b/packages/ui/src/components/tooltip.tsx index c38ee5847dbe..2331de4a5a92 100644 --- a/packages/ui/src/components/tooltip.tsx +++ b/packages/ui/src/components/tooltip.tsx @@ -1,5 +1,5 @@ import { Tooltip as KobalteTooltip } from "@kobalte/core/tooltip" -import { children, createSignal, Match, onMount, splitProps, Switch, type JSX } from "solid-js" +import { children, createSignal, Match, onCleanup, onMount, splitProps, Switch, type JSX } from "solid-js" import type { ComponentProps } from "solid-js" export interface TooltipProps extends ComponentProps { @@ -36,17 +36,34 @@ export function Tooltip(props: TooltipProps) { onMount(() => { const childElements = c() + const cleanupFns: (() => void)[] = [] + + const addListeners = (el: HTMLElement) => { + const focusHandler = () => setOpen(true) + const blurHandler = () => setOpen(false) + el.addEventListener("focus", focusHandler) + el.addEventListener("blur", blurHandler) + cleanupFns.push(() => { + el.removeEventListener("focus", focusHandler) + el.removeEventListener("blur", blurHandler) + }) + } + if (childElements instanceof HTMLElement) { - childElements.addEventListener("focus", () => setOpen(true)) - childElements.addEventListener("blur", () => setOpen(false)) + addListeners(childElements) } else if (Array.isArray(childElements)) { for (const child of childElements) { if (child instanceof HTMLElement) { - child.addEventListener("focus", () => setOpen(true)) - child.addEventListener("blur", () => setOpen(false)) + addListeners(child) } } } + + onCleanup(() => { + for (const cleanup of cleanupFns) { + cleanup() + } + }) }) return (