From a184fb17dc61da2276e704e72031b4e375998152 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 22:19:57 +0100 Subject: [PATCH] fix(webapp): dedupe realtimeStreams array push on stream create The PUT handler unconditionally appended streamId to TaskRun.realtimeStreams on every call, so repeat inits for the same (run, streamId) hot-looped a row UPDATE and bloated the array. Read first and only push when the streamId isn't already present, matching the existing append handler. --- .server-changes/realtimestreams-dedupe.md | 6 ++++ ...ime.v1.streams.$runId.$target.$streamId.ts | 28 +++++++++++++------ 2 files changed, 25 insertions(+), 9 deletions(-) create mode 100644 .server-changes/realtimestreams-dedupe.md diff --git a/.server-changes/realtimestreams-dedupe.md b/.server-changes/realtimestreams-dedupe.md new file mode 100644 index 0000000000..69987f7b4b --- /dev/null +++ b/.server-changes/realtimestreams-dedupe.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler. diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts index 9ca8e36f4e..dd3d3bf31d 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts @@ -62,31 +62,41 @@ const { action } = createActionApiRoute( if (request.method === "PUT") { // This is the "create" endpoint - const updatedRun = await prisma.taskRun.update({ + const target = await prisma.taskRun.findFirst({ where: { friendlyId: targetId, runtimeEnvironmentId: authentication.environment.id, }, - data: { - realtimeStreams: { - push: params.streamId, - }, - }, select: { + id: true, + realtimeStreams: true, realtimeStreamsVersion: true, completedAt: true, }, }); - if (updatedRun.completedAt) { + if (!target) { + return new Response("Run not found", { status: 404 }); + } + + if (target.completedAt) { return new Response("Cannot initialize a realtime stream on a completed run", { status: 400, }); } + if (!target.realtimeStreams.includes(params.streamId)) { + await prisma.taskRun.update({ + where: { id: target.id }, + data: { + realtimeStreams: { push: params.streamId }, + }, + }); + } + const realtimeStream = getRealtimeStreamInstance( authentication.environment, - updatedRun.realtimeStreamsVersion, + target.realtimeStreamsVersion, basinContext ); @@ -94,7 +104,7 @@ const { action } = createActionApiRoute( return json( { - version: updatedRun.realtimeStreamsVersion, + version: target.realtimeStreamsVersion, }, { status: 202, headers: responseHeaders } );