Skip to content

fix(webapp): dedupe realtimeStreams array push on stream create#3651

Draft
ericallam wants to merge 1 commit into
mainfrom
hotfix/realtimestreams-dedupe
Draft

fix(webapp): dedupe realtimeStreams array push on stream create#3651
ericallam wants to merge 1 commit into
mainfrom
hotfix/realtimestreams-dedupe

Conversation

@ericallam
Copy link
Copy Markdown
Member

@ericallam ericallam commented May 17, 2026

Summary

The PUT handler at /realtime/v1/streams/:runId/:target/:streamId ran taskRun.update({ realtimeStreams: { push: streamId } }) on every call, even when the streamId was already present. SDK call patterns that re-initialize the same stream key on every chunk produce a per-write row UPDATE, duplicate entries pile up in the array, and the row-lock + TOAST rewrite cost grows unbounded on long-running stream sessions.

Fix

Mirror the sibling append handler: read the array first and only push when the streamId isn't already present. Identical behavior for first-time stream creation; repeat creates short-circuit to a single indexed read. The dashboard's per-run stream listing keeps working because the first create still records the entry.

Test plan

  • A fresh PUT for a new (run, streamId) adds the entry to the array
  • A repeat PUT for the same pair leaves the array unchanged
  • 404 is returned when the run doesn't exist; 400 when the run is completed

The PUT handler unconditionally appended streamId to TaskRun.realtimeStreams on
every call, so repeat inits for the same (run, streamId) hot-looped a row UPDATE
and bloated the array. Read first and only push when the streamId isn't already
present, matching the existing append handler.
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 17, 2026

⚠️ No Changeset found

Latest commit: a184fb1

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 17, 2026

Review Change Stack

Walkthrough

The PR implements deduplication of realtime stream writes in the PUT /realtime/v1/streams/:runId/:target/:streamId endpoint. The handler now fetches the target task run, validates it exists and is not completed, and conditionally appends the stream ID to the realtimeStreams array only if it is not already present. Stream initialization uses the target run's version, and the response returns this version value. A changelog entry documents the fix.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning The description provides a comprehensive summary, detailed fix explanation, and test plan covering both success and failure cases. However, several required template sections are missing or incomplete: the issue reference (Closes #), the contributing guide checklist, testing steps documentation, and changelog formatting. Add the issue reference (Closes #XXXX), complete the checklist items, document the test execution steps taken, and add a properly formatted changelog entry as specified in the template.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: deduplicating realtimeStreams array pushes on stream creation, which matches the core fix described in the PR objectives.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch hotfix/realtimestreams-dedupe

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)

63-110: ⚡ Quick win

Add @crumbs markers in this modified server path.

This changed block is missing crumbs annotations required by repo guidelines.

