Conversation

@ericallam
Member

No description provided.

@changeset-bot

changeset-bot bot commented Jan 16, 2026

⚠️ No Changeset found

Latest commit: 35eb666

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Contributor

coderabbitai bot commented Jan 16, 2026

Walkthrough

This pull request refactors the run stream to use an SSE loader-based pattern. RunStreamPresenter now exposes a singleton loader (runStreamLoader) that encapsulates stream initialization, lifecycle management with pub/sub integration, and cleanup handling. The route handler is simplified to delegate stream logic to this loader instead of implementing SSE handling directly. Supporting services gain a metric that tracks active subscribers, and event listeners are managed more carefully: the once option is set on the cleanup listener, and message handler references are stored so that listeners can be removed explicitly during cleanup.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The PR description is empty; required sections from the template (Testing, Changelog, issue link) are missing. | Add a description following the repository template, including issue link, testing steps, and changelog entry. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly describes the main objective of the PR - fixing trace pubsub redis client cleanup to prevent memory leaks. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

🧹 Recent nitpick comments
apps/webapp/app/v3/services/tracePubSub.server.ts (1)

55-61: Consider ensuring subscriber count is decremented even on errors.

If redis.unsubscribe() or redis.quit() throws, _subscriberCount-- won't execute, potentially causing metric drift over time.

🔧 Suggested improvement
     const unsubscribe = async () => {
       // Remove the message listener before closing the connection
       redis.off("message", messageHandler);
-      await redis.unsubscribe(channel);
-      await redis.quit();
-      this._subscriberCount--;
+      try {
+        await redis.unsubscribe(channel);
+        await redis.quit();
+      } finally {
+        this._subscriberCount--;
+      }
     };
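The effect of the suggested try/finally can be checked in isolation. The following is a minimal sketch, not the code from tracePubSub.server.ts: the PubSubClient type, the SubscriberTracker class, and the failing client are hypothetical stand-ins introduced only to show that the counter is decremented even when unsubscribe() rejects.

```typescript
// Hypothetical minimal client shape; the real code uses a Redis client.
type PubSubClient = {
  unsubscribe(channel: string): Promise<void>;
  quit(): Promise<void>;
};

// Hypothetical tracker that mirrors the _subscriberCount bookkeeping.
class SubscriberTracker {
  private _subscriberCount = 0;

  get subscriberCount(): number {
    return this._subscriberCount;
  }

  subscribe(client: PubSubClient, channel: string): () => Promise<void> {
    this._subscriberCount++;
    return async () => {
      try {
        await client.unsubscribe(channel);
        await client.quit();
      } finally {
        // Runs whether or not the calls above throw.
        this._subscriberCount--;
      }
    };
  }
}

// A client whose unsubscribe always fails, to exercise the error path.
const failingClient: PubSubClient = {
  unsubscribe: async () => {
    throw new Error("connection lost");
  },
  quit: async () => {},
};

const tracker = new SubscriberTracker();
const unsubscribe = tracker.subscribe(failingClient, "events:trace");

// The rejection still propagates to the caller; only the count is protected.
const settled = unsubscribe().catch(() => tracker.subscriberCount);
```

Here settled resolves to the subscriber count observed after the failed teardown. Note that the error is not swallowed, so callers still need their own handling.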

📜 Recent review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5504e7f and 35eb666.

📒 Files selected for processing (4)
  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
🧰 Additional context used
📓 Path-based instructions (9)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: Always import tasks from @trigger.dev/sdk, never use @trigger.dev/sdk/v3 or deprecated client.defineJob pattern
Every Trigger.dev task must be exported and have a unique id property with no timeouts in the run function

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Import from @trigger.dev/core using subpaths only, never import from root

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
apps/webapp/app/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Access all environment variables through the env export of env.server.ts instead of directly accessing process.env in the Trigger.dev webapp

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: When importing from @trigger.dev/core in the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp

Access environment variables via env export from apps/webapp/app/env.server.ts, never use process.env directly

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/v3/services/tracePubSub.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
apps/webapp/app/v3/services/**/*.server.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Organize services in the webapp following the pattern app/v3/services/*/*.server.ts

Files:

  • apps/webapp/app/v3/services/tracePubSub.server.ts
apps/webapp/app/services/**/*.server.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Separate testable services from configuration files; follow the pattern of realtimeClient.server.ts (testable service) and realtimeClientGlobal.server.ts (configuration) in the webapp

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
🧠 Learnings (10)
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/app/v3/presenters/**/*.server.{ts,tsx} : Organize presenters in the webapp following the pattern `app/v3/presenters/*/*.server.ts` to move complex loader code into classes

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `.withStreams()` to subscribe to realtime streams from task metadata in addition to run changes

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Use the Run Engine 2.0 from `internal/run-engine` for new run lifecycle code in the webapp instead of the legacy run engine

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Subscribe to run updates using `runs.subscribeToRun()` for realtime monitoring of task execution

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to internal-packages/database/**/*.{ts,tsx} : Use Prisma for database interactions in internal-packages/database with PostgreSQL

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Leverage the PostgreSQL database through the `trigger.dev/database` Prisma 6.14.0 client in the webapp for all data access patterns

Applied to files:

  • apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.

Applied to files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
📚 Learning: 2025-12-08T15:19:56.823Z
Learnt from: 0ski
Repo: triggerdotdev/trigger.dev PR: 2760
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx:278-281
Timestamp: 2025-12-08T15:19:56.823Z
Learning: In apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx, the tableState search parameter uses intentional double-encoding: the parameter value contains a URL-encoded URLSearchParams string, so decodeURIComponent(value("tableState") ?? "") is required to fully decode it before parsing with new URLSearchParams(). This pattern allows bundling multiple filter/pagination params as a single search parameter.

Applied to files:

  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
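The double-encoding described in this learning can be illustrated with the standard URL APIs. This is a sketch under assumptions: the filter names, the example URL, and the use of url.searchParams.get() in place of the route's value("tableState") helper are all illustrative and not taken from the route file.

```typescript
// Inner filters bundled as one URLSearchParams string (values are illustrative).
const inner = new URLSearchParams({ statuses: "COMPLETED", cursor: "abc" }).toString();

// Encode the whole inner string again before using it as a single parameter value.
const outer = new URLSearchParams({ tableState: encodeURIComponent(inner) });

// Reading it back: searchParams.get() undoes one layer of encoding,
// decodeURIComponent() undoes the other.
const url = new URL(`https://example.com/runs/run_123?${outer.toString()}`);
const raw = url.searchParams.get("tableState") ?? "";
const filters = new URLSearchParams(decodeURIComponent(raw));
```

Without the decodeURIComponent() call, raw would still contain percent-encoded = and & characters, and URLSearchParams would parse it as a single malformed key.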
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/**/*.{ts,tsx} : Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp

Applied to files:

  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx
🧬 Code graph analysis (3)
apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts (3)
apps/webapp/app/utils/sse.ts (1)
  • createSSELoader (40-210)
apps/webapp/app/v3/services/tracePubSub.server.ts (1)
  • tracePubSub (70-70)
packages/core/src/v3/apiClient/runStream.ts (1)
  • unsubscribe (458-463)
apps/webapp/app/v3/services/tracePubSub.server.ts (3)
packages/core/src/v3/apiClient/runStream.ts (1)
  • unsubscribe (458-463)
internal-packages/tracing/src/index.ts (1)
  • Gauge (23-23)
apps/webapp/app/metrics.server.ts (1)
  • metricsRegister (5-5)
apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx (2)
apps/webapp/app/services/session.server.ts (1)
  • requireUserId (25-35)
apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts (1)
  • runStreamLoader (137-140)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (8)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

275-275: LGTM!

Adding { once: true } ensures the cleanup listener is automatically removed after firing once, preventing potential duplicate cleanup calls. This complements the existing isCleanedUp guard and aligns with the PR's goal of preventing resource leaks.
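The interaction between { once: true } and a cleanup guard can be sketched with a plain AbortController. The cleanup function and counters below are hypothetical stand-ins, not the code in redisRealtimeStreams.server.ts.

```typescript
// Hypothetical stand-in for a stream's cleanup routine.
let cleanupCalls = 0;
let isCleanedUp = false;

function cleanup(): void {
  if (isCleanedUp) return; // guard against duplicate cleanup
  isCleanedUp = true;
  cleanupCalls++;
}

const controller = new AbortController();

// { once: true } removes the listener automatically after its first call,
// so the signal does not keep a reference to cleanup afterwards.
controller.signal.addEventListener("abort", cleanup, { once: true });

controller.abort();
controller.abort(); // no effect: the signal is already aborted
```

The guard and the once option cover different cases: the guard protects against cleanup being invoked from other code paths, while once ensures the listener itself is released.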

apps/webapp/app/v3/services/tracePubSub.server.ts (2)

48-52: LGTM! Key fix for the resource leak.

Storing a reference to messageHandler enables explicit removal via redis.off() during cleanup. This is the core fix: it prevents listeners from accumulating and the associated memory and Redis client leaks.
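Why the stored reference matters can be shown with Node's EventEmitter, whose on()/off() API is assumed here as a stand-in for the Redis subscriber connection. The channel name and messages are illustrative.

```typescript
import { EventEmitter } from "node:events";

// Stand-in for a Redis subscriber connection (assumption: EventEmitter-style API).
const redis = new EventEmitter();
const received: string[] = [];

// Keep a named reference so the exact same function can be passed to off().
const messageHandler = (_channel: string, message: string): void => {
  received.push(message);
};

redis.on("message", messageHandler);
redis.emit("message", "events:trace", "first");

redis.off("message", messageHandler);
redis.emit("message", "events:trace", "second"); // no listener left, nothing recorded

// Counter-example: an inline function cannot be removed later,
// because off() needs the identical function reference.
redis.on("message", (_c: string, m: string) => received.push(m));
redis.off("message", (_c: string, m: string) => received.push(m)); // removes nothing
const leaked = redis.listenerCount("message");
```

The second off() call receives a new function object that merely looks the same, so one listener stays attached. This is the kind of accumulation that a stored handler reference avoids.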


84-91: LGTM! Good observability addition.

The Gauge metric follows best practices: no high-cardinality attributes, uses the collect() callback pattern for reading current state, and provides visibility into active pub/sub subscribers for debugging resource leaks.

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts (4)

24-48: LGTM!

The handler setup correctly validates the run parameter, efficiently queries only the needed traceId field, and provides appropriate error responses for missing or not-found runs.


56-86: LGTM!

The initStream handler correctly:

  • Creates a throttled send function to prevent overwhelming the client
  • Stores the messageListener reference for proper cleanup
  • Handles send errors gracefully by aborting the stream
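A throttled send function can take several forms, and the review does not say which one the presenter uses. The sketch below assumes a simple leading-edge throttle that drops calls inside the window; the throttle helper, its injectable clock, and the event names are hypothetical.

```typescript
// Hypothetical leading-edge throttle: runs at most once per intervalMs and
// drops calls that arrive inside the window. `now` is injectable for testing.
function throttle<Args extends unknown[]>(
  fn: (...args: Args) => void,
  intervalMs: number,
  now: () => number = Date.now
): (...args: Args) => void {
  let lastRun = Number.NEGATIVE_INFINITY;
  return (...args: Args) => {
    const current = now();
    if (current - lastRun < intervalMs) return; // still inside the window
    lastRun = current;
    fn(...args);
  };
}

// Drive the throttle with a fake clock so the behavior is deterministic.
let clock = 0;
const sent: string[] = [];
const throttledSend = throttle(
  (event: string) => {
    sent.push(event);
  },
  1000,
  () => clock
);

throttledSend("a"); // t=0: sent
clock = 500;
throttledSend("b"); // t=500: dropped
clock = 1000;
throttledSend("c"); // t=1000: sent
```

A trailing-edge variant would instead deliver the last dropped event once the window closes, which may suit a stream where the most recent update matters more than the first.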

98-129: LGTM! Proper listener cleanup before unsubscribe.

The cleanup handler correctly removes the message listener before calling removeAllListeners() and unsubscribing from Redis. The fire-and-forget pattern for unsubscribe() with .then/.catch logging is appropriate since the SSE framework calls cleanup synchronously.
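The fire-and-forget pattern mentioned here can be sketched as follows. The unsubscribe and cleanup functions and the log array are hypothetical; the real handler logs through the application's logger and unsubscribes from Redis.

```typescript
// Hypothetical log sink used to observe ordering.
const log: string[] = [];

// Hypothetical async teardown; the real code unsubscribes from Redis.
async function unsubscribe(): Promise<void> {
  log.push("unsubscribed");
}

// cleanup is synchronous: it starts the async work and attaches handlers,
// so a rejection is logged rather than left unhandled.
function cleanup(unsub: () => Promise<void>): void {
  log.push("cleanup-start");
  unsub()
    .then(() => log.push("unsubscribe-ok"))
    .catch((error: unknown) => log.push(`unsubscribe-failed: ${String(error)}`));
  log.push("cleanup-returned");
}

cleanup(unsubscribe);
```

cleanup returns before the .then callback runs, so the caller is never blocked on the teardown, and the attached .catch keeps a failed unsubscribe from surfacing as an unhandled rejection.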


136-140: LGTM!

The singleton pattern ensures a single loader function is created and reused across requests, following the project's established patterns.
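One way to picture the singleton behavior is a small name-keyed cache. This is a sketch, not the project's helper: the singleton function, the Map-based store, and createLoader are hypothetical, and the actual implementation may store instances differently.

```typescript
// Hypothetical module-level cache keyed by name.
const instances = new Map<string, unknown>();

// Returns the cached value for `name`, creating it on first use only.
function singleton<T>(name: string, create: () => T): T {
  if (!instances.has(name)) {
    instances.set(name, create());
  }
  return instances.get(name) as T;
}

// Hypothetical factory standing in for the loader construction.
let created = 0;
function createLoader(): (runId: string) => string {
  created++;
  return (runId) => `stream:${runId}`;
}

const first = singleton("runStreamLoader", createLoader);
const second = singleton("runStreamLoader", createLoader);
```

Both calls return the same function object and the factory runs once, so any setup performed while constructing the loader is not repeated per request.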

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.stream/route.tsx (1)

5-11: LGTM!

The route is cleanly refactored to delegate streaming logic to the shared runStreamLoader, keeping only authentication responsibility in the route. This follows good separation of concerns and Remix conventions.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

Comment @coderabbitai help to get the list of available commands and usage tips.

@vibe-kanban-cloud

Review Complete

Your review story is ready!

View Story

Comment !reviewfast on this PR to re-generate the story.

@ericallam ericallam marked this pull request as ready for review January 16, 2026 14:58
@ericallam ericallam merged commit 72594a4 into main Jan 16, 2026
32 checks passed
@ericallam ericallam deleted the ea-branch-114 branch January 16, 2026 15:20

3 participants