Skip to content

Commit a184fb1

Browse files
committed
fix(webapp): dedupe realtimeStreams array push on stream create
The PUT handler unconditionally appended streamId to TaskRun.realtimeStreams on every call, so repeat inits for the same (run, streamId) hot-looped a row UPDATE and bloated the array. Read first and only push when the streamId isn't already present, matching the existing append handler.
1 parent a5ba406 commit a184fb1

2 files changed

Lines changed: 25 additions & 9 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler.

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,39 +62,49 @@ const { action } = createActionApiRoute(
6262

6363
if (request.method === "PUT") {
6464
// This is the "create" endpoint
65-
const updatedRun = await prisma.taskRun.update({
65+
const target = await prisma.taskRun.findFirst({
6666
where: {
6767
friendlyId: targetId,
6868
runtimeEnvironmentId: authentication.environment.id,
6969
},
70-
data: {
71-
realtimeStreams: {
72-
push: params.streamId,
73-
},
74-
},
7570
select: {
71+
id: true,
72+
realtimeStreams: true,
7673
realtimeStreamsVersion: true,
7774
completedAt: true,
7875
},
7976
});
8077

81-
if (updatedRun.completedAt) {
78+
if (!target) {
79+
return new Response("Run not found", { status: 404 });
80+
}
81+
82+
if (target.completedAt) {
8283
return new Response("Cannot initialize a realtime stream on a completed run", {
8384
status: 400,
8485
});
8586
}
8687

88+
if (!target.realtimeStreams.includes(params.streamId)) {
89+
await prisma.taskRun.update({
90+
where: { id: target.id },
91+
data: {
92+
realtimeStreams: { push: params.streamId },
93+
},
94+
});
95+
}
96+
8797
const realtimeStream = getRealtimeStreamInstance(
8898
authentication.environment,
89-
updatedRun.realtimeStreamsVersion,
99+
target.realtimeStreamsVersion,
90100
basinContext
91101
);
92102

93103
const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);
94104

95105
return json(
96106
{
97-
version: updatedRun.realtimeStreamsVersion,
107+
version: target.realtimeStreamsVersion,
98108
},
99109
{ status: 202, headers: responseHeaders }
100110
);

0 commit comments

Comments
 (0)