As per coding guidelines: "Add crumbs as you write code — not just when debugging. Mark lines with // @Crumbs or wrap blocks in `// `#region` `@crumbs."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/routes/realtime.v1.streams`.$runId.$target.$streamId.ts
around lines 63 - 110, This server PUT handler is missing required crumbs
annotations; add `// `@crumbs`` markers around the modified realtime stream flow
(for example place `// `@crumbs`` immediately above the block that checks
request.method === "PUT" or at least above the prisma.taskRun.findFirst call,
above the prisma.taskRun.update call that pushes to realtimeStreams, and above
the call to getRealtimeStreamInstance / realtimeStream.initializeStream) so the
create-initialize path (symbols: request.method === "PUT",
prisma.taskRun.findFirst, prisma.taskRun.update, getRealtimeStreamInstance,
realtimeStream.initializeStream, and the final json return) is annotated per
guidelines.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@apps/webapp/app/routes/realtime.v1.streams`.$runId.$target.$streamId.ts:
- Around line 65-95: Replace the separate findFirst + update with a single
conditional DB write: perform a prisma.taskRun.updateMany (or
prisma.taskRun.update with a NOT has check) where the record id equals target.id
AND NOT { realtimeStreams: { has: params.streamId } }, and set data: {
realtimeStreams: { push: params.streamId } }; then verify the returned count
(from updateMany) or catch a zero-result to detect "already present" instead of
relying on the prior membership check. This makes the check+append atomic (use
prisma.taskRun.updateMany with where: { id: target.id, NOT: { realtimeStreams: {
has: params.streamId } } } and data: { realtimeStreams: { push: params.streamId
} } and handle count === 0 accordingly).

---

Nitpick comments:
In `@apps/webapp/app/routes/realtime.v1.streams`.$runId.$target.$streamId.ts:
- Around line 63-110: This server PUT handler is missing required crumbs
annotations; add `// `@crumbs`` markers around the modified realtime stream flow
(for example place `// `@crumbs`` immediately above the block that checks
request.method === "PUT" or at least above the prisma.taskRun.findFirst call,
above the prisma.taskRun.update call that pushes to realtimeStreams, and above
the call to getRealtimeStreamInstance / realtimeStream.initializeStream) so the
create-initialize path (symbols: request.method === "PUT",
prisma.taskRun.findFirst, prisma.taskRun.update, getRealtimeStreamInstance,
realtimeStream.initializeStream, and the final json return) is annotated per
guidelines.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b43b5778-4595-4cfa-9387-e3fb1a1ec9a6

📥 Commits

Reviewing files that changed from the base of the PR and between 55fa2d4 and a184fb1.

📒 Files selected for processing (2)
  • .server-changes/realtimestreams-dedupe.md
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (31)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e-webapp / 🧪 E2E Tests: Webapp
  • GitHub Check: 🛡️ E2E Auth Tests (full)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / e2e-webapp / 🧪 E2E Tests: Webapp
  • GitHub Check: typecheck / typecheck
🧰 Additional context used
📓 Path-based instructions (7)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: Import from @trigger.dev/core subpaths only, never from the root. Subpath imports must be used to maintain proper module boundaries.
When writing Trigger.dev tasks, always import from @trigger.dev/sdk. Never use @trigger.dev/sdk/v3 or deprecated client.defineJob.
Prisma is version 6.14.0. Use the Prisma client from internal-packages/database for all database operations.
For ClickHouse client, schema migrations, and analytics queries, use internal-packages/clickhouse.

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Add crumbs as you write code — not just when debugging. Mark lines with // @Crumbs or wrap blocks in `// `#region` `@crumbs. They stay on the branch throughout development and are stripped by agentcrumbs strip before merge.

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: Access environment variables through the env export of env.server.ts instead of directly accessing process.env
Use subpath exports from @trigger.dev/core package instead of importing from the root @trigger.dev/core path

Use named constants for sentinel/placeholder values (e.g. const UNSET_VALUE = '__unset__') instead of raw string literals scattered across comparisons

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
**/*.{ts,tsx,js,jsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Code formatting is enforced using Prettier. Run pnpm run format before committing

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
apps/**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (CLAUDE.md)

When modifying only server components (apps/webapp/, apps/supervisor/, etc.) with no package changes, add a .server-changes/ file instead of a changeset. See .server-changes/README.md for format and documentation.

Files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
🧠 Learnings (4)
📚 Learning: 2026-05-14T14:54:39.095Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3545
File: .server-changes/agent-view-sessions.md:10-10
Timestamp: 2026-05-14T14:54:39.095Z
Learning: In the `trigger.dev` repository, do not flag inconsistent dot vs slash notation in route/path strings inside `.server-changes/*.md` files. These markdown files are consumed verbatim into the changelog, so the mixed notation (e.g., `resources.orgs.../runs.$runParam/...`) is intentional and should be preserved as-is.

Applied to files:

  • .server-changes/realtimestreams-dedupe.md
📚 Learning: 2026-03-22T13:26:12.060Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3244
File: apps/webapp/app/components/code/TextEditor.tsx:81-86
Timestamp: 2026-03-22T13:26:12.060Z
Learning: In the triggerdotdev/trigger.dev codebase, do not flag `navigator.clipboard.writeText(...)` calls for `missing-await`/`unhandled-promise` issues. These clipboard writes are intentionally invoked without `await` and without `catch` handlers across the project; keep that behavior consistent when reviewing TypeScript/TSX files (e.g., usages like in `apps/webapp/app/components/code/TextEditor.tsx`).

Applied to files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
📚 Learning: 2026-03-22T19:24:14.403Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3187
File: apps/webapp/app/v3/services/alerts/deliverErrorGroupAlert.server.ts:200-204
Timestamp: 2026-03-22T19:24:14.403Z
Learning: In the triggerdotdev/trigger.dev codebase, webhook URLs are not expected to contain embedded credentials/secrets (e.g., fields like `ProjectAlertWebhookProperties` should only hold credential-free webhook endpoints). During code review, if you see logging or inclusion of raw webhook URLs in error messages, do not automatically treat it as a credential-leak/secrets-in-logs issue by default—first verify the URL does not contain embedded credentials (for example, no username/password in the URL, no obvious secret/token query params or fragments). If the URL is credential-free per this project’s conventions, allow the logging.

Applied to files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
📚 Learning: 2026-05-12T21:04:05.815Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3542
File: apps/webapp/app/components/sessions/v1/SessionStatus.tsx:1-3
Timestamp: 2026-05-12T21:04:05.815Z
Learning: In this Remix + TypeScript codebase, do not flag a server/client boundary violation when a file imports only types from a module matching `*.server`.

Specifically, it’s safe to import types using `import type { Foo } from "*.server"` or `import { type Foo } from "*.server"` because TypeScript erases type-only imports at compile time and they emit no JavaScript, so they won’t cross the Remix server/client bundle boundary.

Only raise the boundary concern for value imports (e.g., `import { Foo }` without `type`, or `import Foo`), since those produce JavaScript output.

Applied to files:

  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
🔇 Additional comments (1)
.server-changes/realtimestreams-dedupe.md (1)

1-7: LGTM!

Comment on lines +65 to +95
const target = await prisma.taskRun.findFirst({
where: {
friendlyId: targetId,
runtimeEnvironmentId: authentication.environment.id,
},
data: {
realtimeStreams: {
push: params.streamId,
},
},
select: {
id: true,
realtimeStreams: true,
realtimeStreamsVersion: true,
completedAt: true,
},
});

if (updatedRun.completedAt) {
if (!target) {
return new Response("Run not found", { status: 404 });
}

if (target.completedAt) {
return new Response("Cannot initialize a realtime stream on a completed run", {
status: 400,
});
}

if (!target.realtimeStreams.includes(params.streamId)) {
await prisma.taskRun.update({
where: { id: target.id },
data: {
realtimeStreams: { push: params.streamId },
},
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Dedup remains race-prone under concurrent PUTs.

Line 88 checks membership and Lines 89-94 push in a separate statement, so two concurrent requests can both pass the check and both append streamId. Please make this check+append atomic at the DB level (single conditional write / lock-scoped transaction) to fully prevent duplicate writes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/routes/realtime.v1.streams`.$runId.$target.$streamId.ts
around lines 65 - 95, Replace the separate findFirst + update with a single
conditional DB write: perform a prisma.taskRun.updateMany (or
prisma.taskRun.update with a NOT has check) where the record id equals target.id
AND NOT { realtimeStreams: { has: params.streamId } }, and set data: {
realtimeStreams: { push: params.streamId } }; then verify the returned count
(from updateMany) or catch a zero-result to detect "already present" instead of
relying on the prior membership check. This makes the check+append atomic (use
prisma.taskRun.updateMany with where: { id: target.id, NOT: { realtimeStreams: {
has: params.streamId } } } and data: { realtimeStreams: { push: params.streamId
} } and handle count === 0 accordingly).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant